[svn r37465] * all Sessions now have fixoptions()
* added some tests for implied and conflicting options * make all Session objects inherit from the base session * small cleanups with respect to test setup * separate tests a bit --HG-- branch : trunk
This commit is contained in:
parent
15c1ebe506
commit
1d3519bfe0
|
@ -48,7 +48,6 @@ class Config(object):
|
|||
args = [str(x) for x in args]
|
||||
cmdlineoption, args = self._parser.parse_args(args)
|
||||
self.option.__dict__.update(vars(cmdlineoption))
|
||||
fixoptions(self.option) # XXX fixing should be moved to sessions
|
||||
if not args:
|
||||
args.append(py.std.os.getcwd())
|
||||
self.topdir = gettopdir(args)
|
||||
|
@ -130,8 +129,6 @@ class Config(object):
|
|||
""" return an initialized session object. """
|
||||
cls = self._getsessionclass()
|
||||
session = cls(self)
|
||||
# XXX: all sessions should have one
|
||||
if hasattr(session, 'fixoptions'):
|
||||
session.fixoptions()
|
||||
return session
|
||||
|
||||
|
@ -272,28 +269,6 @@ def checkmarshal(name, value):
|
|||
except ValueError:
|
||||
raise ValueError("%s=%r is not marshallable" %(name, value))
|
||||
|
||||
def fixoptions(option):
|
||||
""" sanity checks and making option values canonical. """
|
||||
|
||||
# implied options
|
||||
if option.usepdb:
|
||||
if not option.nocapture:
|
||||
#print "--pdb implies --nocapture"
|
||||
option.nocapture = True
|
||||
|
||||
if option.runbrowser and not option.startserver:
|
||||
#print "--runbrowser implies --startserver"
|
||||
option.startserver = True
|
||||
|
||||
# conflicting options
|
||||
if option.looponfailing and option.usepdb:
|
||||
raise ValueError, "--looponfailing together with --pdb not supported."
|
||||
if option.looponfailing and option.dist:
|
||||
raise ValueError, "--looponfailing together with --dist not supported."
|
||||
if option.executable and option.usepdb:
|
||||
raise ValueError, "--exec together with --pdb not supported."
|
||||
|
||||
|
||||
def gettopdir(args):
|
||||
""" return the top directory for the given paths.
|
||||
if the common base dir resides in a python package
|
||||
|
|
|
@ -16,16 +16,24 @@ from py.__.test.rsession.hostmanage import HostInfo, HostOptions, HostManager
|
|||
from py.__.test.rsession.local import local_loop, plain_runner, apigen_runner,\
|
||||
box_runner
|
||||
from py.__.test.rsession.reporter import LocalReporter, RemoteReporter
|
||||
from py.__.test.session import Session
|
||||
|
||||
class AbstractSession(object):
|
||||
class AbstractSession(Session):
|
||||
"""
|
||||
An abstract session executes collectors/items through a runner.
|
||||
|
||||
"""
|
||||
def __init__(self, config, optimise_localhost=True):
|
||||
self.config = config
|
||||
super(AbstractSession, self).__init__(config=config)
|
||||
self.optimise_localhost = optimise_localhost
|
||||
|
||||
def fixoptions(self):
|
||||
option = self.config.option
|
||||
if option.runbrowser and not option.startserver:
|
||||
#print "--runbrowser implies --startserver"
|
||||
option.startserver = True
|
||||
super(AbstractSession, self).fixoptions()
|
||||
|
||||
def getpkgdir(path):
|
||||
path = py.path.local(path)
|
||||
pkgpath = path.pypkgpath()
|
||||
|
@ -122,6 +130,7 @@ class RSession(AbstractSession):
|
|||
""" Remote version of session
|
||||
"""
|
||||
def fixoptions(self):
|
||||
super(RSession, self).fixoptions()
|
||||
config = self.config
|
||||
try:
|
||||
config.getvalue('disthosts')
|
||||
|
|
|
@ -23,6 +23,21 @@ class Session(object):
|
|||
if not self.config.option.nomagic:
|
||||
py.magic.revoke(assertion=1)
|
||||
|
||||
def fixoptions(self):
|
||||
""" check, fix and determine conflicting options. """
|
||||
option = self.config.option
|
||||
# implied options
|
||||
if option.usepdb:
|
||||
if not option.nocapture:
|
||||
option.nocapture = True
|
||||
# conflicting options
|
||||
if option.looponfailing and option.usepdb:
|
||||
raise ValueError, "--looponfailing together with --pdb not supported."
|
||||
if option.looponfailing and option.dist:
|
||||
raise ValueError, "--looponfailing together with --dist not supported."
|
||||
if option.executable and option.usepdb:
|
||||
raise ValueError, "--exec together with --pdb not supported."
|
||||
|
||||
def start(self, colitem):
|
||||
""" hook invoked before each colitem.run() invocation. """
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
from __future__ import generators
|
||||
import py
|
||||
from py.__.test.session import Session
|
||||
from py.__.test.terminal.out import getout
|
||||
import sys
|
||||
|
||||
def checkpyfilechange(rootdir, statcache={}):
|
||||
""" wait until project files are changed. """
|
||||
|
@ -51,9 +51,9 @@ def getfailureitems(failures):
|
|||
l.append(current)
|
||||
return l
|
||||
|
||||
class RemoteTerminalSession(object):
|
||||
class RemoteTerminalSession(Session):
|
||||
def __init__(self, config, file=None):
|
||||
self.config = config
|
||||
super(RemoteTerminalSession, self).__init__(config=config)
|
||||
self._setexecutable()
|
||||
if file is None:
|
||||
file = py.std.sys.stdout
|
||||
|
@ -123,8 +123,6 @@ def slaverun_TerminalSession(channel):
|
|||
topdir, repr, failures = channel.receive()
|
||||
print "SLAVE: received configuration, using topdir:", topdir
|
||||
config = py.test.config
|
||||
import sys
|
||||
sys.stdout.flush()
|
||||
config.initdirect(topdir, repr, failures)
|
||||
config.option.session = None
|
||||
config.option.looponfailing = False
|
||||
|
|
|
@ -1,13 +1,20 @@
|
|||
import py
|
||||
|
||||
def setup_module(mod):
|
||||
mod.datadir = setupdatadir()
|
||||
mod.tmpdir = py.test.ensuretemp(mod.__name__)
|
||||
|
||||
def setupdatadir():
|
||||
datadir = py.test.ensuretemp("datadir")
|
||||
if not datadir.listdir():
|
||||
names = [x.basename for x in datadir.listdir()]
|
||||
for name, content in namecontent:
|
||||
if name not in names:
|
||||
datadir.join(name).write(content)
|
||||
return datadir
|
||||
|
||||
namecontent = [
|
||||
('syntax_error.py', "this is really not python\n"),
|
||||
|
||||
('disabled_module.py', py.code.Source('''
|
||||
disabled = True
|
||||
|
||||
|
|
|
@ -231,7 +231,8 @@ class TestSessionAndOptions:
|
|||
|
||||
def test_sessionname_lookup_custom(self):
|
||||
self.tmpdir.join("conftest.py").write(py.code.Source("""
|
||||
class MySession:
|
||||
from py.__.test.session import Session
|
||||
class MySession(Session):
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
"""))
|
||||
|
@ -357,3 +358,4 @@ class TestConfigColitems:
|
|||
col3 = config._getcollector(config.topdir.dirpath())
|
||||
py.test.raises(ValueError,
|
||||
"config.get_collector_trail(col3)")
|
||||
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
import py
|
||||
from py.__.test.testing.setupdata import setup_module
|
||||
|
||||
class TestRemote:
|
||||
def test_exec(self):
|
||||
o = tmpdir.ensure('remote', dir=1)
|
||||
tfile = o.join('test_exec.py')
|
||||
tfile.write(py.code.Source("""
|
||||
def test_1():
|
||||
assert 1 == 0
|
||||
"""))
|
||||
print py.std.sys.executable
|
||||
config = py.test.config._reparse(
|
||||
['--exec=' + py.std.sys.executable,
|
||||
o])
|
||||
cls = config._getsessionclass()
|
||||
out = [] # out = py.std.Queue.Queue()
|
||||
session = cls(config, out.append)
|
||||
session.main()
|
||||
for s in out:
|
||||
if s.find('1 failed') != -1:
|
||||
break
|
||||
else:
|
||||
py.test.fail("did not see test_1 failure")
|
||||
|
||||
def test_looponfailing(self):
|
||||
o = tmpdir.ensure('looponfailing', dir=1)
|
||||
tfile = o.join('test_looponfailing.py')
|
||||
tfile.write(py.code.Source("""
|
||||
def test_1():
|
||||
assert 1 == 0
|
||||
"""))
|
||||
print py.std.sys.executable
|
||||
config = py.test.config._reparse(['--looponfailing', str(o)])
|
||||
cls = config._getsessionclass()
|
||||
out = py.std.Queue.Queue()
|
||||
session = cls(config, out.put)
|
||||
pool = py._thread.WorkerPool()
|
||||
reply = pool.dispatch(session.main)
|
||||
while 1:
|
||||
s = out.get(timeout=1.0)
|
||||
if s.find('1 failed') != -1:
|
||||
break
|
||||
print s
|
||||
else:
|
||||
py.test.fail("did not see test_1 failure")
|
||||
# XXX we would like to have a cleaner way to finish
|
||||
try:
|
||||
reply.get(timeout=0.5)
|
||||
except IOError, e:
|
||||
assert str(e).lower().find('timeout') != -1
|
||||
|
||||
|
|
@ -1,9 +1,38 @@
|
|||
import py
|
||||
from setupdata import setupdatadir
|
||||
from setupdata import setup_module # sets up global 'tmpdir'
|
||||
|
||||
def setup_module(mod):
|
||||
mod.datadir = setupdatadir()
|
||||
mod.tmpdir = py.test.ensuretemp(mod.__name__)
|
||||
implied_options = {
|
||||
'--pdb': 'usepdb and nocapture',
|
||||
'-v': 'verbose',
|
||||
'-l': 'showlocals',
|
||||
'--runbrowser': 'startserver and runbrowser',
|
||||
}
|
||||
|
||||
conflict_options = ('--looponfailing --pdb',
|
||||
'--dist --pdb',
|
||||
'--exec=%s --pdb' % py.std.sys.executable,
|
||||
)
|
||||
|
||||
def test_conflict_options():
|
||||
for spec in conflict_options:
|
||||
opts = spec.split()
|
||||
yield check_conflict_option, opts
|
||||
|
||||
def check_conflict_option(opts):
|
||||
print "testing if options conflict:", " ".join(opts)
|
||||
config = py.test.config._reparse(opts + [datadir/'filetest.py'])
|
||||
py.test.raises((ValueError, SystemExit), """
|
||||
config.initsession()
|
||||
""")
|
||||
|
||||
def test_implied_options():
|
||||
for key, expr in implied_options.items():
|
||||
yield check_implied_option, [key], expr
|
||||
|
||||
def check_implied_option(opts, expr):
|
||||
config = py.test.config._reparse(opts + [datadir/'filetest.py'])
|
||||
session = config.initsession()
|
||||
assert eval(expr, session.config.option.__dict__)
|
||||
|
||||
def test_default_session_options():
|
||||
for opts in ([], ['-l'], ['-s'], ['--tb=no'], ['--tb=short'],
|
||||
|
@ -63,15 +92,7 @@ class TestKeywordSelection:
|
|||
l = session.getitemoutcomepairs(py.test.Item.Skipped)
|
||||
assert l[0][0].name == 'test_1'
|
||||
|
||||
#f = open('/tmp/logfile', 'wa')
|
||||
class TestTerminalSession:
|
||||
|
||||
def setup_class(cls):
|
||||
(datadir / 'syntax_error.py').write("\nthis is really not python\n")
|
||||
|
||||
def teardown_class(cls):
|
||||
(datadir / 'syntax_error.py').remove()
|
||||
|
||||
def mainsession(self, *args):
|
||||
from py.__.test.terminal.terminal import TerminalSession
|
||||
self.file = py.std.StringIO.StringIO()
|
||||
|
@ -291,72 +312,3 @@ class TestTerminalSession:
|
|||
expected_output = '\nE ' + line_to_report + '\n'
|
||||
print 'Looking for:', expected_output
|
||||
assert expected_output in out
|
||||
|
||||
|
||||
class TestRemote:
|
||||
def XXXtest_rootdir_is_package(self):
|
||||
d = tmpdir.ensure('rootdirtest1', dir=1)
|
||||
d.ensure('__init__.py')
|
||||
x1 = d.ensure('subdir', '__init__.py')
|
||||
x2 = d.ensure('subdir2', '__init__.py')
|
||||
x3 = d.ensure('subdir3', 'noinit', '__init__.py')
|
||||
assert getrootdir([x1]) == d
|
||||
assert getrootdir([x2]) == d
|
||||
assert getrootdir([x1,x2]) == d
|
||||
assert getrootdir([x3,x2]) == d
|
||||
assert getrootdir([x2,x3]) == d
|
||||
|
||||
def XXXtest_rootdir_is_not_package(self):
|
||||
one = tmpdir.ensure('rootdirtest1', 'hello')
|
||||
rootdir = getrootdir([one])
|
||||
assert rootdir == one.dirpath()
|
||||
|
||||
def test_exec(self):
|
||||
o = tmpdir.ensure('remote', dir=1)
|
||||
tfile = o.join('test_exec.py')
|
||||
tfile.write(py.code.Source("""
|
||||
def test_1():
|
||||
assert 1 == 0
|
||||
"""))
|
||||
print py.std.sys.executable
|
||||
config = py.test.config._reparse(
|
||||
['--exec=' + py.std.sys.executable,
|
||||
o])
|
||||
cls = config._getsessionclass()
|
||||
out = [] # out = py.std.Queue.Queue()
|
||||
session = cls(config, out.append)
|
||||
session.main()
|
||||
for s in out:
|
||||
if s.find('1 failed') != -1:
|
||||
break
|
||||
else:
|
||||
py.test.fail("did not see test_1 failure")
|
||||
|
||||
def test_looponfailing(self):
|
||||
o = tmpdir.ensure('looponfailing', dir=1)
|
||||
tfile = o.join('test_looponfailing.py')
|
||||
tfile.write(py.code.Source("""
|
||||
def test_1():
|
||||
assert 1 == 0
|
||||
"""))
|
||||
print py.std.sys.executable
|
||||
config = py.test.config._reparse(['--looponfailing', str(o)])
|
||||
cls = config._getsessionclass()
|
||||
out = py.std.Queue.Queue()
|
||||
session = cls(config, out.put)
|
||||
pool = py._thread.WorkerPool()
|
||||
reply = pool.dispatch(session.main)
|
||||
while 1:
|
||||
s = out.get(timeout=1.0)
|
||||
if s.find('1 failed') != -1:
|
||||
break
|
||||
print s
|
||||
else:
|
||||
py.test.fail("did not see test_1 failure")
|
||||
# XXX we would like to have a cleaner way to finish
|
||||
try:
|
||||
reply.get(timeout=0.5)
|
||||
except IOError, e:
|
||||
assert str(e).lower().find('timeout') != -1
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue