[svn r51034] merging in fijal's reporter-merge branch into the trunk,

still needs refactoring as far as i am concernced.

--HG--
branch : trunk
This commit is contained in:
hpk 2008-01-25 16:54:04 +01:00
parent 192a890435
commit f2b0bd10e6
45 changed files with 740 additions and 1059 deletions

View File

@ -11,8 +11,8 @@ version = "0.9.1-alpha"
initpkg(__name__, initpkg(__name__,
description = "pylib and py.test: agile development and test support library", description = "pylib and py.test: agile development and test support library",
revision = int('$LastChangedRevision: 46771 $'.split(':')[1][:-1]), revision = int('$LastChangedRevision: 51034 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2007-09-20 17:20:45 +0200 (Thu, 20 Sep 2007) $', lastchangedate = '$LastChangedDate: 2008-01-25 16:54:04 +0100 (Fri, 25 Jan 2008) $',
version = version, version = version,
url = "http://codespeak.net/py", url = "http://codespeak.net/py",
download_url = "XXX", # "http://codespeak.net/download/py/py-%s.tar.gz" %(version,), download_url = "XXX", # "http://codespeak.net/download/py/py-%s.tar.gz" %(version,),
@ -33,6 +33,7 @@ initpkg(__name__,
'test.exit' : ('./test/session.py', 'exit'), 'test.exit' : ('./test/session.py', 'exit'),
'test.broken' : ('./test/item.py', 'Broken'), 'test.broken' : ('./test/item.py', 'Broken'),
'test.notimplemented' : ('./test/item.py', '_NotImplemented'), 'test.notimplemented' : ('./test/item.py', '_NotImplemented'),
'test.pdb' : ('./test/custompdb.py', 'set_trace'),
# configuration/initialization related test api # configuration/initialization related test api
'test.config' : ('./test/config.py', 'config_per_process'), 'test.config' : ('./test/config.py', 'config_per_process'),

View File

@ -5,6 +5,7 @@
import py import py
from py.__.apigen import apigen from py.__.apigen import apigen
py.test.skip("Apigen functionality temporarily disabled")
def setup_module(mod): def setup_module(mod):
if py.std.sys.platform == "win32": if py.std.sys.platform == "win32":

View File

@ -1,13 +1,11 @@
import py import py
failure_demo = py.magic.autopath().dirpath('failure_demo.py') failure_demo = py.magic.autopath().dirpath('failure_demo.py')
from py.__.test.outcome import Failed, Passed from py.__.doc.test_conftest import countoutcomes
def test_failure_demo_fails_properly(): def test_failure_demo_fails_properly():
config = py.test.config._reparse([failure_demo]) config = py.test.config._reparse([failure_demo])
session = config.initsession() session = config.initsession()
session.main() failed, passed, skipped = countoutcomes(session)
l = session.getitemoutcomepairs(Failed) assert failed == 21
assert len(l) == 21 assert passed == 0
l = session.getitemoutcomepairs(Passed)
assert not l

View File

@ -1,10 +1,26 @@
import py import py
from py.__.test.outcome import Skipped, Failed, Passed from py.__.test import repevent
def setup_module(mod): def setup_module(mod):
mod.tmpdir = py.test.ensuretemp('docdoctest') mod.tmpdir = py.test.ensuretemp('docdoctest')
def countoutcomes(session):
l = []
session.main(l.append)
passed = failed = skipped = 0
for event in l:
if isinstance(event, repevent.ReceivedItemOutcome):
if event.outcome.passed:
passed += 1
elif event.outcome.skipped:
skipped += 1
else:
failed += 1
elif isinstance(event, repevent.FailedTryiter):
failed += 1
return failed, passed, skipped
def test_doctest_extra_exec(): def test_doctest_extra_exec():
# XXX get rid of the next line: # XXX get rid of the next line:
py.magic.autopath().dirpath('conftest.py').copy(tmpdir.join('conftest.py')) py.magic.autopath().dirpath('conftest.py').copy(tmpdir.join('conftest.py'))
@ -16,9 +32,8 @@ def test_doctest_extra_exec():
""")) """))
config = py.test.config._reparse([xtxt]) config = py.test.config._reparse([xtxt])
session = config.initsession() session = config.initsession()
session.main() failed, passed, skipped = countoutcomes(session)
l = session.getitemoutcomepairs(Failed) assert failed == 1
assert len(l) == 1
def test_doctest_basic(): def test_doctest_basic():
# XXX get rid of the next line: # XXX get rid of the next line:
@ -45,12 +60,9 @@ def test_doctest_basic():
""")) """))
config = py.test.config._reparse([xtxt]) config = py.test.config._reparse([xtxt])
session = config.initsession() session = config.initsession()
session.main() failed, passed, skipped = countoutcomes(session)
l = session.getitemoutcomepairs(Failed) assert failed == 0
assert len(l) == 0 assert passed + skipped == 2
l = session.getitemoutcomepairs(Passed)
l2 = session.getitemoutcomepairs(Skipped)
assert len(l+l2) == 2
def test_deindent(): def test_deindent():
from py.__.doc.conftest import deindent from py.__.doc.conftest import deindent
@ -69,12 +81,9 @@ def test_doctest_eol():
ytxt.write(py.code.Source(".. >>> 1 + 1\r\n 2\r\n\r\n")) ytxt.write(py.code.Source(".. >>> 1 + 1\r\n 2\r\n\r\n"))
config = py.test.config._reparse([ytxt]) config = py.test.config._reparse([ytxt])
session = config.initsession() session = config.initsession()
session.main() failed, passed, skipped = countoutcomes(session)
l = session.getitemoutcomepairs(Failed) assert failed == 0
assert len(l) == 0 assert passed + skipped == 2
l = session.getitemoutcomepairs(Passed)
l2 = session.getitemoutcomepairs(Skipped)
assert len(l+l2) == 2
def test_doctest_indentation(): def test_doctest_indentation():
# XXX get rid of the next line: # XXX get rid of the next line:
@ -84,12 +93,9 @@ def test_doctest_indentation():
txt.write('..\n >>> print "foo\\n bar"\n foo\n bar\n') txt.write('..\n >>> print "foo\\n bar"\n foo\n bar\n')
config = py.test.config._reparse([txt]) config = py.test.config._reparse([txt])
session = config.initsession() session = config.initsession()
session.main() failed, passed, skipped = countoutcomes(session)
l = session.getitemoutcomepairs(Failed) assert failed == 0
assert len(l) == 0 assert skipped + passed == 2
l = session.getitemoutcomepairs(Passed)
l2 = session.getitemoutcomepairs(Skipped)
assert len(l+l2) == 2
def test_js_ignore(): def test_js_ignore():
py.magic.autopath().dirpath('conftest.py').copy(tmpdir.join('conftest.py')) py.magic.autopath().dirpath('conftest.py').copy(tmpdir.join('conftest.py'))
@ -102,12 +108,10 @@ def test_js_ignore():
""")) """))
config = py.test.config._reparse([xtxt]) config = py.test.config._reparse([xtxt])
session = config.initsession() session = config.initsession()
session.main()
l = session.getitemoutcomepairs(Failed) failed, passed, skipped = countoutcomes(session)
assert len(l) == 0 assert failed == 0
l = session.getitemoutcomepairs(Passed) assert skipped + passed == 3
l2 = session.getitemoutcomepairs(Skipped)
assert len(l+l2) == 3
def test_resolve_linkrole(): def test_resolve_linkrole():
from py.__.doc.conftest import get_apigen_relpath from py.__.doc.conftest import get_apigen_relpath

View File

@ -1,6 +1,8 @@
import py import py
from py.__.green.greenexecnet import * from py.__.green.greenexecnet import *
py.test.skip("Does not work with globally installed pylib")
def test_simple(): def test_simple():
gw = PopenGateway() gw = PopenGateway()
channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)") channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)")

View File

@ -57,6 +57,7 @@ def test_run_tests():
captureouterr=True) captureouterr=True)
print errors print errors
assert not errors assert not errors
py.test.skip("Apigen turned off")
assert pkgpath.join('../apigen').check(dir=True) assert pkgpath.join('../apigen').check(dir=True)
assert pkgpath.join('../apigen/api/sub.foo.html').check(file=True) assert pkgpath.join('../apigen/api/sub.foo.html').check(file=True)
@ -65,6 +66,7 @@ def test_run_tests_failure():
py.test.skip("update_website is not supposed to be run from win32") py.test.skip("update_website is not supposed to be run from win32")
pkgpath = setup_pkg('update_website_run_tests_failure') pkgpath = setup_pkg('update_website_run_tests_failure')
assert not pkgpath.join('../apigen').check(dir=True) assert not pkgpath.join('../apigen').check(dir=True)
py.test.skip("Apigen turned off")
pkgpath.ensure('../apigen', file=True) pkgpath.ensure('../apigen', file=True)
errors = update_website.run_tests(pkgpath, errors = update_website.run_tests(pkgpath,
pkgpath.dirpath().join('apigen'), pkgpath.dirpath().join('apigen'),

37
py/test/collectonly.py Normal file
View File

@ -0,0 +1,37 @@
""" --collectonly session, not to spread logic all over the place
"""
import py
from py.__.test.session import Session
from py.__.test.reporter import LocalReporter
class CollectReporter(LocalReporter):
def __init__(self, *args, **kwds):
super(LocalReporter, self).__init__(*args, **kwds)
self.indent = 0
def report_ReceivedItemOutcome(self, event):
pass
def report_ItemStart(self, event):
self.out.line(" " * self.indent + str(event.item))
self.indent += 2
def report_ItemFinish(self, event):
self.indent -= 2
def report_FailedTryiter(self, event):
self.out.line(" " * self.indent + "- FAILED TO LOAD MODULE -")
def report_SkippedTryiter(self, event):
self.out.line(" " * self.indent + "- skipped -")
def summary(self):
self.out.sep("=", "Total time: %.1f" % (self.timeend - self.timestart))
class CollectSession(Session):
reporterclass = CollectReporter
def run(self, item):
pass

View File

@ -159,21 +159,13 @@ class Config(object):
def _getsessionname(self): def _getsessionname(self):
""" return default session name as determined from options. """ """ return default session name as determined from options. """
name = 'TerminalSession' name = 'Session'
if self.option.dist: if self.option.dist:
name = 'RSession' name = 'RSession'
elif self.option.collectonly:
name = 'CollectSession'
else: else:
optnames = 'startserver runbrowser apigen restreport boxed'.split() if self.option.looponfailing or self.option.executable:
for opt in optnames:
if getattr(self.option, opt, False):
name = 'LSession'
break
else:
if self.getvalue('dist_boxed'):
name = 'LSession'
if self.option.looponfailing:
name = 'RemoteTerminalSession'
elif self.option.executable:
name = 'RemoteTerminalSession' name = 'RemoteTerminalSession'
return name return name
@ -273,10 +265,11 @@ config_per_process = Config()
# default import paths for sessions # default import paths for sessions
TerminalSession = 'py.__.test.terminal.terminal' Session = 'py.__.test.session'
RemoteTerminalSession = 'py.__.test.terminal.remote' RemoteTerminalSession = 'py.__.test.terminal.remote'
RSession = 'py.__.test.rsession.rsession' RSession = 'py.__.test.rsession.rsession'
LSession = 'py.__.test.rsession.rsession' LSession = 'py.__.test.rsession.rsession'
CollectSession = 'py.__.test.collectonly'
# #
# helpers # helpers

View File

@ -4,7 +4,7 @@
import py, os, sys import py, os, sys
from py.__.test.outcome import SerializableOutcome, ReprOutcome from py.__.test.outcome import SerializableOutcome, ReprOutcome
from py.__.test.rsession.box import Box from py.__.test.box import Box
from py.__.test import repevent from py.__.test import repevent
from py.__.test.outcome import Skipped, Failed from py.__.test.outcome import Skipped, Failed
import py.__.test.custompdb import py.__.test.custompdb
@ -35,8 +35,11 @@ class RunExecutor(object):
try: try:
self.run(capture) self.run(capture)
outcome = SerializableOutcome() outcome = SerializableOutcome()
except Skipped, e: outcome.stdout, outcome.stderr = self.item._getouterr()
outcome = SerializableOutcome(skipped=str(e)) except Skipped:
e = py.code.ExceptionInfo()
outcome = SerializableOutcome(skipped=e)
outcome.stdout, outcome.stderr = self.item._getouterr()
except (SystemExit, KeyboardInterrupt): except (SystemExit, KeyboardInterrupt):
raise raise
except: except:
@ -51,6 +54,7 @@ class RunExecutor(object):
excinfo.traceback = excinfo.traceback.cut( excinfo.traceback = excinfo.traceback.cut(
path=code.path, firstlineno=code.firstlineno) path=code.path, firstlineno=code.firstlineno)
outcome = SerializableOutcome(excinfo=excinfo, setupfailure=False) outcome = SerializableOutcome(excinfo=excinfo, setupfailure=False)
outcome.stdout, outcome.stderr = self.item._getouterr()
if self.usepdb: if self.usepdb:
if self.reporter is not None: if self.reporter is not None:
self.reporter(repevent.ImmediateFailure(self.item, self.reporter(repevent.ImmediateFailure(self.item,
@ -60,7 +64,6 @@ class RunExecutor(object):
# XXX hmm, we probably will not like to continue from that # XXX hmm, we probably will not like to continue from that
# point # point
raise SystemExit() raise SystemExit()
outcome.stdout, outcome.stderr = self.item._getouterr()
return outcome return outcome
class ApigenExecutor(RunExecutor): class ApigenExecutor(RunExecutor):
@ -104,7 +107,7 @@ class BoxExecutor(RunExecutor):
return (passed, setupfailure, excinfo, skipped, critical, 0, return (passed, setupfailure, excinfo, skipped, critical, 0,
b.stdoutrepr, b.stderrrepr) b.stdoutrepr, b.stderrrepr)
else: else:
return (False, False, None, False, False, b.signal, return (False, False, None, None, False, b.signal,
b.stdoutrepr, b.stderrrepr) b.stdoutrepr, b.stderrrepr)
class AsyncExecutor(RunExecutor): class AsyncExecutor(RunExecutor):

View File

@ -45,10 +45,9 @@ class SerializableOutcome(object):
self.stderr = "" self.stderr = ""
assert bool(self.passed) + bool(excinfo) + bool(skipped) == 1 assert bool(self.passed) + bool(excinfo) + bool(skipped) == 1
def make_excinfo_repr(self, tbstyle): def make_excinfo_repr(self, excinfo, tbstyle):
if self.excinfo is None: if excinfo is None or isinstance(excinfo, basestring):
return None return excinfo
excinfo = self.excinfo
tb_info = [self.traceback_entry_repr(x, tbstyle) tb_info = [self.traceback_entry_repr(x, tbstyle)
for x in excinfo.traceback] for x in excinfo.traceback]
rec_index = excinfo.traceback.recursionindex() rec_index = excinfo.traceback.recursionindex()
@ -85,8 +84,9 @@ class SerializableOutcome(object):
def make_repr(self, tbstyle="long"): def make_repr(self, tbstyle="long"):
return (self.passed, self.setupfailure, return (self.passed, self.setupfailure,
self.make_excinfo_repr(tbstyle), self.make_excinfo_repr(self.excinfo, tbstyle),
self.skipped, self.is_critical, 0, self.stdout, self.stderr) self.make_excinfo_repr(self.skipped, tbstyle),
self.is_critical, 0, self.stdout, self.stderr)
class TracebackEntryRepr(object): class TracebackEntryRepr(object):
def __init__(self, tbentry): def __init__(self, tbentry):
@ -136,12 +136,15 @@ class ExcInfoRepr(object):
class ReprOutcome(object): class ReprOutcome(object):
def __init__(self, repr_tuple): def __init__(self, repr_tuple):
(self.passed, self.setupfailure, excinfo, self.skipped, (self.passed, self.setupfailure, excinfo, skipped,
self.is_critical, self.signal, self.stdout, self.stderr) = repr_tuple self.is_critical, self.signal, self.stdout, self.stderr) = repr_tuple
if excinfo is None: self.excinfo = self.unpack(excinfo)
self.excinfo = None self.skipped = self.unpack(skipped)
else:
self.excinfo = ExcInfoRepr(excinfo) def unpack(self, what):
if what is None or isinstance(what, basestring):
return what
return ExcInfoRepr(what)
def __repr__(self): def __repr__(self):
l = ["%s=%s" %(x, getattr(self, x)) l = ["%s=%s" %(x, getattr(self, x))

View File

@ -97,11 +97,11 @@ class HostRSyncRootReady(ReportEvent):
self.root = root self.root = root
class TestStarted(ReportEvent): class TestStarted(ReportEvent):
def __init__(self, hosts, topdir, roots): def __init__(self, hosts, config, roots):
self.hosts = hosts self.hosts = hosts
self.topdir = topdir
self.roots = roots self.roots = roots
self.timestart = time.time() self.timestart = time.time()
self.config = config
class TestFinished(ReportEvent): class TestFinished(ReportEvent):
def __init__(self): def __init__(self):
@ -131,6 +131,13 @@ class ItemStart(ReportEvent):
def __init__(self, item): def __init__(self, item):
self.item = item self.item = item
class ItemFinish(ReportEvent):
""" This class shows most of the start stuff, like directory, module, class
can be used for containers
"""
def __init__(self, item):
self.item = item
class RsyncFinished(ReportEvent): class RsyncFinished(ReportEvent):
def __init__(self): def __init__(self):
self.time = time.time() self.time = time.time()

View File

@ -11,11 +11,14 @@ from py.__.test.terminal.out import getout
from py.__.test import repevent from py.__.test import repevent
from py.__.test import outcome from py.__.test import outcome
from py.__.misc.terminal_helper import ansi_print, get_terminal_width from py.__.misc.terminal_helper import ansi_print, get_terminal_width
from py.__.test.representation import Presenter from py.__.test.representation import Presenter, repr_pythonversion,\
getrelpath
import sys import sys
def choose_reporter(config): from time import time as now
def choose_reporter(reporterclass, config):
option = config.option option = config.option
if option.startserver or option.runbrowser: if option.startserver or option.runbrowser:
from py.__.test.rsession.web import WebReporter from py.__.test.rsession.web import WebReporter
@ -24,10 +27,7 @@ def choose_reporter(config):
from py.__.test.rsession.rest import RestReporter from py.__.test.rsession.rest import RestReporter
return RestReporter return RestReporter
else: else:
if option.dist: return reporterclass
return RemoteReporter
else:
return LocalReporter
class TestReporter(object): class TestReporter(object):
""" Simple test reporter which tracks failures """ Simple test reporter which tracks failures
@ -56,9 +56,6 @@ class AbstractReporter(object):
self.skipped_tests_outcome = [] self.skipped_tests_outcome = []
self.out = getout(py.std.sys.stdout) self.out = getout(py.std.sys.stdout)
self.presenter = Presenter(self.out, config) self.presenter = Presenter(self.out, config)
self.failed = dict([(host, 0) for host in hosts])
self.skipped = dict([(host, 0) for host in hosts])
self.passed = dict([(host, 0) for host in hosts])
self.to_rsync = {} self.to_rsync = {}
def get_item_name(self, event, colitem): def get_item_name(self, event, colitem):
@ -79,7 +76,7 @@ class AbstractReporter(object):
print excinfo print excinfo
# XXX reenable test before removing below line and # XXX reenable test before removing below line and
# run it with raise # run it with raise
#raise raise
__call__ = report __call__ = report
@ -130,12 +127,13 @@ class AbstractReporter(object):
print "%15s: READY" % hostrepr print "%15s: READY" % hostrepr
def report_TestStarted(self, item): def report_TestStarted(self, item):
topdir = item.config.topdir
hostreprs = [self._hostrepr(host) for host in item.hosts] hostreprs = [self._hostrepr(host) for host in item.hosts]
txt = " Test started, hosts: %s " % ", ".join(hostreprs) txt = " Test started, hosts: %s " % ", ".join(hostreprs)
self.hosts_to_rsync = len(item.hosts) self.hosts_to_rsync = len(item.hosts)
self.out.sep("=", txt) self.out.sep("=", txt)
self.timestart = item.timestart self.timestart = item.timestart
self.out.write("local top directory: %s\n" % item.topdir) self.out.write("local top directory: %s\n" % topdir)
for i, root in py.builtin.enumerate(item.roots): for i, root in py.builtin.enumerate(item.roots):
outof = "%d/%d" %(i+1, len(item.roots)) outof = "%d/%d" %(i+1, len(item.roots))
self.out.write("local RSync root [%s]: %s\n" % self.out.write("local RSync root [%s]: %s\n" %
@ -145,6 +143,7 @@ class AbstractReporter(object):
self.timersync = item.time self.timersync = item.time
def report_ImmediateFailure(self, event): def report_ImmediateFailure(self, event):
self.out.line()
self.repr_failure(event.item, event.outcome) self.repr_failure(event.item, event.outcome)
def report_TestFinished(self, item): def report_TestFinished(self, item):
@ -227,8 +226,8 @@ class AbstractReporter(object):
colitem = event.item colitem = event.item
if isinstance(event, repevent.ReceivedItemOutcome): if isinstance(event, repevent.ReceivedItemOutcome):
outcome = event.outcome outcome = event.outcome
text = outcome.skipped text = outcome.skipped.value
itemname = self.get_item_name(event, colitem) itemname = repr(outcome.skipped.traceback[-2]).split("\n")[0]
elif isinstance(event, repevent.SkippedTryiter): elif isinstance(event, repevent.SkippedTryiter):
text = str(event.excinfo.value) text = str(event.excinfo.value)
itemname = "/".join(colitem.listnames()) itemname = "/".join(colitem.listnames())
@ -243,10 +242,14 @@ class AbstractReporter(object):
for text, items in texts.items(): for text, items in texts.items():
for item in items: for item in items:
self.out.line('Skipped in %s' % item) self.out.line('Skipped in %s' % item)
self.out.line("reason: %s" % text) self.out.line("reason: %s" % text[1:-1])
self.out.line()
def summary(self): def summary(self):
def gather(dic): def gather(dic):
# XXX hack to handle dicts & ints here, get rid of it
if isinstance(dic, int):
return dic
total = 0 total = 0
for key, val in dic.iteritems(): for key, val in dic.iteritems():
total += val total += val
@ -263,6 +266,7 @@ class AbstractReporter(object):
total = total_passed + total_failed + total_skipped total = total_passed + total_failed + total_skipped
skipped_str = create_str("skipped", total_skipped) skipped_str = create_str("skipped", total_skipped)
failed_str = create_str("failed", total_failed) failed_str = create_str("failed", total_failed)
self.out.line()
self.print_summary(total, skipped_str, failed_str) self.print_summary(total, skipped_str, failed_str)
def print_summary(self, total, skipped_str, failed_str): def print_summary(self, total, skipped_str, failed_str):
@ -314,6 +318,12 @@ class AbstractReporter(object):
return sum(self.failed.values()) > 0 return sum(self.failed.values()) > 0
class RemoteReporter(AbstractReporter): class RemoteReporter(AbstractReporter):
def __init__(self, config, hosts):
super(RemoteReporter, self).__init__(config, hosts)
self.failed = dict([(host, 0) for host in hosts])
self.skipped = dict([(host, 0) for host in hosts])
self.passed = dict([(host, 0) for host in hosts])
def get_item_name(self, event, colitem): def get_item_name(self, event, colitem):
return event.host.hostname + ":" + \ return event.host.hostname + ":" + \
"/".join(colitem.listnames()) "/".join(colitem.listnames())
@ -329,9 +339,40 @@ class RemoteReporter(AbstractReporter):
join(event.item.listnames()))) join(event.item.listnames())))
class LocalReporter(AbstractReporter): class LocalReporter(AbstractReporter):
def __init__(self, config, hosts=None):
assert not hosts
super(LocalReporter, self).__init__(config, hosts)
self.failed = 0
self.skipped = 0
self.passed = 0
def report_TestStarted(self, item):
colitems = item.config.getcolitems()
txt = " test process starts "
self.out.sep("=", txt)
self.timestart = item.timestart
self.out.line("executable: %s (%s)" %
(py.std.sys.executable, repr_pythonversion()))
rev = py.__package__.getrev()
self.out.line("using py lib: %s <rev %s>" % (
py.path.local(py.__file__).dirpath(), rev))
config = item.config
if config.option.traceconfig or config.option.verbose:
for x in colitems:
self.out.line("test target: %s" %(x.fspath,))
conftestmodules = config._conftest.getconftestmodules(None)
for i,x in py.builtin.enumerate(conftestmodules):
self.out.line("initial conf %d: %s" %(i, x.__file__))
def get_item_name(self, event, colitem): def get_item_name(self, event, colitem):
return "/".join(colitem.listnames()) return "/".join(colitem.listnames())
def print_summary(self, total, skipped_str, failed_str):
self.out.sep("=", " %d test run%s%s in %.2fs" %
(total, skipped_str, failed_str, self.timeend - self.timestart))
def report_SkippedTryiter(self, event): def report_SkippedTryiter(self, event):
#self.show_item(event.item, False) #self.show_item(event.item, False)
if isinstance(event.item, py.test.collect.Module): if isinstance(event.item, py.test.collect.Module):
@ -344,44 +385,52 @@ class LocalReporter(AbstractReporter):
#self.show_item(event.item, False) #self.show_item(event.item, False)
self.out.write("- FAILED TO LOAD MODULE") self.out.write("- FAILED TO LOAD MODULE")
self.failed_tests_outcome.append(event) self.failed_tests_outcome.append(event)
self.failed[self.hosts[0]] += 1 self.failed += 1
def report_ReceivedItemOutcome(self, event): def report_ReceivedItemOutcome(self, event):
host = self.hosts[0]
if event.outcome.passed: if event.outcome.passed:
self.passed[host] += 1 self.passed += 1
self.out.write(".") self.out.write(".")
elif event.outcome.skipped: elif event.outcome.skipped:
self.skipped_tests_outcome.append(event) self.skipped_tests_outcome.append(event)
self.skipped[host] += 1 self.skipped += 1
self.out.write("s") self.out.write("s")
else: else:
self.failed[host] += 1 self.failed += 1
self.failed_tests_outcome.append(event) self.failed_tests_outcome.append(event)
self.out.write("F") self.out.write("F")
def report_ItemStart(self, event): def report_ItemStart(self, event):
# XXX
event.item.start = now()
self.show_item(event.item) self.show_item(event.item)
def show_item(self, item, count_elems = True): def show_item(self, item, count_elems = True):
if isinstance(item, py.test.collect.Module): if isinstance(item, py.test.collect.Module):
# XXX This is a terrible hack, I don't like it self.show_Module(item)
# and will rewrite it at some point if self.config.option.verbose > 0 and\
#self.count = 0 isinstance(item, py.test.collect.Item):
lgt = len(list(item._tryiter())) self.show_ItemVerbose(item)
#self.lgt = lgt
# print names relative to current workdir def show_ItemVerbose(self, item):
name = "/".join(item.listnames()) realpath, lineno = item._getpathlineno()
local = str(py.path.local()) location = "%s:%d" % (realpath.basename, lineno+1)
d = str(self.config.topdir) self.out.write("%-20s %s " % (location, item._getmodpath()))
if local.startswith(d):
local = local[len(d) + 1:] def show_Module(self, mod):
if local and name.startswith(local): lgt = len(list(mod._tryiter()))
name = name[len(local) + 1:] if self.config.option.verbose == 0:
self.out.write("\n%s[%d] " % (name, lgt)) base = getrelpath(py.path.local(), mod.fspath)
self.out.write("\n%s[%d] " % (base, lgt))
else:
self.out.line()
self.out.line('+ testmodule: %s[%d]' % (mod.fspath, lgt))
def gethost(self, event): def gethost(self, event):
return 'localhost' return 'localhost'
def hangs(self): def hangs(self):
pass pass
def was_failure(self):
return self.failed > 0

View File

@ -8,6 +8,17 @@ to allow further use outside the pylib
import py import py
from py.__.code import safe_repr from py.__.code import safe_repr
def getrelpath(source, dest):
base = source.common(dest)
if not base:
return None
# with posix local paths '/' is always a common base
relsource = source.relto(base)
reldest = dest.relto(base)
n = relsource.count(source.sep)
target = dest.sep.join(('..', )*n + (reldest, ))
return target
class Presenter(object): class Presenter(object):
""" Class used for presentation of various objects, """ Class used for presentation of various objects,
sharing common output style sharing common output style
@ -176,3 +187,11 @@ class Presenter(object):
# the following is only used by the combination '--pdb --tb=no' # the following is only used by the combination '--pdb --tb=no'
repr_failure_tbno = repr_failure_tbshort repr_failure_tbno = repr_failure_tbshort
def repr_pythonversion():
v = py.std.sys.version_info
try:
return "%s.%s.%s-%s-%s" % v
except ValueError:
return str(v)

View File

@ -3,7 +3,7 @@
""" """
import py import py
from py.__.test.rsession.executor import BoxExecutor, RunExecutor,\ from py.__.test.executor import BoxExecutor, RunExecutor,\
ApigenExecutor ApigenExecutor
from py.__.test import repevent from py.__.test import repevent
from py.__.test.outcome import ReprOutcome from py.__.test.outcome import ReprOutcome

View File

@ -41,31 +41,6 @@ class MasterNode(object):
# of hanging nodes and such # of hanging nodes and such
raise raise
def itemgen(colitems, reporter, keyword=None):
stopitems = py.test.collect.Item # XXX should be generator here as well
for next in colitems:
if isinstance(next, stopitems):
try:
next._skipbykeyword(keyword)
yield next
except Skipped:
excinfo = py.code.ExceptionInfo()
reporter(repevent.SkippedTryiter(excinfo, next))
else:
reporter(repevent.ItemStart(next))
try:
for x in itemgen([next.join(x) for x in next.run()], reporter,
keyword):
yield x
except (KeyboardInterrupt, SystemExit, GeneratorExit):
raise
except:
excinfo = py.code.ExceptionInfo()
if excinfo.type is Skipped:
reporter(repevent.SkippedTryiter(excinfo, next))
else:
reporter(repevent.FailedTryiter(excinfo, next))
def dispatch_loop(masternodes, itemgenerator, shouldstop, def dispatch_loop(masternodes, itemgenerator, shouldstop,
waiter = lambda: py.std.time.sleep(0.1), waiter = lambda: py.std.time.sleep(0.1),
max_tasks_per_node=None): max_tasks_per_node=None):

View File

@ -12,10 +12,13 @@ from py.__.rest.rst import *
class RestReporter(AbstractReporter): class RestReporter(AbstractReporter):
linkwriter = None linkwriter = None
def __init__(self, *args, **kwargs): def __init__(self, config, hosts):
super(RestReporter, self).__init__(*args, **kwargs) super(RestReporter, self).__init__(config, hosts)
self.rest = Rest() self.rest = Rest()
self.traceback_num = 0 self.traceback_num = 0
self.failed = dict([(host, 0) for host in hosts])
self.skipped = dict([(host, 0) for host in hosts])
self.passed = dict([(host, 0) for host in hosts])
def get_linkwriter(self): def get_linkwriter(self):
if self.linkwriter is None: if self.linkwriter is None:

View File

@ -9,17 +9,19 @@ import re
import time import time
from py.__.test import repevent from py.__.test import repevent
from py.__.test.rsession.master import MasterNode, dispatch_loop, itemgen from py.__.test.rsession.master import MasterNode, dispatch_loop
from py.__.test.rsession.hostmanage import HostInfo, HostManager from py.__.test.rsession.hostmanage import HostInfo, HostManager
from py.__.test.rsession.local import local_loop, plain_runner, apigen_runner,\ from py.__.test.rsession.local import local_loop, plain_runner, apigen_runner,\
box_runner box_runner
from py.__.test.reporter import LocalReporter, RemoteReporter, TestReporter from py.__.test.reporter import LocalReporter, RemoteReporter, TestReporter
from py.__.test.session import AbstractSession from py.__.test.session import AbstractSession, itemgen
from py.__.test.outcome import Skipped, Failed from py.__.test.outcome import Skipped, Failed
class RSession(AbstractSession): class RSession(AbstractSession):
""" Remote version of session """ Remote version of session
""" """
reporterclass = RemoteReporter
def fixoptions(self): def fixoptions(self):
super(RSession, self).fixoptions() super(RSession, self).fixoptions()
option = self.config.option option = self.config.option
@ -42,13 +44,15 @@ class RSession(AbstractSession):
raise SystemExit raise SystemExit
def main(self, reporter=None): def main(self, reporter=None):
""" main loop for running tests. """ """ main loop for running tests. """
config = self.config config = self.config
hm = HostManager(config) hm = HostManager(config)
reporter, checkfun = self.init_reporter(reporter, config, hm.hosts) reporter, checkfun = self.init_reporter(reporter, config, hm.hosts)
reporter(repevent.TestStarted(hm.hosts, self.config.topdir, reporter(repevent.TestStarted(hm.hosts, self.config,
hm.roots)) hm.roots))
self.reporter = reporter
try: try:
nodes = hm.setup_hosts(reporter) nodes = hm.setup_hosts(reporter)
@ -79,75 +83,5 @@ class RSession(AbstractSession):
def dispatch_tests(self, nodes, reporter, checkfun): def dispatch_tests(self, nodes, reporter, checkfun):
colitems = self.config.getcolitems() colitems = self.config.getcolitems()
keyword = self.config.option.keyword keyword = self.config.option.keyword
itemgenerator = itemgen(colitems, reporter, keyword) itemgenerator = itemgen(self, colitems, reporter, keyword)
all_tests = dispatch_loop(nodes, itemgenerator, checkfun) all_tests = dispatch_loop(nodes, itemgenerator, checkfun)
class LSession(AbstractSession):
""" Local version of session
"""
def main(self, reporter=None, runner=None):
# check out if used options makes any sense
config = self.config
hm = HostManager(config, hosts=[HostInfo('localhost')])
hosts = hm.hosts
if not self.config.option.nomagic:
py.magic.invoke(assertion=1)
reporter, checkfun = self.init_reporter(reporter, config, hosts)
reporter(repevent.TestStarted(hosts, self.config.topdir, []))
colitems = self.config.getcolitems()
reporter(repevent.RsyncFinished())
if runner is None:
runner = self.init_runner()
keyword = self.config.option.keyword
itemgenerator = itemgen(colitems, reporter, keyword)
local_loop(self, reporter, itemgenerator, checkfun, self.config, runner=runner)
retval = reporter(repevent.TestFinished())
if not self.config.option.nomagic:
py.magic.revoke(assertion=1)
self.write_docs()
return retval
def write_docs(self):
if self.config.option.apigen:
from py.__.apigen.tracer.docstorage import DocStorageAccessor
apigen = py.path.local(self.config.option.apigen).pyimport()
if not hasattr(apigen, 'build'):
raise NotImplementedError("%s does not contain 'build' "
"function" %(apigen,))
print >>sys.stderr, 'building documentation'
capture = py.io.StdCaptureFD()
try:
pkgdir = py.path.local(self.config.args[0]).pypkgpath()
apigen.build(pkgdir,
DocStorageAccessor(self.docstorage),
capture)
finally:
capture.reset()
print >>sys.stderr, '\ndone'
def init_runner(self):
if self.config.option.apigen:
from py.__.apigen.tracer.tracer import Tracer, DocStorage
pkgdir = py.path.local(self.config.args[0]).pypkgpath()
apigen = py.path.local(self.config.option.apigen).pyimport()
if not hasattr(apigen, 'get_documentable_items'):
raise NotImplementedError("Provided script does not seem "
"to contain get_documentable_items")
pkgname, items = apigen.get_documentable_items(pkgdir)
self.docstorage = DocStorage().from_dict(items,
module_name=pkgname)
self.tracer = Tracer(self.docstorage)
return apigen_runner
elif self.config.option.boxed:
return box_runner
else:
return plain_runner

View File

@ -3,7 +3,7 @@ Node code for slaves.
""" """
import py import py
from py.__.test.rsession.executor import RunExecutor, BoxExecutor, AsyncExecutor from py.__.test.executor import RunExecutor, BoxExecutor, AsyncExecutor
from py.__.test.outcome import SerializableOutcome from py.__.test.outcome import SerializableOutcome
from py.__.test.outcome import Skipped from py.__.test.outcome import Skipped
import thread import thread

View File

@ -38,6 +38,11 @@ class BasicRsessionTest(object):
testonepath.write(source) testonepath.write(source)
cls.config = py.test.config._reparse([tmpdir]) cls.config = py.test.config._reparse([tmpdir])
cls.collector_test_one = cls.config._getcollector(testonepath) cls.collector_test_one = cls.config._getcollector(testonepath)
cls.doctest = tmpdir.ensure("xxx.txt").write(py.code.Source("""
Aha!!!!!!
=========
"""))
def getexample(self, name): def getexample(self, name):
funcname = "func" + name funcname = "func" + name
@ -45,5 +50,8 @@ class BasicRsessionTest(object):
assert col is not None, funcname assert col is not None, funcname
return col return col
def getdocexample(self):
return self.doctest
def getmod(self): def getmod(self):
return self.collector_test_one return self.collector_test_one

View File

@ -96,7 +96,7 @@ def test_sending_two_noes():
assert len(reportlist) == 4 assert len(reportlist) == 4
def test_outcome_repr(): def test_outcome_repr():
out = ReprOutcome(SerializableOutcome(skipped=True).make_repr()) out = ReprOutcome(SerializableOutcome(skipped="xxx").make_repr())
s = repr(out) s = repr(out)
assert s.lower().find("skip") != -1 assert s.lower().find("skip") != -1
@ -136,21 +136,6 @@ class TestSlave:
item = self.rootcol._getitembynames(names) item = self.rootcol._getitembynames(names)
return self.config.get_collector_trail(item) return self.config.get_collector_trail(item)
def test_slave_setup(self):
py.test.skip("Doesn't work anymore")
pkgname = self.pkgpath.basename
host = HostInfo("localhost:%s" %(self.tmpdir,))
host.initgateway()
channel = setup_slave(host, self.config)
spec = self._gettrail(pkgname, "test_something.py", "funcpass")
print "sending", spec
channel.send(spec)
output = ReprOutcome(channel.receive())
assert output.passed
channel.send(42)
channel.waitclose(10)
host.gw.exit()
def test_slave_running(self): def test_slave_running(self):
py.test.skip("XXX test broken, needs refactoring") py.test.skip("XXX test broken, needs refactoring")
def simple_report(event): def simple_report(event):
@ -164,15 +149,17 @@ class TestSlave:
def open_gw(): def open_gw():
gw = py.execnet.PopenGateway() gw = py.execnet.PopenGateway()
gw.host = HostInfo("localhost") host = HostInfo("localhost")
gw.host.gw = gw host.gw_remotepath = ''
host.gw = gw
#gw.host.gw = gw
config = py.test.config._reparse([tmpdir]) config = py.test.config._reparse([tmpdir])
channel = setup_slave(gw.host, config) channel = setup_slave(host, config)
mn = MasterNode(channel, simple_report, {}) mn = MasterNode(channel, simple_report)
return mn return mn
master_nodes = [open_gw(), open_gw(), open_gw()] master_nodes = [open_gw(), open_gw(), open_gw()]
funcpass_item = rootcol._getitembynames(funcpass_spec) funcpass_item = self.xxx
funcfail_item = rootcol._getitembynames(funcfail_spec) funcfail_item = rootcol._getitembynames(funcfail_spec)
itemgenerator = iter([funcfail_item] + itemgenerator = iter([funcfail_item] +
[funcpass_item] * 5 + [funcfail_item] * 5) [funcpass_item] * 5 + [funcfail_item] * 5)

View File

@ -332,6 +332,9 @@ FooError
class TestRestReporter(AbstractTestReporter): class TestRestReporter(AbstractTestReporter):
reporter = RestReporter reporter = RestReporter
def get_hosts(self):
return [HostInfo('localhost')]
def test_failed_to_load(self): def test_failed_to_load(self):
py.test.skip("Not implemented") py.test.skip("Not implemented")

View File

@ -34,9 +34,9 @@ class TestRSessionRemote(DirSetup, BasicRsessionTest):
pass pass
""")) """))
config = py.test.config._reparse([self.source.join("sub"), '-x']) config = py.test.config._reparse([self.source.join("sub"), '-x'])
rsession = RSession(config)
allevents = [] allevents = []
rsession.main(reporter=allevents.append) rsession = RSession(config)
rsession.main(allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) == 3 assert len(testevents) == 3
@ -69,9 +69,9 @@ class TestRSessionRemote(DirSetup, BasicRsessionTest):
config = py.test.config._reparse([tmpdir.join(subdir)]) config = py.test.config._reparse([tmpdir.join(subdir)])
assert config.topdir == tmpdir assert config.topdir == tmpdir
assert not tmpdir.join("__init__.py").check() assert not tmpdir.join("__init__.py").check()
rsession = RSession(config)
allevents = [] allevents = []
rsession.main(reporter=allevents.append) rsession = RSession(config)
rsession.main(allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) assert len(testevents)
@ -129,7 +129,7 @@ class TestRSessionRemote(DirSetup, BasicRsessionTest):
hm = HostManager(self.config, hosts=hosts) hm = HostManager(self.config, hosts=hosts)
nodes = hm.setup_hosts(allevents.append) nodes = hm.setup_hosts(allevents.append)
from py.__.test.rsession.testing.test_executor \ from py.__.test.testing.test_executor \
import ItemTestPassing, ItemTestFailing, ItemTestSkipping import ItemTestPassing, ItemTestFailing, ItemTestSkipping
itempass = self.getexample("pass") itempass = self.getexample("pass")
@ -177,8 +177,7 @@ class TestRSessionRemote(DirSetup, BasicRsessionTest):
config = py.test.config._reparse([tmpdir]) config = py.test.config._reparse([tmpdir])
rsession = RSession(config) rsession = RSession(config)
allevents = [] rsession.main(allevents.append)
rsession.main(reporter=allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
passevents = [x for x in testevents if x.outcome.passed] passevents = [x for x in testevents if x.outcome.passed]

View File

@ -13,7 +13,7 @@ if sys.platform == 'win32':
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
from py.__.test.rsession.executor import RunExecutor from py.__.test.executor import RunExecutor
class TestSlave(BasicRsessionTest): class TestSlave(BasicRsessionTest):
def gettestnode(self): def gettestnode(self):
@ -69,7 +69,5 @@ class TestSlave(BasicRsessionTest):
assert outcome.excinfo assert outcome.excinfo
def test_slave_run_different_stuff(self): def test_slave_run_different_stuff(self):
py.test.skip("XXX not this way")
node = self.gettestnode() node = self.gettestnode()
node.run(self.rootcol._getitembynames("py doc log.txt".split()). node.run(self.getdocexample())
_get_collector_trail())

View File

@ -218,7 +218,10 @@ class ExportedMethods(BasicExternal):
args['fullmodulename'] = str(mod_fullname) args['fullmodulename'] = str(mod_fullname)
fullitemname = args['fullitemname'] fullitemname = args['fullitemname']
if outcome.skipped: if outcome.skipped:
self.skip_reasons[fullitemname] = outcome.skipped self.skip_reasons[fullitemname] = self.repr_failure_tblong(
event.item,
outcome.skipped,
outcome.skipped.traceback)
elif outcome.excinfo: elif outcome.excinfo:
self.fail_reasons[fullitemname] = self.repr_failure_tblong( self.fail_reasons[fullitemname] = self.repr_failure_tblong(
event.item, outcome.excinfo, outcome.excinfo.traceback) event.item, outcome.excinfo, outcome.excinfo.traceback)
@ -309,6 +312,9 @@ class ExportedMethods(BasicExternal):
# XXX: It overrides our self.hosts # XXX: It overrides our self.hosts
self.hosts = {} self.hosts = {}
self.ready_hosts = {} self.ready_hosts = {}
if not event.hosts:
self.hosts = []
else:
for host in event.hosts: for host in event.hosts:
self.hosts[host] = host self.hosts[host] = host
self.ready_hosts[host] = False self.ready_hosts[host] = False
@ -425,6 +431,9 @@ class WebReporter(object):
def __init__(self, config, hosts): def __init__(self, config, hosts):
start_server_from_config(config) start_server_from_config(config)
def was_failure(self):
return sum(exported_methods.fail_reasons.values()) > 0
# rebind # rebind
report = exported_methods.report report = exported_methods.report
__call__ = report __call__ = report

View File

@ -1,12 +1,64 @@
import py import py
import sys
from py.__.test.outcome import Outcome, Failed, Passed, Skipped from py.__.test.outcome import Outcome, Failed, Passed, Skipped
from py.__.test.reporter import choose_reporter, TestReporter from py.__.test.reporter import choose_reporter, TestReporter
from py.__.test import repevent
from py.__.test.outcome import SerializableOutcome, ReprOutcome
from py.__.test.reporter import LocalReporter
from py.__.test.executor import RunExecutor, BoxExecutor
""" The session implementation - reporter version:
* itemgen is responsible for iterating and telling reporter
about skipped and failed iterations (this is for collectors only),
this should be probably moved to session (for uniformity)
* session gets items which needs to be executed one after another
and tells reporter about that
"""
try:
GeneratorExit
except NameError:
GeneratorExit = StopIteration # I think
def itemgen(session, colitems, reporter, keyword=None):
stopitems = py.test.collect.Item # XXX should be generator here as well
while 1:
if not colitems:
break
next = colitems.pop(0)
if reporter:
reporter(repevent.ItemStart(next))
if isinstance(next, stopitems):
try:
next._skipbykeyword(keyword)
yield next
except Skipped:
if session.config.option.keyword_oneshot:
keyword = None
excinfo = py.code.ExceptionInfo()
reporter(repevent.SkippedTryiter(excinfo, next))
else:
try:
cols = [next.join(x) for x in next.run()]
for x in itemgen(session, cols, reporter, keyword):
yield x
except (KeyboardInterrupt, SystemExit, GeneratorExit):
raise
except:
excinfo = py.code.ExceptionInfo()
if excinfo.type is Skipped:
reporter(repevent.SkippedTryiter(excinfo, next))
else:
reporter(repevent.FailedTryiter(excinfo, next))
if reporter:
reporter(repevent.ItemFinish(next))
class AbstractSession(object): class AbstractSession(object):
""" An abstract session executes collectors/items through a runner. """ An abstract session executes collectors/items through a runner.
""" """
def __init__(self, config): def __init__(self, config):
self._memo = []
self.config = config self.config = config
self._keyword = config.option.keyword self._keyword = config.option.keyword
@ -18,10 +70,6 @@ class AbstractSession(object):
option.startserver = True option.startserver = True
if self.config.getvalue("dist_boxed") and option.dist: if self.config.getvalue("dist_boxed") and option.dist:
option.boxed = True option.boxed = True
# implied options
if option.usepdb:
if not option.nocapture:
option.nocapture = True
# conflicting options # conflicting options
if option.looponfailing and option.usepdb: if option.looponfailing and option.usepdb:
raise ValueError, "--looponfailing together with --pdb not supported." raise ValueError, "--looponfailing together with --pdb not supported."
@ -35,7 +83,8 @@ class AbstractSession(object):
def init_reporter(self, reporter, config, hosts): def init_reporter(self, reporter, config, hosts):
if reporter is None: if reporter is None:
reporter = choose_reporter(config)(config, hosts) reporter = choose_reporter(self.reporterclass, config)\
(config, hosts)
else: else:
reporter = TestReporter(reporter) reporter = TestReporter(reporter)
checkfun = lambda : self.config.option.exitfirst and \ checkfun = lambda : self.config.option.exitfirst and \
@ -47,11 +96,15 @@ class Session(AbstractSession):
A Session gets test Items from Collectors, executes the A Session gets test Items from Collectors, executes the
Items and sends the Outcome to the Reporter. Items and sends the Outcome to the Reporter.
""" """
reporterclass = LocalReporter
def shouldclose(self): def shouldclose(self):
return False return False
def header(self, colitems): def header(self, colitems):
""" setup any neccessary resources ahead of the test run. """ """ setup any neccessary resources ahead of the test run. """
self.reporter(repevent.TestStarted(None, self.config,
None))
if not self.config.option.nomagic: if not self.config.option.nomagic:
py.magic.invoke(assertion=1) py.magic.invoke(assertion=1)
@ -60,91 +113,46 @@ class Session(AbstractSession):
py.test.collect.Function._state.teardown_all() py.test.collect.Function._state.teardown_all()
if not self.config.option.nomagic: if not self.config.option.nomagic:
py.magic.revoke(assertion=1) py.magic.revoke(assertion=1)
self.reporter(repevent.TestFinished())
def start(self, colitem): def main(self, reporter=None):
""" hook invoked before each colitem.run() invocation. """
def finish(self, colitem, outcome):
""" hook invoked after each colitem.run() invocation. """
self._memo.append((colitem, outcome))
def startiteration(self, colitem, subitems):
pass
def getitemoutcomepairs(self, cls):
return [x for x in self._memo if isinstance(x[1], cls)]
def main(self):
""" main loop for running tests. """ """ main loop for running tests. """
config = self.config
self.reporter, shouldstop = self.init_reporter(reporter, config, None)
colitems = self.config.getcolitems() colitems = self.config.getcolitems()
try:
self.header(colitems) self.header(colitems)
keyword = self.config.option.keyword
reporter = self.reporter
itemgenerator = itemgen(self, colitems, reporter, keyword)
failures = []
try: try:
while 1:
try: try:
for colitem in colitems: item = itemgenerator.next()
self.runtraced(colitem) if shouldstop():
except KeyboardInterrupt: return
raise outcome = self.run(item)
if outcome is not None:
if not outcome.passed and not outcome.skipped:
failures.append((item, outcome))
reporter(repevent.ReceivedItemOutcome(None, item, outcome))
except StopIteration:
break
finally: finally:
self.footer(colitems) self.footer(colitems)
except Exit, ex: return failures
pass
return self.getitemoutcomepairs(Failed) return self.getitemoutcomepairs(Failed)
def runtraced(self, colitem): def run(self, item):
if self.shouldclose(): if not self.config.option.boxed:
raise Exit, "received external close signal" executor = RunExecutor(item, self.config.option.usepdb,
self.reporter, self.config)
outcome = None return ReprOutcome(executor.execute().make_repr())
colitem.startcapture()
try:
self.start(colitem)
try:
try:
if colitem._stickyfailure:
raise colitem._stickyfailure
outcome = self.run(colitem)
except (KeyboardInterrupt, Exit):
raise
except Outcome, outcome:
if outcome.excinfo is None:
outcome.excinfo = py.code.ExceptionInfo()
except:
excinfo = py.code.ExceptionInfo()
outcome = Failed(excinfo=excinfo)
assert (outcome is None or
isinstance(outcome, (list, Outcome)))
finally:
self.finish(colitem, outcome)
if isinstance(outcome, Failed) and self.config.option.exitfirst:
py.test.exit("exit on first problem configured.", item=colitem)
finally:
colitem.finishcapture()
def run(self, colitem):
if self.config.option.collectonly and isinstance(colitem, py.test.collect.Item):
return
if isinstance(colitem, py.test.collect.Item):
colitem._skipbykeyword(self._keyword)
if self.config.option.keyword_oneshot:
self._keyword = ""
res = colitem.run()
if res is None:
return Passed()
elif not isinstance(res, (list, tuple)):
raise TypeError("%r.run() returned neither "
"list, tuple nor None: %r" % (colitem, res))
else: else:
finish = self.startiteration(colitem, res) executor = BoxExecutor(item, self.config.option.usepdb,
try: self.reporter, self.config)
for name in res: return ReprOutcome(executor.execute())
obj = colitem.join(name)
assert obj is not None
self.runtraced(obj)
finally:
if finish:
finish()
return res
class Exit(Exception): class Exit(Exception):
""" for immediate program exits without tracebacks and reporter/summary. """ """ for immediate program exits without tracebacks and reporter/summary. """

View File

@ -1,165 +0,0 @@
import py
import sys
from py.__.test.outcome import Outcome, Failed, Passed, Skipped
from py.__.test.reporter import choose_reporter, TestReporter
from py.__.test import repevent
from py.__.test.outcome import SerializableOutcome, ReprOutcome
from py.__.test.reporter import LocalReporter
from py.__.test.executor import RunExecutor, BoxExecutor
""" The session implementation - reporter version:
* itemgen is responsible for iterating and telling reporter
about skipped and failed iterations (this is for collectors only),
this should be probably moved to session (for uniformity)
* session gets items which needs to be executed one after another
and tells reporter about that
"""
try:
GeneratorExit
except NameError:
GeneratorExit = StopIteration # I think
def itemgen(session, colitems, reporter, keyword=None):
stopitems = py.test.collect.Item # XXX should be generator here as well
while 1:
if not colitems:
break
next = colitems.pop(0)
if reporter:
reporter(repevent.ItemStart(next))
if isinstance(next, stopitems):
try:
next._skipbykeyword(keyword)
yield next
except Skipped:
if session.config.option.keyword_oneshot:
keyword = None
excinfo = py.code.ExceptionInfo()
reporter(repevent.SkippedTryiter(excinfo, next))
else:
try:
cols = [next.join(x) for x in next.run()]
for x in itemgen(session, cols, reporter, keyword):
yield x
except (KeyboardInterrupt, SystemExit, GeneratorExit):
raise
except:
excinfo = py.code.ExceptionInfo()
if excinfo.type is Skipped:
reporter(repevent.SkippedTryiter(excinfo, next))
else:
reporter(repevent.FailedTryiter(excinfo, next))
if reporter:
reporter(repevent.ItemFinish(next))
class AbstractSession(object):
""" An abstract session executes collectors/items through a runner.
"""
def __init__(self, config):
self.config = config
self._keyword = config.option.keyword
def fixoptions(self):
""" check, fix and determine conflicting options. """
option = self.config.option
if option.runbrowser and not option.startserver:
#print "--runbrowser implies --startserver"
option.startserver = True
if self.config.getvalue("dist_boxed") and option.dist:
option.boxed = True
# conflicting options
if option.looponfailing and option.usepdb:
raise ValueError, "--looponfailing together with --pdb not supported."
if option.looponfailing and option.dist:
raise ValueError, "--looponfailing together with --dist not supported."
if option.executable and option.usepdb:
raise ValueError, "--exec together with --pdb not supported."
if option.keyword_oneshot and not option.keyword:
raise ValueError, "--keyword-oneshot makes sense only when --keyword is supplied"
def init_reporter(self, reporter, config, hosts):
if reporter is None:
reporter = choose_reporter(self.reporterclass, config)\
(config, hosts)
else:
reporter = TestReporter(reporter)
checkfun = lambda : self.config.option.exitfirst and \
reporter.was_failure()
return reporter, checkfun
class Session(AbstractSession):
"""
A Session gets test Items from Collectors, executes the
Items and sends the Outcome to the Reporter.
"""
reporterclass = LocalReporter
def shouldclose(self):
return False
def header(self, colitems):
""" setup any neccessary resources ahead of the test run. """
self.reporter(repevent.TestStarted(None, self.config,
None))
if not self.config.option.nomagic:
py.magic.invoke(assertion=1)
def footer(self, colitems):
""" teardown any resources after a test run. """
py.test.collect.Function._state.teardown_all()
if not self.config.option.nomagic:
py.magic.revoke(assertion=1)
self.reporter(repevent.TestFinished())
def main(self, reporter=None):
""" main loop for running tests. """
config = self.config
self.reporter, shouldstop = self.init_reporter(reporter, config, None)
colitems = self.config.getcolitems()
self.header(colitems)
keyword = self.config.option.keyword
reporter = self.reporter
itemgenerator = itemgen(self, colitems, reporter, keyword)
failures = []
try:
while 1:
try:
item = itemgenerator.next()
if shouldstop():
return
outcome = self.run(item)
if outcome is not None:
if not outcome.passed and not outcome.skipped:
failures.append((item, outcome))
reporter(repevent.ReceivedItemOutcome(None, item, outcome))
except StopIteration:
break
finally:
self.footer(colitems)
return failures
return self.getitemoutcomepairs(Failed)
def run(self, item):
if not self.config.option.boxed:
executor = RunExecutor(item, self.config.option.usepdb,
self.reporter, self.config)
return ReprOutcome(executor.execute().make_repr())
else:
executor = BoxExecutor(item, self.config.option.usepdb,
self.reporter, self.config)
return ReprOutcome(executor.execute())
class Exit(Exception):
""" for immediate program exits without tracebacks and reporter/summary. """
def __init__(self, msg="unknown reason", item=None):
self.msg = msg
Exception.__init__(self, msg)
def exit(msg, item=None):
raise Exit(msg=msg, item=item)

View File

@ -135,7 +135,6 @@ def slaverun_TerminalSession(channel):
session = config.initsession() session = config.initsession()
session.shouldclose = channel.isclosed session.shouldclose = channel.isclosed
print "SLAVE: starting session.main()" print "SLAVE: starting session.main()"
session.main() failures = session.main()
failures = session.getitemoutcomepairs(Failed)
failures = [config.get_collector_trail(item) for item,_ in failures] failures = [config.get_collector_trail(item) for item,_ in failures]
channel.send(failures) channel.send(failures)

View File

@ -1,294 +0,0 @@
import py
from time import time as now
from py.__.test.terminal.out import getout
from py.__.test.representation import Presenter
from py.__.test.outcome import Skipped, Passed, Failed
import py.__.test.custompdb
def getrelpath(source, dest):
base = source.common(dest)
if not base:
return None
# with posix local paths '/' is always a common base
relsource = source.relto(base)
reldest = dest.relto(base)
n = relsource.count(source.sep)
target = dest.sep.join(('..', )*n + (reldest, ))
return target
from py.__.test.session import Session
class TerminalSession(Session):
def __init__(self, config, file=None):
super(TerminalSession, self).__init__(config)
if file is None:
file = py.std.sys.stdout
self._file = file
self.out = getout(file)
self._opencollectors = []
self.presenter = Presenter(self.out, config)
# ---------------------
# PROGRESS information
# ---------------------
def start(self, colitem):
super(TerminalSession, self).start(colitem)
if self.config.option.collectonly:
cols = self._opencollectors
self.out.line(' ' * len(cols) + repr(colitem))
cols.append(colitem)
else:
cls = getattr(colitem, '__class__', None)
if cls is None:
return
if issubclass(cls, py.test.collect.Module):
self.start_Module(colitem)
elif issubclass(cls, py.test.collect.Item):
self.start_Item(colitem)
#for typ in py.std.inspect.getmro(cls):
# meth = getattr(self, 'start_%s' % typ.__name__, None)
# if meth:
# meth(colitem)
# break
colitem.start = py.std.time.time()
def start_Module(self, colitem):
if self.config.option.verbose == 0:
abbrev_fn = getrelpath(py.path.local('.xxx.'), colitem.fspath)
self.out.write('%s' % (abbrev_fn, ))
else:
self.out.line()
self.out.line("+ testmodule: %s" % colitem.fspath)
def startiteration(self, colitem, subitems):
if (isinstance(colitem, py.test.collect.Module)
and self.config.option.verbose == 0
and not self.config.option.collectonly):
try:
sum = 0
for sub in subitems:
sum += len(list(colitem.join(sub)._tryiter()))
except (SystemExit, KeyboardInterrupt):
raise
except:
self.out.write('[?]')
else:
self.out.write('[%d] ' % sum)
return self.out.line
def start_Item(self, colitem):
if self.config.option.verbose >= 1:
if isinstance(colitem, py.test.collect.Item):
realpath, lineno = colitem._getpathlineno()
location = "%s:%d" % (realpath.basename, lineno+1)
self.out.write("%-20s %s " % (location, colitem._getmodpath()))
def finish(self, colitem, outcome):
end = now()
super(TerminalSession, self).finish(colitem, outcome)
if self.config.option.collectonly:
cols = self._opencollectors
last = cols.pop()
#assert last == colitem, "expected %r, got %r" %(last, colitem)
return
colitem.elapsedtime = end - colitem.start
if self.config.option.usepdb:
if isinstance(outcome, Failed):
print "dispatching to ppdb", colitem
self.repr_failure(colitem, outcome)
self.out.write('\n%s\n' % (outcome.excinfo.exconly(),))
py.__.test.custompdb.post_mortem(outcome.excinfo._excinfo[2])
if isinstance(colitem, py.test.collect.Module):
resultstring = self.repr_progress_module_result(colitem, outcome)
if resultstring:
self.out.line(" - " + resultstring)
if isinstance(colitem, py.test.collect.Item):
if self.config.option.verbose >= 1:
resultstring = self.repr_progress_long_result(colitem, outcome)
resultstring += " (%.2f)" % (colitem.elapsedtime,)
self.out.line(resultstring)
else:
c = self.repr_progress_short_result(colitem, outcome)
self.out.write(c)
# -------------------
# HEADER information
# -------------------
def header(self, colitems):
super(TerminalSession, self).header(colitems)
self.out.sep("=", "test process starts")
option = self.config.option
modes = []
for name in 'looponfailing', 'exitfirst', 'nomagic':
if getattr(option, name):
modes.append(name)
#if self._isremoteoption._fromremote:
# modes.insert(0, 'child process')
#else:
# modes.insert(0, 'inprocess')
#mode = "/".join(modes)
#self.out.line("testing-mode: %s" % mode)
self.out.line("executable: %s (%s)" %
(py.std.sys.executable, repr_pythonversion()))
rev = py.__package__.getrev()
self.out.line("using py lib: %s <rev %s>" % (
py.path.local(py.__file__).dirpath(), rev))
if self.config.option.traceconfig or self.config.option.verbose:
for x in colitems:
self.out.line("test target: %s" %(x.fspath,))
conftestmodules = self.config._conftest.getconftestmodules(None)
for i,x in py.builtin.enumerate(conftestmodules):
self.out.line("initial conf %d: %s" %(i, x.__file__))
#for i, x in py.builtin.enumerate(py.test.config.configpaths):
# self.out.line("initial testconfig %d: %s" %(i, x))
#additional = py.test.config.getfirst('additionalinfo')
#if additional:
# for key, descr in additional():
# self.out.line("%s: %s" %(key, descr))
self.out.line()
self.starttime = now()
# -------------------
# FOOTER information
# -------------------
def footer(self, colitems):
super(TerminalSession, self).footer(colitems)
self.endtime = now()
self.out.line()
self.skippedreasons()
self.failures()
self.summaryline()
# --------------------
# progress information
# --------------------
typemap = {
Passed: '.',
Skipped: 's',
Failed: 'F',
}
namemap = {
Passed: 'ok',
Skipped: 'SKIP',
Failed: 'FAIL',
}
def repr_progress_short_result(self, item, outcome):
for outcometype, char in self.typemap.items():
if isinstance(outcome, outcometype):
return char
else:
#raise TypeError, "not an Outomce instance: %r" % (outcome,)
return '?'
def repr_progress_long_result(self, item, outcome):
for outcometype, char in self.namemap.items():
if isinstance(outcome, outcometype):
return char
else:
#raise TypeError, "not an Outcome instance: %r" % (outcome,)
return 'UNKNOWN'
def repr_progress_module_result(self, item, outcome):
if isinstance(outcome, Failed):
return "FAILED TO LOAD MODULE"
elif isinstance(outcome, Skipped):
return "skipped"
elif not isinstance(outcome, (list, Passed)):
return "?"
# --------------------
# summary information
# --------------------
def summaryline(self):
outlist = []
sum = 0
for typ in Passed, Failed, Skipped:
l = self.getitemoutcomepairs(typ)
if l:
outlist.append('%d %s' % (len(l), typ.__name__.lower()))
sum += len(l)
elapsed = self.endtime-self.starttime
status = "%s" % ", ".join(outlist)
self.out.sep('=', 'tests finished: %s in %4.2f seconds' %
(status, elapsed))
def getlastvisible(self, sourcetraceback):
traceback = sourcetraceback[:]
while traceback:
entry = traceback.pop()
try:
x = entry.frame.eval("__tracebackhide__")
except:
x = False
if not x:
return entry
else:
return sourcetraceback[-1]
def skippedreasons(self):
texts = {}
for colitem, outcome in self.getitemoutcomepairs(Skipped):
raisingtb = self.getlastvisible(outcome.excinfo.traceback)
fn = raisingtb.frame.code.path
lineno = raisingtb.lineno
d = texts.setdefault(outcome.excinfo.exconly(), {})
d[(fn,lineno)] = outcome
if texts:
self.out.line()
self.out.sep('_', 'reasons for skipped tests')
for text, dict in texts.items():
for (fn, lineno), outcome in dict.items():
self.out.line('Skipped in %s:%d' %(fn, lineno+1))
self.out.line("reason: %s" % text)
self.out.line()
def failures(self):
if self.config.option.tbstyle == 'no':
return # skip the detailed failure reports altogether
l = self.getitemoutcomepairs(Failed)
if l:
self.out.sep('_')
for colitem, outcome in l:
self.repr_failure(colitem, outcome)
def repr_failure(self, item, outcome):
excinfo = outcome.excinfo
traceback = excinfo.traceback
#print "repr_failures sees item", item
#print "repr_failures sees traceback"
#py.std.pprint.pprint(traceback)
if item and not self.config.option.fulltrace:
path, firstlineno = item._getpathlineno()
ntraceback = traceback.cut(path=path, firstlineno=firstlineno)
if ntraceback == traceback:
ntraceback = ntraceback.cut(path=path)
traceback = ntraceback.filter()
if not traceback:
self.out.line("empty traceback from item %r" % (item,))
return
handler = getattr(self.presenter, 'repr_failure_tb%s' % self.config.option.tbstyle)
handler(item, excinfo, traceback, lambda : self.repr_out_err(item))
def repr_out_err(self, colitem):
for parent in colitem.listchain():
for name, obj in zip(['out', 'err'], parent._getouterr()):
if obj:
self.out.sep("- ", "%s: recorded std%s" % (parent.name, name))
self.out.line(obj)
def repr_pythonversion():
v = py.std.sys.version_info
try:
return "%s.%s.%s-%s-%s" % v
except ValueError:
return str(v)

View File

@ -7,8 +7,8 @@ import py, sys, os
if sys.platform == 'win32': if sys.platform == 'win32':
py.test.skip("rsession is unsupported on Windows.") py.test.skip("rsession is unsupported on Windows.")
from py.__.test.rsession.box import Box from py.__.test.box import Box
from py.__.test.rsession.testing import example2 from py.__.test.testing import example2
def setup_module(mod): def setup_module(mod):
tmpdir = py.test.ensuretemp("boxtests") tmpdir = py.test.ensuretemp("boxtests")

View File

@ -2,6 +2,13 @@ from __future__ import generators
import py import py
from setupdata import setupdatadir from setupdata import setupdatadir
from py.__.test.outcome import Skipped, Failed, Passed, Outcome from py.__.test.outcome import Skipped, Failed, Passed, Outcome
from py.__.test.terminal.out import getout
from py.__.test.repevent import ReceivedItemOutcome
def getpassed(all):
outcomes = [i.outcome for i in all if isinstance(i, ReceivedItemOutcome)]
l = [i for i in outcomes if i.passed]
return l
def setup_module(mod): def setup_module(mod):
mod.datadir = setupdatadir() mod.datadir = setupdatadir()
@ -203,20 +210,20 @@ def test_custom_python_collection_from_conftest():
old = o.chdir() old = o.chdir()
try: try:
config = py.test.config._reparse([]) config = py.test.config._reparse([])
out = py.std.cStringIO.StringIO() all = []
session = config._getsessionclass()(config, out) session = config._getsessionclass()(config)
session.main() session.main(all.append)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
assert len(l) == 2 assert len(l) == 2
finally: finally:
old.chdir() old.chdir()
# test that running the file directly works # test that running the file directly works
config = py.test.config._reparse([str(checkfile)]) config = py.test.config._reparse([str(checkfile)])
out = py.std.cStringIO.StringIO() all = []
session = config._getsessionclass()(config, out) session = config._getsessionclass()(config)
session.main() session.main(all.append)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
assert len(l) == 2 assert len(l) == 2
def test_custom_NONpython_collection_from_conftest(): def test_custom_NONpython_collection_from_conftest():
@ -250,20 +257,20 @@ def test_custom_NONpython_collection_from_conftest():
old = o.chdir() old = o.chdir()
try: try:
config = py.test.config._reparse([]) config = py.test.config._reparse([])
out = py.std.cStringIO.StringIO() all = []
session = config._getsessionclass()(config, out) session = config._getsessionclass()(config)
session.main() session.main(all.append)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
assert len(l) == 1 assert len(l) == 1
finally: finally:
old.chdir() old.chdir()
# test that running the file directly works # test that running the file directly works
config = py.test.config._reparse([str(checkfile)]) config = py.test.config._reparse([str(checkfile)])
out = py.std.cStringIO.StringIO() all = []
session = config._getsessionclass()(config, out) session = config._getsessionclass()(config)
session.main() session.main(all.append)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
assert len(l) == 1 assert len(l) == 1
def test_order_of_execution_generator_same_codeline(): def test_order_of_execution_generator_same_codeline():
@ -286,9 +293,10 @@ def test_order_of_execution_generator_same_codeline():
yield assert_order_of_execution yield assert_order_of_execution
""")) """))
config = py.test.config._reparse([o]) config = py.test.config._reparse([o])
all = []
session = config.initsession() session = config.initsession()
session.main() session.main(all.append)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
assert len(l) == 7 assert len(l) == 7
def test_order_of_execution_generator_different_codeline(): def test_order_of_execution_generator_different_codeline():
@ -318,9 +326,10 @@ def test_order_of_execution_generator_different_codeline():
yield assert_order_of_execution yield assert_order_of_execution
""")) """))
config = py.test.config._reparse([o]) config = py.test.config._reparse([o])
all = []
session = config.initsession() session = config.initsession()
session.main() session.main(all.append)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
assert len(l) == 4 assert len(l) == 4

View File

@ -0,0 +1,49 @@
import py
class TestCollectonly:
def setup_class(cls):
tmp = py.test.ensuretemp('itemgentest')
tmp.ensure("__init__.py")
tmp.ensure("test_one.py").write(py.code.Source("""
def test_one():
pass
class TestX:
def test_method_one(self):
pass
class TestY(TestX):
pass
"""))
tmp.ensure("test_two.py").write(py.code.Source("""
import py
py.test.skip('xxx')
"""))
tmp.ensure("test_three.py").write("xxxdsadsadsadsa")
cls.tmp = tmp
def test_collectonly(self):
config = py.test.config._reparse([self.tmp, '--collectonly'])
session = config.initsession()
# test it all in once
cap = py.io.StdCaptureFD()
session.main()
out, err = cap.reset()
# XXX exact output matching
lines = """<Directory 'itemgentest'>
<Module 'test_one.py'>
<Function 'test_one'>
<Class 'TestX'>
<Instance '()'>
<Function 'test_method_one'>
<Class 'TestY'>
<Instance '()'>
<Function 'test_method_one'>
<Module 'test_three.py'>
- FAILED TO LOAD MODULE -
<Module 'test_two.py'>
- skipped -
"""
for line in lines:
assert line in out

View File

@ -200,35 +200,37 @@ class TestSessionAndOptions:
def test_sessionname_default(self): def test_sessionname_default(self):
config = py.test.config._reparse([self.tmpdir]) config = py.test.config._reparse([self.tmpdir])
assert config._getsessionname() == 'TerminalSession' assert config._getsessionname() == 'Session'
def test_sessionname_dist(self): def test_sessionname_dist(self):
config = py.test.config._reparse([self.tmpdir, '--dist']) config = py.test.config._reparse([self.tmpdir, '--dist'])
assert config._getsessionname() == 'RSession' assert config._getsessionname() == 'RSession'
def test_implied_lsession(self): def test_implied_lsession(self):
optnames = 'startserver runbrowser apigen=x rest boxed'.split() #optnames = 'startserver runbrowser apigen=x rest boxed'.split()
for x in optnames: #for x in optnames:
config = py.test.config._reparse([self.tmpdir, '--%s' % x]) # config = py.test.config._reparse([self.tmpdir, '--%s' % x])
assert config._getsessionname() == 'LSession' # assert config._getsessionname() == 'LSession'
for x in 'startserver runbrowser rest'.split(): for x in 'startserver runbrowser rest'.split():
config = py.test.config._reparse([self.tmpdir, '--dist', '--%s' % x]) config = py.test.config._reparse([self.tmpdir, '--dist', '--%s' % x])
assert config._getsessionname() == 'RSession' assert config._getsessionname() == 'RSession'
def test_implied_remote_terminal_session(self): def test_implied_different_sessions(self):
config = py.test.config._reparse([self.tmpdir, '--looponfailing']) config = py.test.config._reparse([self.tmpdir, '--looponfailing'])
assert config._getsessionname() == 'RemoteTerminalSession' assert config._getsessionname() == 'RemoteTerminalSession'
config = py.test.config._reparse([self.tmpdir, '--exec=x']) config = py.test.config._reparse([self.tmpdir, '--exec=x'])
assert config._getsessionname() == 'RemoteTerminalSession' assert config._getsessionname() == 'RemoteTerminalSession'
config = py.test.config._reparse([self.tmpdir, '--dist', '--exec=x']) config = py.test.config._reparse([self.tmpdir, '--dist', '--exec=x'])
assert config._getsessionname() == 'RSession' assert config._getsessionname() == 'RSession'
config = py.test.config._reparse([self.tmpdir, '--collectonly'])
assert config._getsessionname() == 'CollectSession'
def test_sessionname_lookup_custom(self): def test_sessionname_lookup_custom(self):
self.tmpdir.join("conftest.py").write(py.code.Source(""" self.tmpdir.join("conftest.py").write(py.code.Source("""
from py.__.test.session import Session from py.__.test.session import Session
class MySession(Session): class MySession(Session):
def __init__(self, config): def __init__(self, config, reporter=None):
self.config = config self.config = config
""")) """))
config = py.test.config._reparse(["--session=MySession", self.tmpdir]) config = py.test.config._reparse(["--session=MySession", self.tmpdir])

View File

@ -2,7 +2,7 @@
import py import py
import example1 import example1
from py.__.test.rsession.executor import RunExecutor, BoxExecutor,\ from py.__.test.executor import RunExecutor, BoxExecutor,\
AsyncExecutor, ApigenExecutor AsyncExecutor, ApigenExecutor
from py.__.test.outcome import ReprOutcome from py.__.test.outcome import ReprOutcome
from py.__.test.rsession.testing.basetest import BasicRsessionTest from py.__.test.rsession.testing.basetest import BasicRsessionTest

View File

@ -0,0 +1,39 @@
import py
from py.__.test.session import itemgen
from py.__.test import repevent
class TestItemgen:
def setup_class(cls):
tmp = py.test.ensuretemp('itemgentest')
tmp.ensure("__init__.py")
tmp.ensure("test_one.py").write(py.code.Source("""
def test_one():
pass
class TestX:
def test_method_one(self):
pass
class TestY(TestX):
pass
"""))
tmp.ensure("test_two.py").write(py.code.Source("""
import py
py.test.skip('xxx')
"""))
tmp.ensure("test_three.py").write("xxxdsadsadsadsa")
cls.tmp = tmp
def test_itemgen(self):
l = []
colitems = [py.test.collect.Directory(self.tmp)]
gen = itemgen(None, colitems, l.append)
items = [i for i in gen]
assert len([i for i in l if isinstance(i, repevent.SkippedTryiter)]) == 1
assert len([i for i in l if isinstance(i, repevent.FailedTryiter)]) == 1
assert len(items) == 3
assert items[0].name == 'test_one'
assert items[1].name == 'test_method_one'
assert items[2].name == 'test_method_one'

View File

@ -3,6 +3,7 @@ import py
from py.__.test.outcome import SerializableOutcome, ReprOutcome, ExcInfoRepr from py.__.test.outcome import SerializableOutcome, ReprOutcome, ExcInfoRepr
import marshal import marshal
import py
def test_critical_debugging_flag(): def test_critical_debugging_flag():
outcome = SerializableOutcome(is_critical=True) outcome = SerializableOutcome(is_critical=True)
@ -22,13 +23,16 @@ def f2():
def f3(): def f3():
f2() f2()
def f4():
py.test.skip("argh!")
def test_exception_info_repr(): def test_exception_info_repr():
try: try:
f3() f3()
except: except:
outcome = SerializableOutcome(excinfo=py.code.ExceptionInfo()) outcome = SerializableOutcome(excinfo=py.code.ExceptionInfo())
repr = outcome.make_excinfo_repr("long") repr = outcome.make_excinfo_repr(outcome.excinfo, "long")
assert marshal.dumps(repr) assert marshal.dumps(repr)
excinfo = ExcInfoRepr(repr) excinfo = ExcInfoRepr(repr)
@ -46,5 +50,15 @@ def test_exception_info_repr():
assert excinfo.traceback[1].lineno == f3.func_code.co_firstlineno assert excinfo.traceback[1].lineno == f3.func_code.co_firstlineno
assert excinfo.traceback[1].relline == 1 assert excinfo.traceback[1].relline == 1
def test_packed_skipped():
try:
f4()
except:
outcome = SerializableOutcome(skipped=py.code.ExceptionInfo())
repr = outcome.make_excinfo_repr(outcome.skipped, "long")
assert marshal.dumps(repr)
skipped = ExcInfoRepr(repr)
assert skipped.value == "'argh!'"
#def test_f3(): #def test_f3():
# f3() # f3()

View File

@ -16,12 +16,13 @@ class TestRemote:
cls = config._getsessionclass() cls = config._getsessionclass()
out = [] # out = py.std.Queue.Queue() out = [] # out = py.std.Queue.Queue()
session = cls(config, out.append) session = cls(config, out.append)
session.main() failures = session.main()
for s in out: for s in out:
if s.find('1 failed') != -1: if s.find('1 failed') != -1:
break break
else: else:
py.test.fail("did not see test_1 failure") py.test.fail("did not see test_1 failure in output")
assert failures
def test_looponfailing(self): def test_looponfailing(self):
o = tmpdir.ensure('looponfailing', dir=1) o = tmpdir.ensure('looponfailing', dir=1)

View File

@ -42,7 +42,7 @@ def test_repevent_failures():
assert repevent.FailedTryiter(None, None).is_failure() assert repevent.FailedTryiter(None, None).is_failure()
out = ReprOutcome(SerializableOutcome().make_repr()) out = ReprOutcome(SerializableOutcome().make_repr())
assert not repevent.ReceivedItemOutcome(None, None, out).is_failure() assert not repevent.ReceivedItemOutcome(None, None, out).is_failure()
out = ReprOutcome(SerializableOutcome(skipped=True).make_repr()) out = ReprOutcome(SerializableOutcome(skipped="xxx").make_repr())
assert not repevent.ReceivedItemOutcome(None, None, out).is_failure() assert not repevent.ReceivedItemOutcome(None, None, out).is_failure()
try: try:
1/0 1/0

View File

@ -18,17 +18,26 @@ etc.
import py, os import py, os
from py.__.test.session import AbstractSession from py.__.test.session import AbstractSession, itemgen
from py.__.test.reporter import RemoteReporter, LocalReporter, choose_reporter from py.__.test.reporter import RemoteReporter, LocalReporter, choose_reporter
from py.__.test import repevent from py.__.test import repevent
from py.__.test.outcome import ReprOutcome, SerializableOutcome from py.__.test.outcome import ReprOutcome, SerializableOutcome
from py.__.test.rsession.hostmanage import HostInfo from py.__.test.rsession.hostmanage import HostInfo
from py.__.test.rsession.box import Box from py.__.test.box import Box
from py.__.test.rsession.testing.basetest import BasicRsessionTest from py.__.test.rsession.testing.basetest import BasicRsessionTest
from py.__.test.rsession.master import itemgen
import sys import sys
from StringIO import StringIO from StringIO import StringIO
class MockSession(object):
def __init__(self, reporter):
self.reporter = reporter
def start(self, item):
self.reporter(repevent.ItemStart(item))
def finish(self, item):
pass
class DummyGateway(object): class DummyGateway(object):
def __init__(self, host): def __init__(self, host):
self.host = host self.host = host
@ -45,8 +54,13 @@ class AbstractTestReporter(BasicRsessionTest):
except: except:
exc = py.code.ExceptionInfo() exc = py.code.ExceptionInfo()
try:
py.test.skip("xxx")
except:
skipexc = py.code.ExceptionInfo()
outcomes = [SerializableOutcome(()), outcomes = [SerializableOutcome(()),
SerializableOutcome(skipped=True), SerializableOutcome(skipped=skipexc),
SerializableOutcome(excinfo=exc), SerializableOutcome(excinfo=exc),
SerializableOutcome()] SerializableOutcome()]
@ -61,9 +75,12 @@ class AbstractTestReporter(BasicRsessionTest):
outcomes = self.prepare_outcomes() outcomes = self.prepare_outcomes()
def boxfun(config, item, outcomes): def boxfun(config, item, outcomes):
hosts = [HostInfo("localhost")] hosts = self.get_hosts()
r = self.reporter(config, hosts) r = self.reporter(config, hosts)
if hosts:
ch = DummyChannel(hosts[0]) ch = DummyChannel(hosts[0])
else:
ch = None
for outcome in outcomes: for outcome in outcomes:
r.report(repevent.ReceivedItemOutcome(ch, item, outcome)) r.report(repevent.ReceivedItemOutcome(ch, item, outcome))
@ -79,10 +96,13 @@ class AbstractTestReporter(BasicRsessionTest):
outcomes = self.prepare_outcomes() outcomes = self.prepare_outcomes()
def boxfun(config, item, funcitem, outcomes): def boxfun(config, item, funcitem, outcomes):
hosts = [HostInfo('localhost')] hosts = self.get_hosts()
r = self.reporter(config, hosts) r = self.reporter(config, hosts)
r.report(repevent.ItemStart(item)) r.report(repevent.ItemStart(item))
if hosts:
ch = DummyChannel(hosts[0]) ch = DummyChannel(hosts[0])
else:
ch = None
for outcome in outcomes: for outcome in outcomes:
r.report(repevent.ReceivedItemOutcome(ch, funcitem, outcome)) r.report(repevent.ReceivedItemOutcome(ch, funcitem, outcome))
@ -110,9 +130,9 @@ class AbstractTestReporter(BasicRsessionTest):
def boxfun(): def boxfun():
config = py.test.config._reparse([str(tmpdir)]) config = py.test.config._reparse([str(tmpdir)])
rootcol = py.test.collect.Directory(tmpdir) rootcol = py.test.collect.Directory(tmpdir)
hosts = [HostInfo('localhost')] hosts = self.get_hosts()
r = self.reporter(config, hosts) r = self.reporter(config, hosts)
list(itemgen([rootcol], r.report)) list(itemgen(MockSession(r), [rootcol], r.report))
cap = py.io.StdCaptureFD() cap = py.io.StdCaptureFD()
boxfun() boxfun()
@ -129,11 +149,11 @@ class AbstractTestReporter(BasicRsessionTest):
def boxfun(): def boxfun():
config = py.test.config._reparse([str(tmpdir)]) config = py.test.config._reparse([str(tmpdir)])
rootcol = py.test.collect.Directory(tmpdir) rootcol = py.test.collect.Directory(tmpdir)
host = HostInfo('localhost') hosts = self.get_hosts()
r = self.reporter(config, [host]) r = self.reporter(config, hosts)
r.report(repevent.TestStarted([host], config.topdir, ["a"])) r.report(repevent.TestStarted(hosts, config, ["a"]))
r.report(repevent.RsyncFinished()) r.report(repevent.RsyncFinished())
list(itemgen([rootcol], r.report)) list(itemgen(MockSession(r), [rootcol], r.report))
r.report(repevent.TestFinished()) r.report(repevent.TestFinished())
return r return r
@ -144,6 +164,24 @@ class AbstractTestReporter(BasicRsessionTest):
assert out.find("1 failed in") != -1 assert out.find("1 failed in") != -1
assert out.find("NameError: name 'sadsadsa' is not defined") != -1 assert out.find("NameError: name 'sadsadsa' is not defined") != -1
def _test_verbose(self):
tmpdir = py.test.ensuretemp("reporterverbose")
tmpdir.ensure("__init__.py")
tmpdir.ensure("test_one.py").write("def test_x(): pass")
cap = py.io.StdCaptureFD()
config = py.test.config._reparse([str(tmpdir), '-v'])
hosts = self.get_hosts()
r = self.reporter(config, hosts)
r.report(repevent.TestStarted(hosts, config, []))
r.report(repevent.RsyncFinished())
rootcol = py.test.collect.Directory(tmpdir)
list(itemgen(MockSession(r), [rootcol], r.report))
r.report(repevent.TestFinished())
out, err = cap.reset()
assert not err
for i in ['+ testmodule:', 'test_one.py[1]']: # XXX finish
assert i in out
def _test_still_to_go(self): def _test_still_to_go(self):
tmpdir = py.test.ensuretemp("stilltogo") tmpdir = py.test.ensuretemp("stilltogo")
tmpdir.ensure("__init__.py") tmpdir.ensure("__init__.py")
@ -153,7 +191,7 @@ class AbstractTestReporter(BasicRsessionTest):
for host in hosts: for host in hosts:
host.gw_remotepath = '' host.gw_remotepath = ''
r = self.reporter(config, hosts) r = self.reporter(config, hosts)
r.report(repevent.TestStarted(hosts, config.topdir, ["a", "b", "c"])) r.report(repevent.TestStarted(hosts, config, ["a", "b", "c"]))
for host in hosts: for host in hosts:
r.report(repevent.HostGatewayReady(host, ["a", "b", "c"])) r.report(repevent.HostGatewayReady(host, ["a", "b", "c"]))
for host in hosts: for host in hosts:
@ -174,9 +212,15 @@ class AbstractTestReporter(BasicRsessionTest):
class TestLocalReporter(AbstractTestReporter): class TestLocalReporter(AbstractTestReporter):
reporter = LocalReporter reporter = LocalReporter
def get_hosts(self):
return None
def test_report_received_item_outcome(self): def test_report_received_item_outcome(self):
assert self.report_received_item_outcome() == 'FsF.' assert self.report_received_item_outcome() == 'FsF.'
def test_verbose(self):
self._test_verbose()
def test_module(self): def test_module(self):
output = self._test_module() output = self._test_module()
assert output.find("test_one") != -1 assert output.find("test_one") != -1
@ -192,12 +236,15 @@ class TestLocalReporter(AbstractTestReporter):
class TestRemoteReporter(AbstractTestReporter): class TestRemoteReporter(AbstractTestReporter):
reporter = RemoteReporter reporter = RemoteReporter
def get_hosts(self):
return [HostInfo("host")]
def test_still_to_go(self): def test_still_to_go(self):
self._test_still_to_go() self._test_still_to_go()
def test_report_received_item_outcome(self): def test_report_received_item_outcome(self):
val = self.report_received_item_outcome() val = self.report_received_item_outcome()
expected_lst = ["localhost", "FAILED", expected_lst = ["host", "FAILED",
"funcpass", "test_one", "funcpass", "test_one",
"SKIPPED", "SKIPPED",
"PASSED"] "PASSED"]
@ -206,7 +253,7 @@ class TestRemoteReporter(AbstractTestReporter):
def test_module(self): def test_module(self):
val = self._test_module() val = self._test_module()
expected_lst = ["localhost", "FAILED", expected_lst = ["host", "FAILED",
"funcpass", "test_one", "funcpass", "test_one",
"SKIPPED", "SKIPPED",
"PASSED"] "PASSED"]
@ -222,12 +269,10 @@ def test_reporter_choice():
from py.__.test.rsession.web import WebReporter from py.__.test.rsession.web import WebReporter
from py.__.test.rsession.rest import RestReporter from py.__.test.rsession.rest import RestReporter
choices = [ choices = [
(['-d'], RemoteReporter),
(['-d', '--rest'], RestReporter), (['-d', '--rest'], RestReporter),
([], LocalReporter),
(['-w'], WebReporter), (['-w'], WebReporter),
(['-r'], WebReporter)] (['-r'], WebReporter)]
for opts, reporter in choices: for opts, reporter in choices:
config = py.test.config._reparse(['xxx'] + opts) config = py.test.config._reparse(['xxx'] + opts)
assert choose_reporter(config) is reporter assert choose_reporter(None, config) is reporter

View File

@ -56,7 +56,7 @@ def test_repr_local():
for key in locals().keys(): for key in locals().keys():
assert s.getvalue().find(key) != -1 assert s.getvalue().find(key) != -1
def test_repr_traceback_long(): def XXXtest_repr_traceback_long():
py.test.skip("unfinished") py.test.skip("unfinished")
config = py.test.config._reparse([]) config = py.test.config._reparse([])
s = StringIO() s = StringIO()

View File

@ -1,12 +1,14 @@
import py import py
from setupdata import setup_module # sets up global 'tmpdir' from setupdata import setup_module # sets up global 'tmpdir'
from py.__.test.outcome import Skipped, Failed, Passed, Outcome from py.__.test.outcome import Skipped, Failed, Passed, Outcome
from py.__.test.terminal.out import getout
from py.__.test.repevent import ReceivedItemOutcome, SkippedTryiter,\
FailedTryiter
implied_options = { implied_options = {
'--pdb': 'usepdb and nocapture',
'-v': 'verbose', '-v': 'verbose',
'-l': 'showlocals', '-l': 'showlocals',
'--runbrowser': 'startserver and runbrowser', #'--runbrowser': 'startserver and runbrowser', XXX starts browser
} }
conflict_options = ('--looponfailing --pdb', conflict_options = ('--looponfailing --pdb',
@ -14,6 +16,21 @@ conflict_options = ('--looponfailing --pdb',
'--exec=%s --pdb' % py.std.sys.executable, '--exec=%s --pdb' % py.std.sys.executable,
) )
def getoutcomes(all):
return [i.outcome for i in all if isinstance(i, ReceivedItemOutcome)]
def getpassed(all):
return [i for i in getoutcomes(all) if i.passed]
def getskipped(all):
return [i for i in getoutcomes(all) if i.skipped] + \
[i for i in all if isinstance(i, SkippedTryiter)]
def getfailed(all):
return [i for i in getoutcomes(all) if i.excinfo] + \
[i for i in all if isinstance(i, FailedTryiter)]
def test_conflict_options(): def test_conflict_options():
for spec in conflict_options: for spec in conflict_options:
opts = spec.split() opts = spec.split()
@ -43,12 +60,11 @@ def test_default_session_options():
def runfiletest(opts): def runfiletest(opts):
config = py.test.config._reparse(opts + [datadir/'filetest.py']) config = py.test.config._reparse(opts + [datadir/'filetest.py'])
all = []
session = config.initsession() session = config.initsession()
session.main() session.main(all.append)
l = session.getitemoutcomepairs(Failed) assert len(getfailed(all)) == 2
assert len(l) == 2 assert not getskipped(all)
l = session.getitemoutcomepairs(Passed)
assert not l
def test_is_not_boxed_by_default(): def test_is_not_boxed_by_default():
config = py.test.config._reparse([datadir]) config = py.test.config._reparse([datadir])
@ -59,13 +75,13 @@ class TestKeywordSelection:
def check(keyword, name): def check(keyword, name):
config = py.test.config._reparse([datadir/'filetest.py', config = py.test.config._reparse([datadir/'filetest.py',
'-s', '-k', keyword]) '-s', '-k', keyword])
session = config._getsessionclass()(config, py.std.sys.stdout) all = []
session.main() session = config._getsessionclass()(config)
l = session.getitemoutcomepairs(Failed) session.main(all.append)
assert len(l) == 1 outcomes = [i for i in all if isinstance(i, ReceivedItemOutcome)]
item = l[0][0] assert len(getfailed(all)) == 1
assert item.name == name assert outcomes[0].item.name == name
l = session.getitemoutcomepairs(Skipped) l = getskipped(all)
assert len(l) == 1 assert len(l) == 1
for keyword in ['test_one', 'est_on']: for keyword in ['test_one', 'est_on']:
@ -89,94 +105,57 @@ class TestKeywordSelection:
""")) """))
for keyword in ('xxx', 'xxx test_2', 'TestClass', 'xxx -test_1', for keyword in ('xxx', 'xxx test_2', 'TestClass', 'xxx -test_1',
'TestClass test_2', 'xxx TestClass test_2',): 'TestClass test_2', 'xxx TestClass test_2',):
f = py.std.StringIO.StringIO()
config = py.test.config._reparse([o, '-s', '-k', keyword]) config = py.test.config._reparse([o, '-s', '-k', keyword])
session = config._getsessionclass()(config, f) all = []
session.main() session = config._getsessionclass()(config)
session.main(all.append)
print "keyword", repr(keyword) print "keyword", repr(keyword)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
outcomes = [i for i in all if isinstance(i, ReceivedItemOutcome)]
assert len(l) == 1 assert len(l) == 1
assert l[0][0].name == 'test_2' assert outcomes[0].item.name == 'test_2'
l = session.getitemoutcomepairs(Skipped) l = getskipped(all)
assert l[0][0].name == 'test_1' assert l[0].item.name == 'test_1'
def test_select_starton(self): def test_select_starton(self):
config = py.test.config._reparse([datadir/'testmore.py', config = py.test.config._reparse([datadir/'testmore.py',
'-j', '-k', "test_two"]) '-j', '-k', "test_two"])
session = config._getsessionclass()(config, py.std.sys.stdout) all = []
session.main() session = config._getsessionclass()(config)
l = session.getitemoutcomepairs(Passed) session.main(all.append)
assert len(l) == 2 assert len(getpassed(all)) == 2
l = session.getitemoutcomepairs(Skipped) assert len(getskipped(all)) == 1
assert len(l) == 1
class TestTerminalSession: class TestTerminalSession:
def mainsession(self, *args): def mainsession(self, *args):
from py.__.test.terminal.terminal import TerminalSession from py.__.test.session import Session
self.file = py.std.StringIO.StringIO() from py.__.test.terminal.out import getout
config = py.test.config._reparse(list(args)) config = py.test.config._reparse(list(args))
session = TerminalSession(config, file=self.file) all = []
session.main() session = Session(config)
return session session.main(all.append)
return session, all
def test_terminal(self): def test_terminal(self):
session = self.mainsession(datadir / 'filetest.py') session, all = self.mainsession(datadir / 'filetest.py')
out = self.file.getvalue() outcomes = getoutcomes(all)
l = session.getitemoutcomepairs(Failed) assert len(getfailed(all)) == 2
assert len(l) == 2
assert out.find('2 failed') != -1
def test_syntax_error_module(self): def test_syntax_error_module(self):
session = self.mainsession(datadir / 'syntax_error.py') session, all = self.mainsession(datadir / 'syntax_error.py')
l = session.getitemoutcomepairs(Failed) l = getfailed(all)
assert len(l) == 1 assert len(l) == 1
out = self.file.getvalue() out = l[0].excinfo.exconly()
assert out.find(str('syntax_error.py')) != -1 assert out.find(str('syntax_error.py')) != -1
assert out.find(str('not python')) != -1 assert out.find(str('not python')) != -1
def test_exit_first_problem(self): def test_exit_first_problem(self):
session = self.mainsession("--exitfirst", session, all = self.mainsession("--exitfirst",
datadir / 'filetest.py') datadir / 'filetest.py')
assert session.config.option.exitfirst assert session.config.option.exitfirst
l = session.getitemoutcomepairs(Failed) assert len(getfailed(all)) == 1
assert len(l) == 1 assert not getpassed(all)
l = session.getitemoutcomepairs(Passed)
assert not l
def test_collectonly(self):
session = self.mainsession("--collectonly",
datadir / 'filetest.py')
assert session.config.option.collectonly
out = self.file.getvalue()
#print out
l = session.getitemoutcomepairs(Failed)
#if l:
# x = l[0][1].excinfo
# print x.exconly()
# print x.traceback
assert len(l) == 0
for line in ('filetest.py', 'test_one',
'TestClass', 'test_method_one'):
assert out.find(line)
def test_recursion_detection(self):
o = tmpdir.ensure('recursiontest', dir=1)
tfile = o.join('test_recursion.py')
tfile.write(py.code.Source("""
def test_1():
def f():
g()
def g():
f()
f()
"""))
session = self.mainsession(o)
print "back from main", o
out = self.file.getvalue()
#print out
i = out.find('Recursion detected')
assert i != -1
def test_generator_yields_None(self): def test_generator_yields_None(self):
o = tmpdir.ensure('generatornonetest', dir=1) o = tmpdir.ensure('generatornonetest', dir=1)
@ -185,9 +164,10 @@ class TestTerminalSession:
def test_1(): def test_1():
yield None yield None
""")) """))
session = self.mainsession(o) session, all = self.mainsession(o)
out = self.file.getvalue()
#print out #print out
failures = getfailed(all)
out = failures[0].excinfo.exconly()
i = out.find('TypeError') i = out.find('TypeError')
assert i != -1 assert i != -1
@ -213,20 +193,16 @@ class TestTerminalSession:
def finishcapture(self): def finishcapture(self):
self._testmycapture = None self._testmycapture = None
""")) """))
session = self.mainsession(o) session, all = self.mainsession(o)
l = session.getitemoutcomepairs(Passed) l = getpassed(all)
outcomes = getoutcomes(all)
assert len(l) == 1 assert len(l) == 1
item = l[0][0] item = all[3].item # item is not attached to outcome, but it's the
# started before
assert hasattr(item, '_testmycapture') assert hasattr(item, '_testmycapture')
print item._testmycapture print item._testmycapture
assert isinstance(item.parent, py.test.collect.Module) assert isinstance(item.parent, py.test.collect.Module)
out, err = item.parent._getouterr()
assert out.find('module level output') != -1
allout = self.file.getvalue()
print "allout:", allout
assert allout.find('module level output') != -1, (
"session didn't show module output")
def test_raises_output(self): def test_raises_output(self):
o = tmpdir.ensure('raisestest', dir=1) o = tmpdir.ensure('raisestest', dir=1)
@ -236,8 +212,9 @@ class TestTerminalSession:
def test_raises_doesnt(): def test_raises_doesnt():
py.test.raises(ValueError, int, "3") py.test.raises(ValueError, int, "3")
""")) """))
session = self.mainsession(o) session, all = self.mainsession(o)
out = self.file.getvalue() outcomes = getoutcomes(all)
out = outcomes[0].excinfo.exconly()
if not out.find("DID NOT RAISE") != -1: if not out.find("DID NOT RAISE") != -1:
print out print out
py.test.fail("incorrect raises() output") py.test.fail("incorrect raises() output")
@ -265,16 +242,10 @@ class TestTerminalSession:
assert self.reslist == [1,2,1,2,3] assert self.reslist == [1,2,1,2,3]
""")) """))
session = self.mainsession(o) session, all = self.mainsession(o)
l = session.getitemoutcomepairs(Failed) assert len(getfailed(all)) == 0
assert len(l) == 0 assert len(getpassed(all)) == 7
l = session.getitemoutcomepairs(Passed)
assert len(l) == 7
# also test listnames() here ... # also test listnames() here ...
item, result = l[-1]
assert item.name == 'test_4'
names = item.listnames()
assert names == ['ordertest', 'test_orderofexecution.py', 'Testmygroup', '()', 'test_4']
def test_nested_import_error(self): def test_nested_import_error(self):
o = tmpdir.ensure('Ians_importfailure', dir=1) o = tmpdir.ensure('Ians_importfailure', dir=1)
@ -288,48 +259,24 @@ class TestTerminalSession:
import does_not_work import does_not_work
a = 1 a = 1
""")) """))
session = self.mainsession(o) session, all = self.mainsession(o)
l = session.getitemoutcomepairs(Failed) l = getfailed(all)
assert len(l) == 1 assert len(l) == 1
item, outcome = l[0] out = l[0].excinfo.exconly()
assert str(outcome.excinfo).find('does_not_work') != -1 assert out.find('does_not_work') != -1
def test_safe_repr(self): def test_safe_repr(self):
session = self.mainsession(datadir/'brokenrepr.py') session, all = self.mainsession(datadir/'brokenrepr.py')
out = self.file.getvalue() #print 'Output of simulated "py.test brokenrepr.py":'
print 'Output of simulated "py.test brokenrepr.py":' #print all
print out
l = session.getitemoutcomepairs(Failed) l = getfailed(all)
assert len(l) == 2 assert len(l) == 2
out = l[0].excinfo.exconly()
assert out.find("""[Exception("Ha Ha fooled you, I'm a broken repr().") raised in repr()]""") != -1 #' assert out.find("""[Exception("Ha Ha fooled you, I'm a broken repr().") raised in repr()]""") != -1 #'
out = l[1].excinfo.exconly()
assert out.find("[unknown exception raised in repr()]") != -1 assert out.find("[unknown exception raised in repr()]") != -1
def test_E_on_correct_line(self):
o = tmpdir.ensure('E_on_correct_line', dir=1)
tfile = o.join('test_correct_line.py')
source = py.code.Source("""
import py
def test_hello():
assert (None ==
['a',
'b',
'c'])
""")
tfile.write(source)
session = self.mainsession(o)
out = self.file.getvalue()
print 'Output of simulated "py.test test_correct_line.py":'
print out
i = out.find('test_correct_line.py:')
assert i >= 0
linenum = int(out[i+len('test_correct_line.py:')]) # a single char
line_to_report = source[linenum-1]
expected_output = '\nE ' + line_to_report + '\n'
print 'Looking for:', expected_output
assert expected_output in out
def test_skip_reasons(): def test_skip_reasons():
tmp = py.test.ensuretemp("check_skip_reasons") tmp = py.test.ensuretemp("check_skip_reasons")
tmp.ensure("test_one.py").write(py.code.Source(""" tmp.ensure("test_one.py").write(py.code.Source("""
@ -342,10 +289,11 @@ def test_skip_reasons():
""")) """))
tmp.ensure("__init__.py") tmp.ensure("__init__.py")
config = py.test.config._reparse([tmp]) config = py.test.config._reparse([tmp])
all = []
session = config.initsession() session = config.initsession()
session.main() session.main(all.append)
skips = session.getitemoutcomepairs(Skipped) skips = getskipped(all)
assert len(skips) == 2 assert len(skips) == 2
assert repr(skips[0][1]) == 'Broken: stuff' assert str(skips[0].skipped.value) == 'Broken: stuff'
assert repr(skips[1][1]) == 'Not implemented: stuff' assert str(skips[1].skipped.value) == 'Not implemented: stuff'

View File

@ -3,21 +3,21 @@
""" """
import py import py
from py.__.test.rsession.rsession import LSession
from py.__.test import repevent from py.__.test import repevent
from py.__.test.rsession.local import box_runner, plain_runner, apigen_runner #from py.__.test.rsession.local import box_runner, plain_runner, apigen_runner
import py.__.test.custompdb import py.__.test.custompdb
from py.__.test.session import Session
def setup_module(mod): def setup_module(mod):
mod.tmp = py.test.ensuretemp("lsession_module") mod.tmp = py.test.ensuretemp("lsession_module")
class TestLSession(object): class TestSession(object):
# XXX: Some tests of that should be run as well on RSession, while # XXX: Some tests of that should be run as well on RSession, while
# some not at all # some not at all
def example_distribution(self, runner): def example_distribution(self, boxed=False):
# XXX find a better way for the below # XXX find a better way for the below
tmpdir = tmp tmpdir = tmp
dirname = "sub_lsession"+runner.func_name dirname = "sub_lsession"#+runner.func_name
tmpdir.ensure(dirname, "__init__.py") tmpdir.ensure(dirname, "__init__.py")
tmpdir.ensure(dirname, "test_one.py").write(py.code.Source(""" tmpdir.ensure(dirname, "test_one.py").write(py.code.Source("""
def test_1(): def test_1():
@ -33,10 +33,12 @@ class TestLSession(object):
# os.kill(os.getpid(), 11) # os.kill(os.getpid(), 11)
""")) """))
args = [str(tmpdir.join(dirname))] args = [str(tmpdir.join(dirname))]
if boxed:
args.append('--boxed')
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
lsession.main(reporter=allevents.append, runner=runner) lsession.main(reporter=allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) assert len(testevents)
@ -62,13 +64,35 @@ class TestLSession(object):
assert str(tb[0].path).find("executor") != -1 assert str(tb[0].path).find("executor") != -1
assert str(tb[0].source).find("execute") != -1 assert str(tb[0].source).find("execute") != -1
def test_normal(self): def test_boxed(self):
if not hasattr(py.std.os, 'fork'): if not hasattr(py.std.os, 'fork'):
py.test.skip('operating system not supported') py.test.skip('operating system not supported')
self.example_distribution(box_runner) self.example_distribution(True)
def test_box_exploding(self):
if not hasattr(py.std.os, 'fork'):
py.test.skip('operating system not supported')
tmpdir = tmp
dirname = "boxtest"
tmpdir.ensure(dirname, "__init__.py")
tmpdir.ensure(dirname, "test_one.py").write(py.code.Source("""
def test_5():
import os
os.kill(os.getpid(), 11)
"""))
args = [str(tmpdir.join(dirname))]
args.append('--boxed')
config = py.test.config._reparse(args)
lsession = Session(config)
allevents = []
lsession.main(reporter=allevents.append)
testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents)
assert testevents[0].outcome.signal
def test_plain(self): def test_plain(self):
self.example_distribution(plain_runner) self.example_distribution(False)
def test_pdb_run(self): def test_pdb_run(self):
# we make sure that pdb is engaged # we make sure that pdb is engaged
@ -88,14 +112,14 @@ class TestLSession(object):
py.__.test.custompdb.post_mortem = some_fun py.__.test.custompdb.post_mortem = some_fun
args = [str(tmpdir.join(subdir)), '--pdb'] args = [str(tmpdir.join(subdir)), '--pdb']
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
try: #try:
lsession.main(reporter=allevents.append, runner=plain_runner) lsession.main(reporter=allevents.append)
except SystemExit: #except SystemExit:
pass # pass
else: #else:
py.test.fail("Didn't raise system exit") # py.test.fail("Didn't raise system exit")
failure_events = [event for event in allevents if isinstance(event, failure_events = [event for event in allevents if isinstance(event,
repevent.ImmediateFailure)] repevent.ImmediateFailure)]
assert len(failure_events) == 1 assert len(failure_events) == 1
@ -122,10 +146,10 @@ class TestLSession(object):
args = [str(tmpdir.join(subdir)), '-x'] args = [str(tmpdir.join(subdir)), '-x']
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
assert config.option.exitfirst assert config.option.exitfirst
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
lsession.main(reporter=allevents.append, runner=box_runner) lsession.main(reporter=allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) assert len(testevents)
@ -151,10 +175,10 @@ class TestLSession(object):
""")) """))
args = [str(tmpdir.join("sub3")), '-k', 'test_one'] args = [str(tmpdir.join("sub3")), '-k', 'test_one']
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
lsession.main(reporter=allevents.append, runner=box_runner) lsession.main(reporter=allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) assert len(testevents)
@ -179,22 +203,17 @@ class TestLSession(object):
args = [str(tmpdir.join("sub4"))] args = [str(tmpdir.join("sub4"))]
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
allruns = [] allruns = []
def dummy_runner(item, config, reporter): lsession.main(reporter=allevents.append)
allruns.append(item)
item.passed = True
return item
lsession.main(reporter=allevents.append, runner=dummy_runner)
assert len(allruns) == 4
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) == 4 assert len(testevents) == 4
lst = ['test_one', 'test_one_one', 'test_other', 'test_two'] lst = ['test_one', 'test_one_one', 'test_other', 'test_two']
for num, i in enumerate(testevents): for num, i in enumerate(testevents):
assert i.item == i.outcome #assert i.item == i.outcome
assert i.item.name == lst[num] assert i.item.name == lst[num]
def test_module_raising(self): def test_module_raising(self):
@ -210,9 +229,9 @@ class TestLSession(object):
args = [str(tmpdir.join("sub5"))] args = [str(tmpdir.join("sub5"))]
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
lsession.main(reporter=allevents.append, runner=box_runner) lsession.main(reporter=allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) == 0 assert len(testevents) == 0
@ -236,9 +255,9 @@ class TestLSession(object):
""")) """))
args = [str(tmpdir.join("sub6"))] args = [str(tmpdir.join("sub6"))]
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
lsession.main(reporter=allevents.append, runner=box_runner) lsession.main(reporter=allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
failevents = [i for i in testevents if i.outcome.excinfo] failevents = [i for i in testevents if i.outcome.excinfo]
@ -257,40 +276,12 @@ class TestLSession(object):
""")) """))
args = [str(tmpdir.join("sub7"))] args = [str(tmpdir.join("sub7"))]
config = py.test.config._reparse(args) config = py.test.config._reparse(args)
lsession = LSession(config) lsession = Session(config)
allevents = [] allevents = []
lsession.main(reporter=allevents.append, runner=plain_runner) lsession.main(reporter=allevents.append)
testevents = [x for x in allevents testevents = [x for x in allevents
if isinstance(x, repevent.ReceivedItemOutcome)] if isinstance(x, repevent.ReceivedItemOutcome)]
assert len(testevents) == 1 assert len(testevents) == 1
assert testevents[0].outcome.passed assert testevents[0].outcome.passed
assert testevents[0].outcome.stderr == "" assert testevents[0].outcome.stderr == ""
assert testevents[0].outcome.stdout == "1\n2\n3\n" assert testevents[0].outcome.stdout == "1\n2\n3\n"
def test_runner_selection(self):
tmpdir = py.test.ensuretemp("lsession_runner_selection")
tmpdir.ensure("apigen.py").write(py.code.Source("""
def get_documentable_items(*args):
return 'foo', {}
"""))
opt_mapping = {
'': plain_runner,
'--box': box_runner,
'--apigen=%s/apigen.py' % str(tmpdir): apigen_runner,
}
pkgdir = tmpdir.dirpath()
for opt in opt_mapping.keys():
if opt:
all = opt + " " + str(tmpdir)
else:
all = str(tmpdir)
config = py.test.config._reparse(all.split(" "))
lsession = LSession(config)
assert lsession.init_runner() is opt_mapping[opt]
#tmpdir.dirpath().ensure("conftest.py").write(py.code.Source("""
#dist_boxing=True
#"""))
#config = py.test.config._reparse([str(tmpdir)])
#lsession = LSession(config)
#assert lsession.init_runner(pkgdir) is box_runner
# XXX check why it fails