merge 1.0.x branch

--HG--
branch : trunk
This commit is contained in:
holger krekel 2009-07-31 15:47:08 +02:00
commit cd5a89bec9
58 changed files with 1486 additions and 509 deletions

View File

@ -12,3 +12,4 @@
c63f35c266cbb26dad6b87b5e115d65685adf448 1.0.0b8 c63f35c266cbb26dad6b87b5e115d65685adf448 1.0.0b8
c63f35c266cbb26dad6b87b5e115d65685adf448 1.0.0b8 c63f35c266cbb26dad6b87b5e115d65685adf448 1.0.0b8
0eaa0fdf2ba0163cf534dc2eff4ba2e5fc66c261 1.0.0b8 0eaa0fdf2ba0163cf534dc2eff4ba2e5fc66c261 1.0.0b8
e2a60653cb490aeed81bbbd83c070b99401c211c 1.0.0b9

View File

@ -1,3 +1,37 @@
Changes between 1.0.0b8 and 1.0.0b9
=====================================
* cleanly handle and report final teardown of test setup
* fix svn-1.6 compat issue with py.path.svnwc().versioned()
(thanks Wouter Vanden Hove)
* setup/teardown or collection problems now show as ERRORs
or with big "E"'s in the progress lines. they are reported
and counted separately.
* dist-testing: properly handle test items that get locally
collected but cannot be collected on the remote side - often
due to platform/dependency reasons
* simplified py.test.mark API - see keyword plugin documentation
* integrate better with logging: capturing now by default captures
test functions and their immediate setup/teardown in a single stream
* capsys and capfd funcargs now have a readouterr() and a close() method
(underlyingly py.io.StdCapture/FD objects are used which grew a
readouterr() method as well to return snapshots of captured out/err)
* make assert-reinterpretation work better with comparisons not
returning bools (reported with numpy from thanks maciej fijalkowski)
* reworked per-test output capturing into the pytest_iocapture.py plugin
and thus removed capturing code from config object
* item.repr_failure(excinfo) instead of item.repr_failure(excinfo, outerr)
Changes between 1.0.0b7 and 1.0.0b8 Changes between 1.0.0b7 and 1.0.0b8
===================================== =====================================

View File

@ -29,7 +29,6 @@ doc/test/extend.txt
doc/test/features.txt doc/test/features.txt
doc/test/funcargs.txt doc/test/funcargs.txt
doc/test/plugin/doctest.txt doc/test/plugin/doctest.txt
doc/test/plugin/execnetcleanup.txt
doc/test/plugin/figleaf.txt doc/test/plugin/figleaf.txt
doc/test/plugin/hooklog.txt doc/test/plugin/hooklog.txt
doc/test/plugin/hookspec.txt doc/test/plugin/hookspec.txt
@ -37,13 +36,12 @@ doc/test/plugin/index.txt
doc/test/plugin/iocapture.txt doc/test/plugin/iocapture.txt
doc/test/plugin/keyword.txt doc/test/plugin/keyword.txt
doc/test/plugin/monkeypatch.txt doc/test/plugin/monkeypatch.txt
doc/test/plugin/oejskit.txt
doc/test/plugin/pdb.txt doc/test/plugin/pdb.txt
doc/test/plugin/pocoo.txt doc/test/plugin/pocoo.txt
doc/test/plugin/pytester.txt
doc/test/plugin/recwarn.txt doc/test/plugin/recwarn.txt
doc/test/plugin/restdoc.txt doc/test/plugin/restdoc.txt
doc/test/plugin/resultlog.txt doc/test/plugin/resultlog.txt
doc/test/plugin/runner.txt
doc/test/plugin/terminal.txt doc/test/plugin/terminal.txt
doc/test/plugin/unittest.txt doc/test/plugin/unittest.txt
doc/test/plugin/xfail.txt doc/test/plugin/xfail.txt
@ -356,6 +354,7 @@ py/test/plugin/pytest_terminal.py
py/test/plugin/pytest_tmpdir.py py/test/plugin/pytest_tmpdir.py
py/test/plugin/pytest_unittest.py py/test/plugin/pytest_unittest.py
py/test/plugin/pytest_xfail.py py/test/plugin/pytest_xfail.py
py/test/plugin/test_pytest_iocapture.py
py/test/plugin/test_pytest_runner.py py/test/plugin/test_pytest_runner.py
py/test/plugin/test_pytest_runner_xunit.py py/test/plugin/test_pytest_runner_xunit.py
py/test/plugin/test_pytest_terminal.py py/test/plugin/test_pytest_terminal.py

View File

@ -237,11 +237,15 @@ class/function names of a test function are put into the set
of keywords for a given test. You can specify additional of keywords for a given test. You can specify additional
kewords like this:: kewords like this::
@py.test.mark(webtest=True) @py.test.mark.webtest
def test_send_http(): def test_send_http():
... ...
and then use those keywords to select tests. and then use those keywords to select tests. See the `pytest_keyword`_
plugin for more information.
.. _`pytest_keyword`: plugin/keyword.html
disabling a test class disabling a test class
---------------------- ----------------------

View File

@ -37,8 +37,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_doctest.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_doctest.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -32,8 +32,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_figleaf.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_figleaf.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -0,0 +1,31 @@
pytest_hooklog plugin
=====================
log invocations of extension hooks to a file.
.. contents::
:local:
command line options
--------------------
``--hooklog=HOOKLOG``
write hook calls to the given file.
Start improving this plugin in 30 seconds
=========================================
Do you find the above documentation or the plugin itself lacking?
1. Download `pytest_hooklog.py`_ plugin source code
2. put it somewhere as ``pytest_hooklog.py`` into your import path
3. a subsequent ``py.test`` run will use your local version
Further information: extend_ documentation, other plugins_ or contact_.
.. include:: links.txt

View File

@ -92,6 +92,14 @@ hook specification sourcecode
def pytest_runtest_logreport(rep): def pytest_runtest_logreport(rep):
""" process item test report. """ """ process item test report. """
# special handling for final teardown - somewhat internal for now
def pytest__teardown_final(session):
""" called before test session finishes. """
pytest__teardown_final.firstresult = True
def pytest__teardown_final_logerror(rep):
""" called if runtest_teardown_final failed. """
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# test session related hooks # test session related hooks
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
@ -163,3 +171,4 @@ hook specification sourcecode
def pytest_trace(category, msg): def pytest_trace(category, msg):
""" called for debug info. """ """ called for debug info. """
.. include:: links.txt

View File

@ -8,7 +8,7 @@ figleaf_ write and report coverage data with 'figleaf'.
monkeypatch_ safely patch object attributes, dicts and environment variables. monkeypatch_ safely patch object attributes, dicts and environment variables.
iocapture_ convenient capturing of writes to stdout/stderror streams and file descriptors. iocapture_ configurable per-test stdout/stderr capturing mechanisms.
recwarn_ helpers for asserting deprecation and other warnings. recwarn_ helpers for asserting deprecation and other warnings.
@ -35,15 +35,14 @@ resultlog_ resultlog plugin for machine-readable logging of test results.
terminal_ Implements terminal reporting of the full testing process. terminal_ Implements terminal reporting of the full testing process.
.. _`xfail`: xfail.html internal plugins / core functionality
.. _`figleaf`: figleaf.html =====================================
.. _`monkeypatch`: monkeypatch.html
.. _`iocapture`: iocapture.html pdb_ interactive debugging with the Python Debugger.
.. _`recwarn`: recwarn.html
.. _`unittest`: unittest.html keyword_ mark test functions with keywords that may hold values.
.. _`doctest`: doctest.html
.. _`oejskit`: oejskit.html hooklog_ log invocations of extension hooks to a file.
.. _`restdoc`: restdoc.html
.. _`pocoo`: pocoo.html
.. _`resultlog`: resultlog.html .. include:: links.txt
.. _`terminal`: terminal.html

View File

@ -2,34 +2,90 @@
pytest_iocapture plugin pytest_iocapture plugin
======================= =======================
convenient capturing of writes to stdout/stderror streams and file descriptors. configurable per-test stdout/stderr capturing mechanisms.
.. contents:: .. contents::
:local: :local:
Example Usage This plugin captures stdout/stderr output for each test separately.
---------------------- In case of test failures this captured output is shown grouped
togtther with the test.
You can use the `capsys funcarg`_ to capture writes The plugin also provides test function arguments that help to
to stdout and stderr streams by using it in a test assert stdout/stderr output from within your tests, see the
likes this: `funcarg example`_.
Capturing of input/output streams during tests
---------------------------------------------------
By default ``sys.stdout`` and ``sys.stderr`` are substituted with
temporary streams during the execution of tests and setup/teardown code.
During the whole testing process it will re-use the same temporary
streams allowing to play well with the logging module which easily
takes ownership on these streams.
Also, 'sys.stdin' is substituted with a file-like "null" object that
does not return any values. This is to immediately error out
on tests that wait on reading something from stdin.
You can influence output capturing mechanisms from the command line::
py.test -s # disable all capturing
py.test --capture=sys # set StringIO() to each of sys.stdout/stderr
py.test --capture=fd # capture stdout/stderr on Filedescriptors 1/2
If you set capturing values in a conftest file like this::
# conftest.py
conf_capture = 'fd'
then all tests in that directory will execute with "fd" style capturing.
sys-level capturing
------------------------------------------
Capturing on 'sys' level means that ``sys.stdout`` and ``sys.stderr``
will be replaced with StringIO() objects.
FD-level capturing and subprocesses
------------------------------------------
The ``fd`` based method means that writes going to system level files
based on the standard file descriptors will be captured, for example
writes such as ``os.write(1, 'hello')`` will be captured properly.
Capturing on fd-level will include output generated from
any subprocesses created during a test.
.. _`funcarg example`:
Example Usage of the capturing Function arguments
---------------------------------------------------
You can use the `capsys funcarg`_ and `capfd funcarg`_ to
capture writes to stdout and stderr streams. Using the
funcargs frees your test from having to care about setting/resetting
the old streams and also interacts well with py.test's own
per-test capturing. Here is an example test function:
.. sourcecode:: python .. sourcecode:: python
def test_myoutput(capsys): def test_myoutput(capsys):
print "hello" print "hello"
print >>sys.stderr, "world" print >>sys.stderr, "world"
out, err = capsys.reset() out, err = capsys.readouterr()
assert out == "hello\n" assert out == "hello\n"
assert err == "world\n" assert err == "world\n"
print "next" print "next"
out, err = capsys.reset() out, err = capsys.readouterr()
assert out == "next\n" assert out == "next\n"
The ``reset()`` call returns a tuple and will restart The ``readouterr()`` call snapshots the output so far -
capturing so that you can successively check for output. and capturing will be continued. After the test
After the test function finishes the original streams function finishes the original streams will
will be restored. be restored. If you want to capture on
the filedescriptor level you can use the ``capfd`` function
argument which offers the same interface.
.. _`capsys funcarg`: .. _`capsys funcarg`:
@ -38,8 +94,8 @@ the 'capsys' test function argument
----------------------------------- -----------------------------------
captures writes to sys.stdout/sys.stderr and makes captures writes to sys.stdout/sys.stderr and makes
them available successively via a ``capsys.reset()`` method them available successively via a ``capsys.readouterr()`` method
which returns a ``(out, err)`` tuple of captured strings. which returns a ``(out, err)`` tuple of captured snapshot strings.
.. _`capfd funcarg`: .. _`capfd funcarg`:
@ -48,8 +104,17 @@ the 'capfd' test function argument
---------------------------------- ----------------------------------
captures writes to file descriptors 1 and 2 and makes captures writes to file descriptors 1 and 2 and makes
them available successively via a ``capsys.reset()`` method snapshotted ``(out, err)`` string tuples available
which returns a ``(out, err)`` tuple of captured strings. via the ``capsys.readouterr()`` method.
command line options
--------------------
``-s``
shortcut for --capture=no.
``--capture=capture``
set IO capturing method during tests: sys|fd|no.
Start improving this plugin in 30 seconds Start improving this plugin in 30 seconds
========================================= =========================================
@ -63,8 +128,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_iocapture.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_iocapture.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -0,0 +1,46 @@
pytest_keyword plugin
=====================
mark test functions with keywords that may hold values.
.. contents::
:local:
Marking functions and setting rich attributes
----------------------------------------------------
By default, all filename parts and class/function names of a test
function are put into the set of keywords for a given test. You can
specify additional kewords like this::
@py.test.mark.webtest
def test_send_http():
...
This will set an attribute 'webtest' on the given test function
and by default all such attributes signal keywords. You can
also set values in this attribute which you could read from
a hook in order to do something special with respect to
the test function::
@py.test.mark.timeout(seconds=5)
def test_receive():
...
This will set the "timeout" attribute with a Marker object
that has a 'seconds' attribute.
Start improving this plugin in 30 seconds
=========================================
Do you find the above documentation or the plugin itself lacking?
1. Download `pytest_keyword.py`_ plugin source code
2. put it somewhere as ``pytest_keyword.py`` into your import path
3. a subsequent ``py.test`` run will use your local version
Further information: extend_ documentation, other plugins_ or contact_.
.. include:: links.txt

33
doc/test/plugin/links.txt Normal file
View File

@ -0,0 +1,33 @@
.. _`pytest_recwarn.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_recwarn.py
.. _`pytest_iocapture.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_iocapture.py
.. _`pytest_monkeypatch.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_monkeypatch.py
.. _`plugins`: index.html
.. _`pytest_doctest.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_doctest.py
.. _`terminal`: terminal.html
.. _`hooklog`: hooklog.html
.. _`pytest_restdoc.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_restdoc.py
.. _`xfail`: xfail.html
.. _`pytest_pocoo.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_pocoo.py
.. _`pytest_keyword.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_keyword.py
.. _`pytest_figleaf.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_figleaf.py
.. _`pytest_hooklog.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_hooklog.py
.. _`contact`: ../../contact.html
.. _`pocoo`: pocoo.html
.. _`checkout the py.test development version`: ../../download.html#checkout
.. _`oejskit`: oejskit.html
.. _`unittest`: unittest.html
.. _`iocapture`: iocapture.html
.. _`pytest_xfail.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_xfail.py
.. _`figleaf`: figleaf.html
.. _`extend`: ../extend.html
.. _`pytest_terminal.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_terminal.py
.. _`recwarn`: recwarn.html
.. _`pytest_pdb.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_pdb.py
.. _`monkeypatch`: monkeypatch.html
.. _`resultlog`: resultlog.html
.. _`keyword`: keyword.html
.. _`restdoc`: restdoc.html
.. _`pytest_unittest.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_unittest.py
.. _`doctest`: doctest.html
.. _`pytest_resultlog.py`: http://bitbucket.org/hpk42/py-trunk/raw/69bd12627e4d304c89c2003842703ccb10dfe838/py/test/plugin/pytest_resultlog.py
.. _`pdb`: pdb.html

View File

@ -58,8 +58,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_monkeypatch.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_monkeypatch.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

31
doc/test/plugin/pdb.txt Normal file
View File

@ -0,0 +1,31 @@
pytest_pdb plugin
=================
interactive debugging with the Python Debugger.
.. contents::
:local:
command line options
--------------------
``--pdb``
start pdb (the Python debugger) on errors.
Start improving this plugin in 30 seconds
=========================================
Do you find the above documentation or the plugin itself lacking?
1. Download `pytest_pdb.py`_ plugin source code
2. put it somewhere as ``pytest_pdb.py`` into your import path
3. a subsequent ``py.test`` run will use your local version
Further information: extend_ documentation, other plugins_ or contact_.
.. include:: links.txt

View File

@ -28,8 +28,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_pocoo.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_pocoo.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -58,8 +58,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_recwarn.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_recwarn.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -32,8 +32,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_restdoc.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_restdoc.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -28,8 +28,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_resultlog.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_resultlog.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -21,8 +21,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_terminal.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_terminal.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -31,8 +31,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_unittest.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_unittest.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -33,8 +33,4 @@ Do you find the above documentation or the plugin itself lacking?
Further information: extend_ documentation, other plugins_ or contact_. Further information: extend_ documentation, other plugins_ or contact_.
.. _`pytest_xfail.py`: http://bitbucket.org/hpk42/py-trunk/raw/85fe614ab05f301f206935d11a477df184cbbce6/py/test/plugin/pytest_xfail.py .. include:: links.txt
.. _`extend`: ../extend.html
.. _`plugins`: index.html
.. _`contact`: ../../contact.html
.. _`checkout the py.test development version`: ../../download.html#checkout

View File

@ -10,6 +10,8 @@ plugins = [
'unittest doctest oejskit restdoc'), 'unittest doctest oejskit restdoc'),
('Plugins for generic reporting and failure logging', ('Plugins for generic reporting and failure logging',
'pocoo resultlog terminal',), 'pocoo resultlog terminal',),
('internal plugins / core functionality',
'pdb keyword hooklog')
#('internal plugins / core functionality', #('internal plugins / core functionality',
# #'pdb keyword hooklog runner execnetcleanup # pytester', # #'pdb keyword hooklog runner execnetcleanup # pytester',
# 'pdb keyword hooklog runner execnetcleanup' # pytester', # 'pdb keyword hooklog runner execnetcleanup' # pytester',
@ -26,6 +28,8 @@ def warn(*args):
print >>sys.stderr, "WARN:", msg print >>sys.stderr, "WARN:", msg
class RestWriter: class RestWriter:
_all_links = {}
def __init__(self, target): def __init__(self, target):
self.target = py.path.local(target) self.target = py.path.local(target)
self.links = [] self.links = []
@ -90,9 +94,23 @@ class RestWriter:
def write_links(self): def write_links(self):
self.Print() self.Print()
self.Print(".. include:: links.txt")
for link in self.links: for link in self.links:
#warn(repr(self.link)) key = link[0]
self.Print(".. _`%s`: %s" % (link[0], link[1])) if key in self._all_links:
assert self._all_links[key] == link[1], (key, link[1])
else:
self._all_links[key] = link[1]
def write_all_links(cls, linkpath):
p = linkpath.new(basename="links.txt")
p_writer = RestWriter(p)
p_writer.out = p_writer.target.open("w")
for name, value in cls._all_links.items():
p_writer.Print(".. _`%s`: %s" % (name, value))
p_writer.out.close()
del p_writer.out
write_all_links = classmethod(write_all_links)
def make(self, **kwargs): def make(self, **kwargs):
self.out = self.target.open("w") self.out = self.target.open("w")
@ -264,3 +282,5 @@ if __name__ == "__main__":
ov = HookSpec(testdir.join("plugin", "hookspec.txt")) ov = HookSpec(testdir.join("plugin", "hookspec.txt"))
ov.make(config=_config) ov.make(config=_config)
RestWriter.write_all_links(testdir.join("plugin", "links.txt"))

View File

@ -20,7 +20,7 @@ For questions please check out http://pylib.org/contact.html
from initpkg import initpkg from initpkg import initpkg
trunk = None trunk = None
version = trunk or "1.0.0b8" version = trunk or "1.0.0b9"
initpkg(__name__, initpkg(__name__,
description = "py.test and pylib: advanced testing tool and networking lib", description = "py.test and pylib: advanced testing tool and networking lib",

View File

@ -43,3 +43,13 @@ def getsocketspec(config=None):
if spec.socket: if spec.socket:
return spec return spec
py.test.skip("need '--gx socket=...'") py.test.skip("need '--gx socket=...'")
def pytest_generate_tests(metafunc):
multi = getattr(metafunc.function, 'multi', None)
if multi is None:
return
assert len(multi.__dict__) == 1
for name, l in multi.__dict__.items():
for val in l:
metafunc.addcall(funcargs={name: val})

View File

@ -22,21 +22,52 @@ class Capture(object):
call = classmethod(call) call = classmethod(call)
def reset(self): def reset(self):
""" reset sys.stdout and sys.stderr and return captured output """ reset sys.stdout/stderr and return captured output as strings. """
as strings and restore sys.stdout/err. if hasattr(self, '_suspended'):
""" outfile = self._kwargs['out']
x, y = self.done() errfile = self._kwargs['err']
outerr = x.read(), y.read() del self._kwargs
x.close() else:
y.close() outfile, errfile = self.done()
out, err = "", ""
if outfile:
out = outfile.read()
outfile.close()
if errfile and errfile != outfile:
err = errfile.read()
errfile.close()
return out, err
def suspend(self):
""" return current snapshot captures, memorize tempfiles. """
assert not hasattr(self, '_suspended')
self._suspended = True
outerr = self.readouterr()
outfile, errfile = self.done()
self._kwargs['out'] = outfile
self._kwargs['err'] = errfile
return outerr return outerr
def resume(self):
""" resume capturing with original temp files. """
assert self._suspended
self._initialize(**self._kwargs)
del self._suspended
class StdCaptureFD(Capture): class StdCaptureFD(Capture):
""" This class allows to capture writes to FD1 and FD2 """ This class allows to capture writes to FD1 and FD2
and may connect a NULL file to FD0 (and prevent and may connect a NULL file to FD0 (and prevent
reads from sys.stdin) reads from sys.stdin)
""" """
def __init__(self, out=True, err=True, mixed=False, in_=True, patchsys=True): def __init__(self, out=True, err=True,
mixed=False, in_=True, patchsys=True):
self._kwargs = locals().copy()
del self._kwargs['self']
self._initialize(**self._kwargs)
def _initialize(self, out=True, err=True,
mixed=False, in_=True, patchsys=True):
if in_: if in_:
self._oldin = (sys.stdin, os.dup(0)) self._oldin = (sys.stdin, os.dup(0))
sys.stdin = DontReadFromInput() sys.stdin = DontReadFromInput()
@ -44,12 +75,17 @@ class StdCaptureFD(Capture):
os.dup2(fd, 0) os.dup2(fd, 0)
os.close(fd) os.close(fd)
if out: if out:
self.out = py.io.FDCapture(1) tmpfile = None
if isinstance(out, file):
tmpfile = out
self.out = py.io.FDCapture(1, tmpfile=tmpfile)
if patchsys: if patchsys:
self.out.setasfile('stdout') self.out.setasfile('stdout')
if err: if err:
if mixed and out: if mixed and out:
tmpfile = self.out.tmpfile tmpfile = self.out.tmpfile
elif isinstance(err, file):
tmpfile = err
else: else:
tmpfile = None tmpfile = None
self.err = py.io.FDCapture(2, tmpfile=tmpfile) self.err = py.io.FDCapture(2, tmpfile=tmpfile)
@ -61,11 +97,11 @@ class StdCaptureFD(Capture):
if hasattr(self, 'out'): if hasattr(self, 'out'):
outfile = self.out.done() outfile = self.out.done()
else: else:
outfile = StringIO() outfile = None
if hasattr(self, 'err'): if hasattr(self, 'err'):
errfile = self.err.done() errfile = self.err.done()
else: else:
errfile = StringIO() errfile = None
if hasattr(self, '_oldin'): if hasattr(self, '_oldin'):
oldsys, oldfd = self._oldin oldsys, oldfd = self._oldin
os.dup2(oldfd, 0) os.dup2(oldfd, 0)
@ -73,6 +109,20 @@ class StdCaptureFD(Capture):
sys.stdin = oldsys sys.stdin = oldsys
return outfile, errfile return outfile, errfile
def readouterr(self):
""" return snapshot value of stdout/stderr capturings. """
l = []
for name in ('out', 'err'):
res = ""
if hasattr(self, name):
f = getattr(self, name).tmpfile
f.seek(0)
res = f.read()
f.truncate(0)
f.seek(0)
l.append(res)
return l
class StdCapture(Capture): class StdCapture(Capture):
""" This class allows to capture writes to sys.stdout|stderr "in-memory" """ This class allows to capture writes to sys.stdout|stderr "in-memory"
and will raise errors on tries to read from sys.stdin. It only and will raise errors on tries to read from sys.stdin. It only
@ -80,21 +130,28 @@ class StdCapture(Capture):
touch underlying File Descriptors (use StdCaptureFD for that). touch underlying File Descriptors (use StdCaptureFD for that).
""" """
def __init__(self, out=True, err=True, in_=True, mixed=False): def __init__(self, out=True, err=True, in_=True, mixed=False):
self._kwargs = locals().copy()
del self._kwargs['self']
self._initialize(**self._kwargs)
def _initialize(self, out, err, in_, mixed):
self._out = out self._out = out
self._err = err self._err = err
self._in = in_ self._in = in_
if out: if out:
self.oldout = sys.stdout self._oldout = sys.stdout
sys.stdout = self.newout = StringIO() if not hasattr(out, 'write'):
out = StringIO()
sys.stdout = self.out = out
if err: if err:
self.olderr = sys.stderr self._olderr = sys.stderr
if out and mixed: if out and mixed:
newerr = self.newout err = self.out
else: elif not hasattr(err, 'write'):
newerr = StringIO() err = StringIO()
sys.stderr = self.newerr = newerr sys.stderr = self.err = err
if in_: if in_:
self.oldin = sys.stdin self._oldin = sys.stdin
sys.stdin = self.newin = DontReadFromInput() sys.stdin = self.newin = DontReadFromInput()
def done(self): def done(self):
@ -102,28 +159,39 @@ class StdCapture(Capture):
o,e = sys.stdout, sys.stderr o,e = sys.stdout, sys.stderr
if self._out: if self._out:
try: try:
sys.stdout = self.oldout sys.stdout = self._oldout
except AttributeError: except AttributeError:
raise IOError("stdout capturing already reset") raise IOError("stdout capturing already reset")
del self.oldout del self._oldout
outfile = self.newout outfile = self.out
outfile.seek(0) outfile.seek(0)
else: else:
outfile = StringIO() outfile = None
if self._err: if self._err:
try: try:
sys.stderr = self.olderr sys.stderr = self._olderr
except AttributeError: except AttributeError:
raise IOError("stderr capturing already reset") raise IOError("stderr capturing already reset")
del self.olderr del self._olderr
errfile = self.newerr errfile = self.err
errfile.seek(0) errfile.seek(0)
else: else:
errfile = StringIO() errfile = None
if self._in: if self._in:
sys.stdin = self.oldin sys.stdin = self._oldin
return outfile, errfile return outfile, errfile
def readouterr(self):
""" return snapshot value of stdout/stderr capturings. """
out = err = ""
if self._out:
out = sys.stdout.getvalue()
sys.stdout.truncate(0)
if self._err:
err = sys.stderr.getvalue()
sys.stderr.truncate(0)
return out, err
class DontReadFromInput: class DontReadFromInput:
"""Temporary stub class. Ideally when stdin is accessed, the """Temporary stub class. Ideally when stdin is accessed, the
capturing should be turned off, with possibly all data captured capturing should be turned off, with possibly all data captured

View File

@ -30,6 +30,19 @@ class TestStdCapture:
assert out == "hello world\n" assert out == "hello world\n"
assert err == "hello error\n" assert err == "hello error\n"
def test_capturing_readouterr(self):
cap = self.getcapture()
try:
print "hello world"
print >>sys.stderr, "hello error"
out, err = cap.readouterr()
assert out == "hello world\n"
assert err == "hello error\n"
print >>sys.stderr, "error2"
finally:
out, err = cap.reset()
assert err == "error2\n"
def test_capturing_mixed(self): def test_capturing_mixed(self):
cap = self.getcapture(mixed=True) cap = self.getcapture(mixed=True)
print "hello", print "hello",
@ -43,7 +56,7 @@ class TestStdCapture:
cap = self.getcapture() cap = self.getcapture()
print "hello" print "hello"
cap.reset() cap.reset()
py.test.raises(EnvironmentError, "cap.reset()") py.test.raises(Exception, "cap.reset()")
def test_capturing_modify_sysouterr_in_between(self): def test_capturing_modify_sysouterr_in_between(self):
oldout = sys.stdout oldout = sys.stdout
@ -67,7 +80,7 @@ class TestStdCapture:
cap2 = self.getcapture() cap2 = self.getcapture()
print "cap2" print "cap2"
out2, err2 = cap2.reset() out2, err2 = cap2.reset()
py.test.raises(EnvironmentError, "cap2.reset()") py.test.raises(Exception, "cap2.reset()")
out1, err1 = cap1.reset() out1, err1 = cap1.reset()
assert out1 == "cap1\n" assert out1 == "cap1\n"
assert out2 == "cap2\n" assert out2 == "cap2\n"
@ -104,6 +117,24 @@ class TestStdCapture:
py.test.raises(IOError, "sys.stdin.read()") py.test.raises(IOError, "sys.stdin.read()")
out, err = cap.reset() out, err = cap.reset()
def test_suspend_resume(self):
cap = self.getcapture(out=True, err=False, in_=False)
try:
print "hello"
sys.stderr.write("error\n")
out, err = cap.suspend()
assert out == "hello\n"
assert not err
print "in between"
sys.stderr.write("in between\n")
cap.resume()
print "after"
sys.stderr.write("error_after\n")
finally:
out, err = cap.reset()
assert out == "after\n"
assert not err
class TestStdCaptureFD(TestStdCapture): class TestStdCaptureFD(TestStdCapture):
def getcapture(self, **kw): def getcapture(self, **kw):
return py.io.StdCaptureFD(**kw) return py.io.StdCaptureFD(**kw)
@ -150,10 +181,14 @@ def test_callcapture_nofd():
os.write(1, "hello") os.write(1, "hello")
os.write(2, "hello") os.write(2, "hello")
print x print x
print >>py.std.sys.stderr, y print >>sys.stderr, y
return 42 return 42
capfd = py.io.StdCaptureFD(patchsys=False)
try:
res, out, err = py.io.StdCapture.call(func, 3, y=4) res, out, err = py.io.StdCapture.call(func, 3, y=4)
finally:
capfd.reset()
assert res == 42 assert res == 42
assert out.startswith("3") assert out.startswith("3")
assert err.startswith("4") assert err.startswith("4")

View File

@ -122,6 +122,10 @@ class Compare(Interpretable):
expr = Interpretable(self.expr) expr = Interpretable(self.expr)
expr.eval(frame) expr.eval(frame)
for operation, expr2 in self.ops: for operation, expr2 in self.ops:
if hasattr(self, 'result'):
# shortcutting in chained expressions
if not frame.is_true(self.result):
break
expr2 = Interpretable(expr2) expr2 = Interpretable(expr2)
expr2.eval(frame) expr2.eval(frame)
self.explanation = "%s %s %s" % ( self.explanation = "%s %s %s" % (
@ -135,8 +139,6 @@ class Compare(Interpretable):
raise raise
except: except:
raise Failure(self) raise Failure(self)
if not frame.is_true(self.result):
break
expr = expr2 expr = expr2
class And(Interpretable): class And(Interpretable):

View File

@ -131,3 +131,26 @@ def test_inconsistent_assert_result(testdir):
s = result.stdout.str() s = result.stdout.str()
assert s.find("re-run") != -1 assert s.find("re-run") != -1
def test_twoarg_comparison_does_not_call_nonzero():
# this arises e.g. in numpy array comparisons
class X(object):
def __eq__(self, other):
return self
def __nonzero__(self):
raise ValueError
def all(self):
return False
def f():
a = X()
b = X()
assert (a == b).all()
excinfo = getexcinfo(AssertionError, f)
msg = getmsg(excinfo)
print msg
assert "re-run" not in msg
assert "ValueError" not in msg

View File

@ -454,6 +454,8 @@ recursively. """
except py.process.cmdexec.Error, e: except py.process.cmdexec.Error, e:
if e.err.find('is not a working copy')!=-1: if e.err.find('is not a working copy')!=-1:
return False return False
if e.err.lower().find('not a versioned resource') != -1:
return False
raise raise
else: else:
return True return True

View File

@ -11,12 +11,12 @@ def test_waitfinish_removes_tempdir():
ff.waitfinish() ff.waitfinish()
assert not ff.tempdir.check() assert not ff.tempdir.check()
def test_tempdir_gets_gc_collected(): def test_tempdir_gets_gc_collected(monkeypatch):
monkeypatch.setattr(os, 'fork', lambda: os.getpid())
ff = py.process.ForkedFunc(boxf1) ff = py.process.ForkedFunc(boxf1)
assert ff.tempdir.check() assert ff.tempdir.check()
ff.__del__() ff.__del__()
assert not ff.tempdir.check() assert not ff.tempdir.check()
os.waitpid(ff.pid, 0)
def test_basic_forkedfunc(): def test_basic_forkedfunc():
result = py.process.ForkedFunc(boxf1).waitfinish() result = py.process.ForkedFunc(boxf1).waitfinish()

View File

@ -4,7 +4,6 @@ Collectors and test Items form a tree
that is usually built iteratively. that is usually built iteratively.
""" """
import py import py
from py.__.test.outcome import Skipped
def configproperty(name): def configproperty(name):
def fget(self): def fget(self):
@ -31,6 +30,10 @@ class Node(object):
self.config = getattr(parent, 'config', None) self.config = getattr(parent, 'config', None)
self.fspath = getattr(parent, 'fspath', None) self.fspath = getattr(parent, 'fspath', None)
def _checkcollectable(self):
if not hasattr(self, 'fspath'):
self.parent._memocollect() # to reraise exception
# #
# note to myself: Pickling is uh. # note to myself: Pickling is uh.
# #
@ -44,6 +47,7 @@ class Node(object):
except Exception: except Exception:
# seems our parent can't collect us # seems our parent can't collect us
# so let's be somewhat operable # so let's be somewhat operable
# _checkcollectable() is to tell outsiders about the fact
self.name = name self.name = name
self.parent = parent self.parent = parent
self.config = parent.config self.config = parent.config
@ -247,7 +251,8 @@ class Node(object):
return col._getitembynames(names) return col._getitembynames(names)
_fromtrail = staticmethod(_fromtrail) _fromtrail = staticmethod(_fromtrail)
def _repr_failure_py(self, excinfo, outerr): def _repr_failure_py(self, excinfo, outerr=None):
assert outerr is None, "XXX deprecated"
excinfo.traceback = self._prunetraceback(excinfo.traceback) excinfo.traceback = self._prunetraceback(excinfo.traceback)
# XXX temporary hack: getrepr() should not take a 'style' argument # XXX temporary hack: getrepr() should not take a 'style' argument
# at all; it should record all data in all cases, and the style # at all; it should record all data in all cases, and the style
@ -256,13 +261,9 @@ class Node(object):
style = "short" style = "short"
else: else:
style = "long" style = "long"
repr = excinfo.getrepr(funcargs=True, return excinfo.getrepr(funcargs=True,
showlocals=self.config.option.showlocals, showlocals=self.config.option.showlocals,
style=style) style=style)
for secname, content in zip(["out", "err"], outerr):
if content:
repr.addsection("Captured std%s" % secname, content.rstrip())
return repr
repr_failure = _repr_failure_py repr_failure = _repr_failure_py
shortfailurerepr = "F" shortfailurerepr = "F"
@ -291,9 +292,10 @@ class Collector(Node):
if colitem.name == name: if colitem.name == name:
return colitem return colitem
def repr_failure(self, excinfo, outerr): def repr_failure(self, excinfo, outerr=None):
""" represent a failure. """ """ represent a failure. """
return self._repr_failure_py(excinfo, outerr) assert outerr is None, "XXX deprecated"
return self._repr_failure_py(excinfo)
def _memocollect(self): def _memocollect(self):
""" internal helper method to cache results of calling collect(). """ """ internal helper method to cache results of calling collect(). """

View File

@ -240,20 +240,6 @@ class Config(object):
finally: finally:
config_per_process = py.test.config = oldconfig config_per_process = py.test.config = oldconfig
def _getcapture(self, path=None):
if self.option.nocapture:
iocapture = "no"
else:
iocapture = self.getvalue("iocapture", path=path)
if iocapture == "fd":
return py.io.StdCaptureFD()
elif iocapture == "sys":
return py.io.StdCapture()
elif iocapture == "no":
return py.io.StdCapture(out=False, err=False, in_=False)
else:
raise self.Error("unknown io capturing: " + iocapture)
def getxspecs(self): def getxspecs(self):
xspeclist = [] xspeclist = []
for xspec in self.getvalue("tx"): for xspec in self.getvalue("tx"):
@ -287,29 +273,6 @@ class Config(object):
roots.append(pydir) roots.append(pydir)
return roots return roots
def guardedcall(self, func):
excinfo = result = None
capture = self._getcapture()
try:
try:
result = func()
except KeyboardInterrupt:
raise
except:
excinfo = py.code.ExceptionInfo()
finally:
stdout, stderr = capture.reset()
return CallResult(result, excinfo, stdout, stderr)
class CallResult:
def __init__(self, result, excinfo, stdout, stderr):
self.stdout = stdout
self.stderr = stderr
self.outerr = (self.stdout, self.stderr)
self.excinfo = excinfo
if excinfo is None:
self.result = result
# #
# helpers # helpers
# #

View File

@ -10,5 +10,6 @@ Generator = py.test.collect.Generator
Function = py.test.collect.Function Function = py.test.collect.Function
Instance = py.test.collect.Instance Instance = py.test.collect.Instance
pytest_plugins = "default runner terminal keyword xfail tmpdir execnetcleanup monkeypatch recwarn pdb unittest".split() pytest_plugins = "default runner iocapture terminal keyword xfail tmpdir execnetcleanup monkeypatch recwarn pdb unittest".split()
conf_capture = "fd"

View File

@ -11,6 +11,13 @@ from py.__.test.dist.nodemanage import NodeManager
import Queue import Queue
debug_file = None # open('/tmp/loop.log', 'w')
def debug(*args):
if debug_file is not None:
s = " ".join(map(str, args))
debug_file.write(s+"\n")
debug_file.flush()
class LoopState(object): class LoopState(object):
def __init__(self, dsession, colitems): def __init__(self, dsession, colitems):
self.dsession = dsession self.dsession = dsession
@ -23,8 +30,13 @@ class LoopState(object):
self.shuttingdown = False self.shuttingdown = False
self.testsfailed = False self.testsfailed = False
def __repr__(self):
return "<LoopState exitstatus=%r shuttingdown=%r len(colitems)=%d>" % (
self.exitstatus, self.shuttingdown, len(self.colitems))
def pytest_runtest_logreport(self, rep): def pytest_runtest_logreport(self, rep):
if rep.item in self.dsession.item2nodes: if rep.item in self.dsession.item2nodes:
if rep.when != "teardown": # otherwise we have already managed it
self.dsession.removeitem(rep.item, rep.node) self.dsession.removeitem(rep.item, rep.node)
if rep.failed: if rep.failed:
self.testsfailed = True self.testsfailed = True
@ -39,8 +51,13 @@ class LoopState(object):
def pytest_testnodedown(self, node, error=None): def pytest_testnodedown(self, node, error=None):
pending = self.dsession.removenode(node) pending = self.dsession.removenode(node)
if pending: if pending:
if error:
crashitem = pending[0] crashitem = pending[0]
debug("determined crashitem", crashitem)
self.dsession.handle_crashitem(crashitem, node) self.dsession.handle_crashitem(crashitem, node)
# XXX recovery handling for "each"?
# currently pending items are not retried
if self.dsession.config.option.dist == "load":
self.colitems.extend(pending[1:]) self.colitems.extend(pending[1:])
def pytest_rescheduleitems(self, items): def pytest_rescheduleitems(self, items):
@ -115,6 +132,9 @@ class DSession(Session):
if eventname == "pytest_testnodedown": if eventname == "pytest_testnodedown":
self.config.hook.pytest_testnodedown(**kwargs) self.config.hook.pytest_testnodedown(**kwargs)
self.removenode(kwargs['node']) self.removenode(kwargs['node'])
elif eventname == "pytest_runtest_logreport":
# might be some teardown report
self.config.hook.pytest_runtest_logreport(**kwargs)
if not self.node2pending: if not self.node2pending:
# finished # finished
if loopstate.testsfailed: if loopstate.testsfailed:
@ -200,7 +220,9 @@ class DSession(Session):
node.sendlist(sending) node.sendlist(sending)
pending.extend(sending) pending.extend(sending)
for item in sending: for item in sending:
self.item2nodes.setdefault(item, []).append(node) nodes = self.item2nodes.setdefault(item, [])
assert node not in nodes
nodes.append(node)
self.config.hook.pytest_itemstart(item=item, node=node) self.config.hook.pytest_itemstart(item=item, node=node)
tosend[:] = tosend[room:] # update inplace tosend[:] = tosend[room:] # update inplace
if tosend: if tosend:
@ -237,7 +259,8 @@ class DSession(Session):
nodes.remove(node) nodes.remove(node)
if not nodes: if not nodes:
del self.item2nodes[item] del self.item2nodes[item]
self.node2pending[node].remove(item) pending = self.node2pending[node]
pending.remove(item)
def handle_crashitem(self, item, node): def handle_crashitem(self, item, node):
runner = item.config.pluginmanager.getplugin("runner") runner = item.config.pluginmanager.getplugin("runner")

View File

@ -69,6 +69,7 @@ class ImmutablePickler:
pickler = MyPickler(f, self._protocol, uneven=self.uneven) pickler = MyPickler(f, self._protocol, uneven=self.uneven)
pickler.memo = self._picklememo pickler.memo = self._picklememo
pickler.dump(obj) pickler.dump(obj)
if obj is not None:
self._updateunpicklememo() self._updateunpicklememo()
#print >>debug, "dumped", obj #print >>debug, "dumped", obj
#print >>debug, "picklememo", self._picklememo #print >>debug, "picklememo", self._picklememo

View File

@ -7,7 +7,7 @@ XSpec = py.execnet.XSpec
def run(item, node, excinfo=None): def run(item, node, excinfo=None):
runner = item.config.pluginmanager.getplugin("runner") runner = item.config.pluginmanager.getplugin("runner")
rep = runner.ItemTestReport(item=item, rep = runner.ItemTestReport(item=item,
excinfo=excinfo, when="call", outerr=("", "")) excinfo=excinfo, when="call")
rep.node = node rep.node = node
return rep return rep
@ -155,6 +155,45 @@ class TestDSession:
dumpqueue(session.queue) dumpqueue(session.queue)
assert loopstate.exitstatus == outcome.EXIT_NOHOSTS assert loopstate.exitstatus == outcome.EXIT_NOHOSTS
def test_removeitem_from_failing_teardown(self, testdir):
# teardown reports only come in when they signal a failure
# internal session-management should basically ignore them
# XXX probably it'S best to invent a new error hook for
# teardown/setup related failures
modcol = testdir.getmodulecol("""
def test_one():
pass
def teardown_function(function):
assert 0
""")
item1, = modcol.collect()
# setup a session with two nodes
session = DSession(item1.config)
node1, node2 = MockNode(), MockNode()
session.addnode(node1)
session.addnode(node2)
# have one test pending for a node that goes down
session.senditems_each([item1])
nodes = session.item2nodes[item1]
class rep:
failed = True
item = item1
node = nodes[0]
when = "call"
session.queueevent("pytest_runtest_logreport", rep=rep)
reprec = testdir.getreportrecorder(session)
print session.item2nodes
loopstate = session._initloopstate([])
assert len(session.item2nodes[item1]) == 2
session.loop_once(loopstate)
assert len(session.item2nodes[item1]) == 1
rep.when = "teardown"
session.queueevent("pytest_runtest_logreport", rep=rep)
session.loop_once(loopstate)
assert len(session.item2nodes[item1]) == 1
def test_testnodedown_causes_reschedule_pending(self, testdir): def test_testnodedown_causes_reschedule_pending(self, testdir):
modcol = testdir.getmodulecol(""" modcol = testdir.getmodulecol("""
def test_crash(): def test_crash():
@ -173,7 +212,8 @@ class TestDSession:
# have one test pending for a node that goes down # have one test pending for a node that goes down
session.senditems_load([item1, item2]) session.senditems_load([item1, item2])
node = session.item2nodes[item1] [0] node = session.item2nodes[item1] [0]
session.queueevent("pytest_testnodedown", node=node, error=None) item1.config.option.dist = "load"
session.queueevent("pytest_testnodedown", node=node, error="xyz")
reprec = testdir.getreportrecorder(session) reprec = testdir.getreportrecorder(session)
print session.item2nodes print session.item2nodes
loopstate = session._initloopstate([]) loopstate = session._initloopstate([])
@ -367,13 +407,36 @@ class TestDSession:
assert node.gateway.spec.popen assert node.gateway.spec.popen
#XXX eq.geteventargs("pytest_sessionfinish") #XXX eq.geteventargs("pytest_sessionfinish")
@py.test.mark.xfail def test_collected_function_causes_remote_skip(testdir):
def test_collected_function_causes_remote_skip_at_module_level(self, testdir): sub = testdir.mkpydir("testing")
p = testdir.makepyfile(""" sub.join("test_module.py").write(py.code.Source("""
import py import py
py.test.importorskip("xyz") path = py.path.local(%r)
if path.check():
path.remove()
else:
py.test.skip("remote skip")
def test_func(): def test_func():
pass pass
def test_func2():
pass
""" % str(sub.ensure("somefile"))))
result = testdir.runpytest('-v', '--dist=each', '--tx=popen')
result.stdout.fnmatch_lines([
"*2 skipped*"
])
def test_teardownfails_one_function(testdir):
p = testdir.makepyfile("""
def test_func():
pass
def teardown_function(function):
assert 0
""") """)
# we need to be able to collect test_func locally but not in the subprocess result = testdir.runpytest(p, '--dist=each', '--tx=popen')
XXX result.stdout.fnmatch_lines([
"*def teardown_function(function):*",
"*1 passed*1 error*"
])

View File

@ -32,6 +32,7 @@ class EventQueue:
class MySetup: class MySetup:
def __init__(self, request): def __init__(self, request):
self.id = 0
self.request = request self.request = request
def geteventargs(self, eventname, timeout=2.0): def geteventargs(self, eventname, timeout=2.0):
@ -45,6 +46,8 @@ class MySetup:
self.queue = py.std.Queue.Queue() self.queue = py.std.Queue.Queue()
self.xspec = py.execnet.XSpec("popen") self.xspec = py.execnet.XSpec("popen")
self.gateway = py.execnet.makegateway(self.xspec) self.gateway = py.execnet.makegateway(self.xspec)
self.id += 1
self.gateway.id = str(self.id)
self.node = TXNode(self.gateway, self.config, putevent=self.queue.put) self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)
assert not self.node.channel.isclosed() assert not self.node.channel.isclosed()
return self.node return self.node

View File

@ -21,6 +21,11 @@ class TXNode(object):
self.channel.setcallback(self.callback, endmarker=self.ENDMARK) self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
self._down = False self._down = False
def __repr__(self):
id = self.gateway.id
status = self._down and 'true' or 'false'
return "<TXNode %r down=%s>" %(id, status)
def notify(self, eventname, *args, **kwargs): def notify(self, eventname, *args, **kwargs):
assert not args assert not args
self.putevent((eventname, args, kwargs)) self.putevent((eventname, args, kwargs))
@ -115,6 +120,7 @@ class SlaveNode(object):
self.config.basetemp = py.path.local(basetemp) self.config.basetemp = py.path.local(basetemp)
self.config.pluginmanager.do_configure(self.config) self.config.pluginmanager.do_configure(self.config)
self.config.pluginmanager.register(self) self.config.pluginmanager.register(self)
self.runner = self.config.pluginmanager.getplugin("pytest_runner")
self.sendevent("slaveready") self.sendevent("slaveready")
try: try:
while 1: while 1:
@ -124,12 +130,26 @@ class SlaveNode(object):
break break
if isinstance(task, list): if isinstance(task, list):
for item in task: for item in task:
item.config.hook.pytest_runtest_protocol(item=item) self.run_single(item=item)
else: else:
task.config.hook.pytest_runtest_protocol(item=task) self.run_single(item=task)
except KeyboardInterrupt: except KeyboardInterrupt:
raise raise
except: except:
er = py.code.ExceptionInfo().getrepr(funcargs=True, showlocals=True) er = py.code.ExceptionInfo().getrepr(funcargs=True, showlocals=True)
self.sendevent("pytest_internalerror", excrepr=er) self.sendevent("pytest_internalerror", excrepr=er)
raise raise
def run_single(self, item):
call = self.runner.CallInfo(item._checkcollectable, when='setup')
if call.excinfo:
# likely it is not collectable here because of
# platform/import-dependency induced skips
# XXX somewhat ugly shortcuts - also makes a collection
# failure into an ItemTestReport - this might confuse
# pytest_runtest_logreport hooks
rep = self.runner.pytest_runtest_makereport(item=item, call=call)
self.pytest_runtest_logreport(rep)
return
item.config.hook.pytest_runtest_protocol(item=item)

View File

@ -86,6 +86,14 @@ pytest_runtest_makereport.firstresult = True
def pytest_runtest_logreport(rep): def pytest_runtest_logreport(rep):
""" process item test report. """ """ process item test report. """
# special handling for final teardown - somewhat internal for now
def pytest__teardown_final(session):
""" called before test session finishes. """
pytest__teardown_final.firstresult = True
def pytest__teardown_final_logerror(rep):
""" called if runtest_teardown_final failed. """
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# test session related hooks # test session related hooks
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------

View File

@ -60,9 +60,6 @@ def pytest_addoption(parser):
action="store", dest="tbstyle", default='long', action="store", dest="tbstyle", default='long',
type="choice", choices=['long', 'short', 'no'], type="choice", choices=['long', 'short', 'no'],
help="traceback verboseness (long/short/no).") help="traceback verboseness (long/short/no).")
group._addoption('-s',
action="store_true", dest="nocapture", default=False,
help="disable catching of stdout/stderr during test run.")
group._addoption('-p', action="append", dest="plugin", default = [], group._addoption('-p', action="append", dest="plugin", default = [],
help=("load the specified plugin after command line parsing. ")) help=("load the specified plugin after command line parsing. "))
group._addoption('-f', '--looponfail', group._addoption('-f', '--looponfail',
@ -84,9 +81,6 @@ def pytest_addoption(parser):
help="don't cut any tracebacks (default is to cut).") help="don't cut any tracebacks (default is to cut).")
group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir", group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir",
help="base temporary directory for this test run.") help="base temporary directory for this test run.")
group._addoption('--iocapture', action="store", default="fd", metavar="method",
type="choice", choices=['fd', 'sys', 'no'],
help="set iocapturing method: fd|sys|no.")
group.addoption('--debug', group.addoption('--debug',
action="store_true", dest="debug", default=False, action="store_true", dest="debug", default=False,
help="generate and show debugging information.") help="generate and show debugging information.")

View File

@ -45,7 +45,7 @@ class DoctestItem(py.test.collect.Item):
super(DoctestItem, self).__init__(name=name, parent=parent) super(DoctestItem, self).__init__(name=name, parent=parent)
self.fspath = path self.fspath = path
def repr_failure(self, excinfo, outerr): def repr_failure(self, excinfo):
if excinfo.errisinstance(py.compat.doctest.DocTestFailure): if excinfo.errisinstance(py.compat.doctest.DocTestFailure):
doctestfailure = excinfo.value doctestfailure = excinfo.value
example = doctestfailure.example example = doctestfailure.example
@ -67,9 +67,9 @@ class DoctestItem(py.test.collect.Item):
return ReprFailDoctest(reprlocation, lines) return ReprFailDoctest(reprlocation, lines)
elif excinfo.errisinstance(py.compat.doctest.UnexpectedException): elif excinfo.errisinstance(py.compat.doctest.UnexpectedException):
excinfo = py.code.ExceptionInfo(excinfo.value.exc_info) excinfo = py.code.ExceptionInfo(excinfo.value.exc_info)
return super(DoctestItem, self).repr_failure(excinfo, outerr) return super(DoctestItem, self).repr_failure(excinfo)
else: else:
return super(DoctestItem, self).repr_failure(excinfo, outerr) return super(DoctestItem, self).repr_failure(excinfo)
class DoctestTextfile(DoctestItem): class DoctestTextfile(DoctestItem):
def runtest(self): def runtest(self):

View File

@ -1,97 +1,254 @@
""" """
convenient capturing of writes to stdout/stderror streams and file descriptors. configurable per-test stdout/stderr capturing mechanisms.
Example Usage This plugin captures stdout/stderr output for each test separately.
---------------------- In case of test failures this captured output is shown grouped
togtther with the test.
You can use the `capsys funcarg`_ to capture writes The plugin also provides test function arguments that help to
to stdout and stderr streams by using it in a test assert stdout/stderr output from within your tests, see the
likes this: `funcarg example`_.
Capturing of input/output streams during tests
---------------------------------------------------
By default ``sys.stdout`` and ``sys.stderr`` are substituted with
temporary streams during the execution of tests and setup/teardown code.
During the whole testing process it will re-use the same temporary
streams allowing to play well with the logging module which easily
takes ownership on these streams.
Also, 'sys.stdin' is substituted with a file-like "null" object that
does not return any values. This is to immediately error out
on tests that wait on reading something from stdin.
You can influence output capturing mechanisms from the command line::
py.test -s # disable all capturing
py.test --capture=sys # set StringIO() to each of sys.stdout/stderr
py.test --capture=fd # capture stdout/stderr on Filedescriptors 1/2
If you set capturing values in a conftest file like this::
# conftest.py
conf_capture = 'fd'
then all tests in that directory will execute with "fd" style capturing.
sys-level capturing
------------------------------------------
Capturing on 'sys' level means that ``sys.stdout`` and ``sys.stderr``
will be replaced with StringIO() objects.
FD-level capturing and subprocesses
------------------------------------------
The ``fd`` based method means that writes going to system level files
based on the standard file descriptors will be captured, for example
writes such as ``os.write(1, 'hello')`` will be captured properly.
Capturing on fd-level will include output generated from
any subprocesses created during a test.
.. _`funcarg example`:
Example Usage of the capturing Function arguments
---------------------------------------------------
You can use the `capsys funcarg`_ and `capfd funcarg`_ to
capture writes to stdout and stderr streams. Using the
funcargs frees your test from having to care about setting/resetting
the old streams and also interacts well with py.test's own
per-test capturing. Here is an example test function:
.. sourcecode:: python .. sourcecode:: python
def test_myoutput(capsys): def test_myoutput(capsys):
print "hello" print "hello"
print >>sys.stderr, "world" print >>sys.stderr, "world"
out, err = capsys.reset() out, err = capsys.readouterr()
assert out == "hello\\n" assert out == "hello\\n"
assert err == "world\\n" assert err == "world\\n"
print "next" print "next"
out, err = capsys.reset() out, err = capsys.readouterr()
assert out == "next\\n" assert out == "next\\n"
The ``reset()`` call returns a tuple and will restart The ``readouterr()`` call snapshots the output so far -
capturing so that you can successively check for output. and capturing will be continued. After the test
After the test function finishes the original streams function finishes the original streams will
will be restored. be restored. If you want to capture on
the filedescriptor level you can use the ``capfd`` function
argument which offers the same interface.
""" """
import py import py
def pytest_addoption(parser):
group = parser.getgroup("general")
group._addoption('-s', action="store_const", const="no", dest="capture",
help="shortcut for --capture=no.")
group._addoption('--capture', action="store", default=None,
metavar="capture", type="choice", choices=['fd', 'sys', 'no'],
help="set IO capturing method during tests: sys|fd|no.")
def addouterr(rep, outerr):
repr = getattr(rep, 'longrepr', None)
if not hasattr(repr, 'addsection'):
return
for secname, content in zip(["out", "err"], outerr):
if content:
repr.addsection("Captured std%s" % secname, content.rstrip())
def pytest_configure(config):
config.pluginmanager.register(CaptureManager(), 'capturemanager')
class CaptureManager:
def __init__(self):
self._method2capture = {}
def _startcapture(self, method):
if method == "fd":
return py.io.StdCaptureFD()
elif method == "sys":
return py.io.StdCapture()
else:
raise ValueError("unknown capturing method: %r" % method)
def _getmethod(self, config, fspath):
if config.option.capture:
return config.option.capture
return config._conftest.rget("conf_capture", path=fspath)
def resumecapture_item(self, item):
method = self._getmethod(item.config, item.fspath)
if not hasattr(item, 'outerr'):
item.outerr = ('', '') # we accumulate outerr on the item
return self.resumecapture(method)
def resumecapture(self, method):
if hasattr(self, '_capturing'):
raise ValueError("cannot resume, already capturing with %r" %
(self._capturing,))
if method != "no":
cap = self._method2capture.get(method)
if cap is None:
cap = self._startcapture(method)
self._method2capture[method] = cap
else:
cap.resume()
self._capturing = method
def suspendcapture(self):
self.deactivate_funcargs()
method = self._capturing
if method != "no":
cap = self._method2capture[method]
outerr = cap.suspend()
else:
outerr = "", ""
del self._capturing
return outerr
def activate_funcargs(self, pyfuncitem):
if not hasattr(pyfuncitem, 'funcargs'):
return
assert not hasattr(self, '_capturing_funcargs')
l = []
for name, obj in pyfuncitem.funcargs.items():
if name in ('capsys', 'capfd'):
obj._start()
l.append(obj)
if l:
self._capturing_funcargs = l
def deactivate_funcargs(self):
if hasattr(self, '_capturing_funcargs'):
for capfuncarg in self._capturing_funcargs:
capfuncarg._finalize()
del self._capturing_funcargs
def pytest_make_collect_report(self, __call__, collector):
method = self._getmethod(collector.config, collector.fspath)
self.resumecapture(method)
try:
rep = __call__.execute(firstresult=True)
finally:
outerr = self.suspendcapture()
addouterr(rep, outerr)
return rep
def pytest_runtest_setup(self, item):
self.resumecapture_item(item)
def pytest_runtest_call(self, item):
self.resumecapture_item(item)
self.activate_funcargs(item)
def pytest_runtest_teardown(self, item):
self.resumecapture_item(item)
def pytest_runtest_teardown(self, item):
self.resumecapture_item(item)
def pytest__teardown_final(self, __call__, session):
method = self._getmethod(session.config, None)
self.resumecapture(method)
try:
rep = __call__.execute(firstresult=True)
finally:
outerr = self.suspendcapture()
if rep:
addouterr(rep, outerr)
return rep
def pytest_keyboard_interrupt(self, excinfo):
if hasattr(self, '_capturing'):
self.suspendcapture()
def pytest_runtest_makereport(self, __call__, item, call):
self.deactivate_funcargs()
rep = __call__.execute(firstresult=True)
outerr = self.suspendcapture()
outerr = (item.outerr[0] + outerr[0], item.outerr[1] + outerr[1])
if not rep.passed:
addouterr(rep, outerr)
if not rep.passed or rep.when == "teardown":
outerr = ('', '')
item.outerr = outerr
return rep
def pytest_funcarg__capsys(request): def pytest_funcarg__capsys(request):
"""captures writes to sys.stdout/sys.stderr and makes """captures writes to sys.stdout/sys.stderr and makes
them available successively via a ``capsys.reset()`` method them available successively via a ``capsys.readouterr()`` method
which returns a ``(out, err)`` tuple of captured strings. which returns a ``(out, err)`` tuple of captured snapshot strings.
""" """
capture = Capture(py.io.StdCapture) return CaptureFuncarg(request, py.io.StdCapture)
request.addfinalizer(capture.finalize)
return capture
def pytest_funcarg__capfd(request): def pytest_funcarg__capfd(request):
"""captures writes to file descriptors 1 and 2 and makes """captures writes to file descriptors 1 and 2 and makes
them available successively via a ``capsys.reset()`` method snapshotted ``(out, err)`` string tuples available
which returns a ``(out, err)`` tuple of captured strings. via the ``capsys.readouterr()`` method.
""" """
capture = Capture(py.io.StdCaptureFD) return CaptureFuncarg(request, py.io.StdCaptureFD)
request.addfinalizer(capture.finalize)
return capture
def pytest_pyfunc_call(pyfuncitem):
if hasattr(pyfuncitem, 'funcargs'):
for funcarg, value in pyfuncitem.funcargs.items():
if funcarg == "capsys" or funcarg == "capfd":
value.reset()
class Capture: class CaptureFuncarg:
_capture = None def __init__(self, request, captureclass):
def __init__(self, captureclass): self._cclass = captureclass
self._captureclass = captureclass #request.addfinalizer(self._finalize)
def finalize(self): def _start(self):
if self._capture: self.capture = self._cclass()
self._capture.reset()
def reset(self): def _finalize(self):
res = None if hasattr(self, 'capture'):
if self._capture: self.capture.reset()
res = self._capture.reset() del self.capture
self._capture = self._captureclass()
return res
class TestCapture: def readouterr(self):
def test_std_functional(self, testdir): return self.capture.readouterr()
reprec = testdir.inline_runsource("""
def test_hello(capsys):
print 42
out, err = capsys.reset()
assert out.startswith("42")
""")
reprec.assertoutcome(passed=1)
def test_stdfd_functional(self, testdir):
reprec = testdir.inline_runsource("""
def test_hello(capfd):
import os
os.write(1, "42")
out, err = capfd.reset()
assert out.startswith("42")
""")
reprec.assertoutcome(passed=1)
def test_funcall_yielded_no_funcargs(self, testdir):
reprec = testdir.inline_runsource("""
def test_hello():
yield lambda: None
""")
reprec.assertoutcome(passed=1)
def close(self):
self.capture.reset()
del self.capture

View File

@ -1,70 +1,86 @@
""" """
py.test.mark / keyword plugin mark test functions with keywords that may hold values.
Marking functions and setting rich attributes
----------------------------------------------------
By default, all filename parts and class/function names of a test
function are put into the set of keywords for a given test. You can
specify additional kewords like this::
@py.test.mark.webtest
def test_send_http():
...
This will set an attribute 'webtest' on the given test function
and by default all such attributes signal keywords. You can
also set values in this attribute which you could read from
a hook in order to do something special with respect to
the test function::
@py.test.mark.timeout(seconds=5)
def test_receive():
...
This will set the "timeout" attribute with a Marker object
that has a 'seconds' attribute.
""" """
import py import py
def pytest_namespace(): def pytest_namespace():
mark = KeywordDecorator({}) return {'mark': Mark()}
return {'mark': mark}
class KeywordDecorator:
""" decorator for setting function attributes. """
def __init__(self, keywords, lastname=None):
self._keywords = keywords
self._lastname = lastname
def __call__(self, func=None, **kwargs):
if func is None:
kw = self._keywords.copy()
kw.update(kwargs)
return KeywordDecorator(kw)
elif not hasattr(func, 'func_dict'):
kw = self._keywords.copy()
name = self._lastname
if name is None:
name = "mark"
kw[name] = func
return KeywordDecorator(kw)
func.func_dict.update(self._keywords)
return func
class Mark(object):
def __getattr__(self, name): def __getattr__(self, name):
if name[0] == "_": if name[0] == "_":
raise AttributeError(name) raise AttributeError(name)
kw = self._keywords.copy() return MarkerDecorator(name)
kw[name] = True
return self.__class__(kw, lastname=name) class MarkerDecorator:
""" decorator for setting function attributes. """
def __init__(self, name):
self.markname = name
def __repr__(self):
d = self.__dict__.copy()
name = d.pop('markname')
return "<MarkerDecorator %r %r>" %(name, d)
def __call__(self, *args, **kwargs):
if not args:
if hasattr(self, 'kwargs'):
raise TypeError("double mark-keywords?")
self.kwargs = kwargs.copy()
return self
else:
if not len(args) == 1 or not hasattr(args[0], 'func_dict'):
raise TypeError("need exactly one function to decorate, "
"got %r" %(args,))
func = args[0]
mh = MarkHolder(getattr(self, 'kwargs', {}))
setattr(func, self.markname, mh)
return func
class MarkHolder:
def __init__(self, kwargs):
self.__dict__.update(kwargs)
def test_pytest_mark_api():
mark = Mark()
py.test.raises(TypeError, "mark(x=3)")
def test_pytest_mark_getattr():
mark = KeywordDecorator({})
def f(): pass def f(): pass
mark.hello(f) mark.hello(f)
assert f.hello == True assert f.hello
mark.hello("test")(f) mark.world(x=3, y=4)(f)
assert f.hello == "test" assert f.world
assert f.world.x == 3
assert f.world.y == 4
py.test.raises(AttributeError, "mark._hello") py.test.raises(TypeError, "mark.some(x=3)(f=5)")
py.test.raises(AttributeError, "mark.__str__")
def test_pytest_mark_call():
mark = KeywordDecorator({})
def f(): pass
mark(x=3)(f)
assert f.x == 3
def g(): pass
mark(g)
assert not g.func_dict
mark.hello(f)
assert f.hello == True
mark.hello("test")(f)
assert f.hello == "test"
mark("x1")(f)
assert f.mark == "x1"
def test_mark_plugin(testdir): def test_mark_plugin(testdir):
p = testdir.makepyfile(""" p = testdir.makepyfile("""

View File

@ -23,11 +23,18 @@ def pytest_configure(config):
class PdbInvoke: class PdbInvoke:
def pytest_runtest_makereport(self, item, call): def pytest_runtest_makereport(self, item, call):
if call.excinfo and not call.excinfo.errisinstance(Skipped): if call.excinfo and not call.excinfo.errisinstance(Skipped):
# XXX hack hack hack to play well with capturing
capman = item.config.pluginmanager.impname2plugin['capturemanager']
capman.suspendcapture()
tw = py.io.TerminalWriter() tw = py.io.TerminalWriter()
repr = call.excinfo.getrepr() repr = call.excinfo.getrepr()
repr.toterminal(tw) repr.toterminal(tw)
post_mortem(call.excinfo._excinfo[2]) post_mortem(call.excinfo._excinfo[2])
# XXX hack end
capman.resumecapture_item(item)
class Pdb(py.std.pdb.Pdb): class Pdb(py.std.pdb.Pdb):
def do_list(self, arg): def do_list(self, arg):
self.lastcmd = 'list' self.lastcmd = 'list'

View File

@ -130,6 +130,11 @@ class TmpTestdir:
def mkdir(self, name): def mkdir(self, name):
return self.tmpdir.mkdir(name) return self.tmpdir.mkdir(name)
def mkpydir(self, name):
p = self.mkdir(name)
p.ensure("__init__.py")
return p
def genitems(self, colitems): def genitems(self, colitems):
return list(self.session.genitems(colitems)) return list(self.session.genitems(colitems))
@ -296,6 +301,9 @@ class TmpTestdir:
assert script.check() assert script.check()
return py.std.sys.executable, script return py.std.sys.executable, script
def runpython(self, script):
return self.run(py.std.sys.executable, script)
def runpytest(self, *args): def runpytest(self, *args):
p = py.path.local.make_numbered_dir(prefix="runpytest-", p = py.path.local.make_numbered_dir(prefix="runpytest-",
keep=None, rootdir=self.tmpdir) keep=None, rootdir=self.tmpdir)
@ -515,7 +523,6 @@ def test_testdir_runs_with_plugin(testdir):
def pytest_funcarg__venv(request): def pytest_funcarg__venv(request):
p = request.config.mktemp(request.function.__name__, numbered=True) p = request.config.mktemp(request.function.__name__, numbered=True)
venv = VirtualEnv(str(p)) venv = VirtualEnv(str(p))
venv.create()
return venv return venv
def pytest_funcarg__py_setup(request): def pytest_funcarg__py_setup(request):
@ -528,13 +535,14 @@ def pytest_funcarg__py_setup(request):
class SetupBuilder: class SetupBuilder:
def __init__(self, setup_path): def __init__(self, setup_path):
self.setup_path = setup_path self.setup_path = setup_path
assert setup_path.check()
def make_sdist(self, destdir=None): def make_sdist(self, destdir=None):
temp = py.path.local.mkdtemp() temp = py.path.local.mkdtemp()
try: try:
args = ['python', str(self.setup_path), 'sdist', args = ['python', str(self.setup_path), 'sdist',
'--dist-dir', str(temp)] '--dist-dir', str(temp)]
subprocess.check_call(args) subcall(args)
l = temp.listdir('py-*') l = temp.listdir('py-*')
assert len(l) == 1 assert len(l) == 1
sdist = l[0] sdist = l[0]
@ -549,6 +557,11 @@ class SetupBuilder:
finally: finally:
temp.remove() temp.remove()
def subcall(args):
if hasattr(subprocess, 'check_call'):
subprocess.check_call(args)
else:
subprocess.call(args)
# code taken from Ronny Pfannenschmidt's virtualenvmanager # code taken from Ronny Pfannenschmidt's virtualenvmanager
class VirtualEnv(object): class VirtualEnv(object):
@ -562,22 +575,22 @@ class VirtualEnv(object):
def _cmd(self, name): def _cmd(self, name):
return os.path.join(self.path, 'bin', name) return os.path.join(self.path, 'bin', name)
@property def ensure(self):
def valid(self): if not os.path.exists(self._cmd('python')):
return os.path.exists(self._cmd('python')) self.create()
def create(self, sitepackages=False): def create(self, sitepackages=False):
args = ['virtualenv', self.path] args = ['virtualenv', self.path]
if not sitepackages: if not sitepackages:
args.append('--no-site-packages') args.append('--no-site-packages')
subprocess.check_call(args) subcall(args)
def makegateway(self): def makegateway(self):
python = self._cmd('python') python = self._cmd('python')
return py.execnet.makegateway("popen//python=%s" %(python,)) return py.execnet.makegateway("popen//python=%s" %(python,))
def pcall(self, cmd, *args, **kw): def pcall(self, cmd, *args, **kw):
assert self.valid self.ensure()
return subprocess.call([ return subprocess.call([
self._cmd(cmd) self._cmd(cmd)
] + list(args), ] + list(args),

View File

@ -105,12 +105,14 @@ class ReSTSyntaxTest(py.test.collect.Item):
def register_pygments(self): def register_pygments(self):
# taken from pygments-main/external/rst-directive.py # taken from pygments-main/external/rst-directive.py
from docutils.parsers.rst import directives
try: try:
from pygments.formatters import HtmlFormatter from pygments.formatters import HtmlFormatter
except ImportError: except ImportError:
def pygments_directive(name, arguments, options, content, lineno, def pygments_directive(name, arguments, options, content, lineno,
content_offset, block_text, state, state_machine): content_offset, block_text, state, state_machine):
return [] return []
pygments_directive.options = {}
else: else:
# The default formatter # The default formatter
DEFAULT = HtmlFormatter(noclasses=True) DEFAULT = HtmlFormatter(noclasses=True)
@ -120,7 +122,6 @@ class ReSTSyntaxTest(py.test.collect.Item):
} }
from docutils import nodes from docutils import nodes
from docutils.parsers.rst import directives
from pygments import highlight from pygments import highlight
from pygments.lexers import get_lexer_by_name, TextLexer from pygments.lexers import get_lexer_by_name, TextLexer
@ -137,10 +138,10 @@ class ReSTSyntaxTest(py.test.collect.Item):
parsed = highlight(u'\n'.join(content), lexer, formatter) parsed = highlight(u'\n'.join(content), lexer, formatter)
return [nodes.raw('', parsed, format='html')] return [nodes.raw('', parsed, format='html')]
pygments_directive.arguments = (1, 0, 1)
pygments_directive.content = 1
pygments_directive.options = dict([(key, directives.flag) for key in VARIANTS]) pygments_directive.options = dict([(key, directives.flag) for key in VARIANTS])
pygments_directive.arguments = (1, 0, 1)
pygments_directive.content = 1
directives.register_directive('sourcecode', pygments_directive) directives.register_directive('sourcecode', pygments_directive)
def resolve_linkrole(self, name, text, check=True): def resolve_linkrole(self, name, text, check=True):

View File

@ -20,26 +20,21 @@ def pytest_configure(config):
config._setupstate = SetupState() config._setupstate = SetupState()
def pytest_sessionfinish(session, exitstatus): def pytest_sessionfinish(session, exitstatus):
# XXX see above
if hasattr(session.config, '_setupstate'): if hasattr(session.config, '_setupstate'):
session.config._setupstate.teardown_all() hook = session.config.hook
# prevent logging module atexit handler from choking on rep = hook.pytest__teardown_final(session=session)
# its attempt to close already closed streams if rep:
# see http://bugs.python.org/issue6333 hook.pytest__teardown_final_logerror(rep=rep)
mod = py.std.sys.modules.get("logging", None)
if mod is not None:
mod.raiseExceptions = False
def pytest_make_collect_report(collector): def pytest_make_collect_report(collector):
call = collector.config.guardedcall( result = excinfo = None
lambda: collector._memocollect() try:
) result = collector._memocollect()
result = None except KeyboardInterrupt:
if not call.excinfo: raise
result = call.result except:
return CollectReport(collector, result, call.excinfo, call.outerr) excinfo = py.code.ExceptionInfo()
return CollectReport(collector, result, excinfo)
return report
def pytest_runtest_protocol(item): def pytest_runtest_protocol(item):
if item.config.getvalue("boxed"): if item.config.getvalue("boxed"):
@ -66,40 +61,52 @@ def pytest_runtest_call(item):
item.runtest() item.runtest()
def pytest_runtest_makereport(item, call): def pytest_runtest_makereport(item, call):
return ItemTestReport(item, call.excinfo, call.when, call.outerr) return ItemTestReport(item, call.excinfo, call.when)
def pytest_runtest_teardown(item): def pytest_runtest_teardown(item):
item.config._setupstate.teardown_exact(item) item.config._setupstate.teardown_exact(item)
def pytest__teardown_final(session):
call = CallInfo(session.config._setupstate.teardown_all, when="teardown")
if call.excinfo:
rep = TeardownErrorReport(call.excinfo)
return rep
def pytest_report_teststatus(rep):
if rep.when in ("setup", "teardown"):
if rep.failed:
# category, shortletter, verbose-word
return "error", "E", "ERROR"
elif rep.skipped:
return "skipped", "s", "SKIPPED"
else:
return "", "", ""
# #
# Implementation # Implementation
def call_and_report(item, when, log=True): def call_and_report(item, when, log=True):
call = RuntestHookCall(item, when) call = call_runtest_hook(item, when)
hook = item.config.hook hook = item.config.hook
report = hook.pytest_runtest_makereport(item=item, call=call) report = hook.pytest_runtest_makereport(item=item, call=call)
if log and (when == "call" or not report.passed): if log and (when == "call" or not report.passed):
hook.pytest_runtest_logreport(rep=report) hook.pytest_runtest_logreport(rep=report)
return report return report
def call_runtest_hook(item, when):
class RuntestHookCall: hookname = "pytest_runtest_" + when
excinfo = None
_prefix = "pytest_runtest_"
def __init__(self, item, when):
self.when = when
hookname = self._prefix + when
hook = getattr(item.config.hook, hookname) hook = getattr(item.config.hook, hookname)
capture = item.config._getcapture() return CallInfo(lambda: hook(item=item), when=when)
class CallInfo:
excinfo = None
def __init__(self, func, when):
self.when = when
try: try:
try: self.result = func()
self.result = hook(item=item)
except KeyboardInterrupt: except KeyboardInterrupt:
raise raise
except: except:
self.excinfo = py.code.ExceptionInfo() self.excinfo = py.code.ExceptionInfo()
finally:
self.outerr = capture.reset()
def forked_run_report(item): def forked_run_report(item):
# for now, we run setup/teardown in the subprocess # for now, we run setup/teardown in the subprocess
@ -149,10 +156,9 @@ class BaseReport(object):
class ItemTestReport(BaseReport): class ItemTestReport(BaseReport):
failed = passed = skipped = False failed = passed = skipped = False
def __init__(self, item, excinfo=None, when=None, outerr=None): def __init__(self, item, excinfo=None, when=None):
self.item = item self.item = item
self.when = when self.when = when
self.outerr = outerr
if item and when != "setup": if item and when != "setup":
self.keywords = item.readkeywords() self.keywords = item.readkeywords()
else: else:
@ -173,32 +179,42 @@ class ItemTestReport(BaseReport):
elif excinfo.errisinstance(Skipped): elif excinfo.errisinstance(Skipped):
self.skipped = True self.skipped = True
shortrepr = "s" shortrepr = "s"
longrepr = self.item._repr_failure_py(excinfo, outerr) longrepr = self.item._repr_failure_py(excinfo)
else: else:
self.failed = True self.failed = True
shortrepr = self.item.shortfailurerepr shortrepr = self.item.shortfailurerepr
if self.when == "call": if self.when == "call":
longrepr = self.item.repr_failure(excinfo, outerr) longrepr = self.item.repr_failure(excinfo)
else: # exception in setup or teardown else: # exception in setup or teardown
longrepr = self.item._repr_failure_py(excinfo, outerr) longrepr = self.item._repr_failure_py(excinfo)
shortrepr = shortrepr.lower() shortrepr = shortrepr.lower()
self.shortrepr = shortrepr self.shortrepr = shortrepr
self.longrepr = longrepr self.longrepr = longrepr
def __repr__(self):
status = (self.passed and "passed" or
self.skipped and "skipped" or
self.failed and "failed" or
"CORRUPT")
l = [repr(self.item.name), "when=%r" % self.when, "outcome %r" % status,]
if hasattr(self, 'node'):
l.append("txnode=%s" % self.node.gateway.id)
info = " " .join(map(str, l))
return "<ItemTestReport %s>" % info
def getnode(self): def getnode(self):
return self.item return self.item
class CollectReport(BaseReport): class CollectReport(BaseReport):
skipped = failed = passed = False skipped = failed = passed = False
def __init__(self, collector, result, excinfo=None, outerr=None): def __init__(self, collector, result, excinfo=None):
self.collector = collector self.collector = collector
if not excinfo: if not excinfo:
self.passed = True self.passed = True
self.result = result self.result = result
else: else:
self.outerr = outerr self.longrepr = self.collector._repr_failure_py(excinfo)
self.longrepr = self.collector._repr_failure_py(excinfo, outerr)
if excinfo.errisinstance(Skipped): if excinfo.errisinstance(Skipped):
self.skipped = True self.skipped = True
self.reason = str(excinfo.value) self.reason = str(excinfo.value)
@ -208,6 +224,13 @@ class CollectReport(BaseReport):
def getnode(self): def getnode(self):
return self.collector return self.collector
class TeardownErrorReport(BaseReport):
skipped = passed = False
failed = True
when = "teardown"
def __init__(self, excinfo):
self.longrepr = excinfo.getrepr(funcargs=True)
class SetupState(object): class SetupState(object):
""" shared state for setting up/tearing down test items or collectors. """ """ shared state for setting up/tearing down test items or collectors. """
def __init__(self): def __init__(self):

View File

@ -166,11 +166,14 @@ class TerminalReporter:
fspath, lineno, msg = self._getreportinfo(item) fspath, lineno, msg = self._getreportinfo(item)
self.write_fspath_result(fspath, "") self.write_fspath_result(fspath, "")
def pytest__teardown_final_logerror(self, rep):
self.stats.setdefault("error", []).append(rep)
def pytest_runtest_logreport(self, rep): def pytest_runtest_logreport(self, rep):
if rep.passed and rep.when in ("setup", "teardown"):
return
fspath = rep.item.fspath
cat, letter, word = self.getcategoryletterword(rep) cat, letter, word = self.getcategoryletterword(rep)
if not letter and not word:
# probably passed setup/teardown
return
if isinstance(word, tuple): if isinstance(word, tuple):
word, markup = word word, markup = word
else: else:
@ -194,9 +197,9 @@ class TerminalReporter:
def pytest_collectreport(self, rep): def pytest_collectreport(self, rep):
if not rep.passed: if not rep.passed:
if rep.failed: if rep.failed:
self.stats.setdefault("failed", []).append(rep) self.stats.setdefault("error", []).append(rep)
msg = rep.longrepr.reprcrash.message msg = rep.longrepr.reprcrash.message
self.write_fspath_result(rep.collector.fspath, "F") self.write_fspath_result(rep.collector.fspath, "E")
elif rep.skipped: elif rep.skipped:
self.stats.setdefault("skipped", []).append(rep) self.stats.setdefault("skipped", []).append(rep)
self.write_fspath_result(rep.collector.fspath, "S") self.write_fspath_result(rep.collector.fspath, "S")
@ -237,6 +240,7 @@ class TerminalReporter:
__call__.execute() __call__.execute()
self._tw.line("") self._tw.line("")
if exitstatus in (0, 1, 2): if exitstatus in (0, 1, 2):
self.summary_errors()
self.summary_failures() self.summary_failures()
self.summary_skips() self.summary_skips()
self.config.hook.pytest_terminal_summary(terminalreporter=self) self.config.hook.pytest_terminal_summary(terminalreporter=self)
@ -288,9 +292,11 @@ class TerminalReporter:
def _getfailureheadline(self, rep): def _getfailureheadline(self, rep):
if hasattr(rep, "collector"): if hasattr(rep, "collector"):
return str(rep.collector.fspath) return str(rep.collector.fspath)
else: elif hasattr(rep, 'item'):
fspath, lineno, msg = self._getreportinfo(rep.item) fspath, lineno, msg = self._getreportinfo(rep.item)
return msg return msg
else:
return "test session"
def _getreportinfo(self, item): def _getreportinfo(self, item):
try: try:
@ -312,16 +318,39 @@ class TerminalReporter:
for rep in self.stats['failed']: for rep in self.stats['failed']:
msg = self._getfailureheadline(rep) msg = self._getfailureheadline(rep)
self.write_sep("_", msg) self.write_sep("_", msg)
self.write_platinfo(rep)
rep.toterminal(self._tw)
def summary_errors(self):
if 'error' in self.stats and self.config.option.tbstyle != "no":
self.write_sep("=", "ERRORS")
for rep in self.stats['error']:
msg = self._getfailureheadline(rep)
if not hasattr(rep, 'when'):
# collect
msg = "ERROR during collection " + msg
elif rep.when == "setup":
msg = "ERROR at setup of " + msg
elif rep.when == "teardown":
msg = "ERROR at teardown of " + msg
self.write_sep("_", msg)
self.write_platinfo(rep)
rep.toterminal(self._tw)
def write_platinfo(self, rep):
if hasattr(rep, 'node'): if hasattr(rep, 'node'):
self.write_line(self.gateway2info.get( self.write_line(self.gateway2info.get(
rep.node.gateway, "node %r (platinfo not found? strange)") rep.node.gateway,
"node %r (platinfo not found? strange)")
[:self._tw.fullwidth-1]) [:self._tw.fullwidth-1])
rep.toterminal(self._tw)
def summary_stats(self): def summary_stats(self):
session_duration = py.std.time.time() - self._sessionstarttime session_duration = py.std.time.time() - self._sessionstarttime
keys = "failed passed skipped deselected".split() keys = "failed passed skipped deselected".split()
for key in self.stats.keys():
if key not in keys:
keys.append(key)
parts = [] parts = []
for key in keys: for key in keys:
val = self.stats.get(key, None) val = self.stats.get(key, None)

View File

@ -0,0 +1,307 @@
import py, os, sys
from py.__.test.plugin.pytest_iocapture import CaptureManager
class TestCaptureManager:
def test_configure_per_fspath(self, testdir):
config = testdir.parseconfig(testdir.tmpdir)
assert config.getvalue("capture") is None
capman = CaptureManager()
assert capman._getmethod(config, None) == "fd" # default
for name in ('no', 'fd', 'sys'):
sub = testdir.tmpdir.mkdir("dir" + name)
sub.ensure("__init__.py")
sub.join("conftest.py").write('conf_capture = %r' % name)
assert capman._getmethod(config, sub.join("test_hello.py")) == name
@py.test.mark.multi(method=['no', 'fd', 'sys'])
def test_capturing_basic_api(self, method):
capouter = py.io.StdCaptureFD()
old = sys.stdout, sys.stderr, sys.stdin
try:
capman = CaptureManager()
capman.resumecapture(method)
print "hello"
out, err = capman.suspendcapture()
if method == "no":
assert old == (sys.stdout, sys.stderr, sys.stdin)
else:
assert out == "hello\n"
capman.resumecapture(method)
out, err = capman.suspendcapture()
assert not out and not err
finally:
capouter.reset()
def test_juggle_capturings(self, testdir):
capouter = py.io.StdCaptureFD()
try:
config = testdir.parseconfig(testdir.tmpdir)
capman = CaptureManager()
capman.resumecapture("fd")
py.test.raises(ValueError, 'capman.resumecapture("fd")')
py.test.raises(ValueError, 'capman.resumecapture("sys")')
os.write(1, "hello\n")
out, err = capman.suspendcapture()
assert out == "hello\n"
capman.resumecapture("sys")
os.write(1, "hello\n")
print >>sys.stderr, "world"
out, err = capman.suspendcapture()
assert not out
assert err == "world\n"
finally:
capouter.reset()
def test_collect_capturing(testdir):
p = testdir.makepyfile("""
print "collect %s failure" % 13
import xyz42123
""")
result = testdir.runpytest(p)
result.stdout.fnmatch_lines([
"*Captured stdout*",
"*collect 13 failure*",
])
class TestPerTestCapturing:
def test_capture_and_fixtures(self, testdir):
p = testdir.makepyfile("""
def setup_module(mod):
print "setup module"
def setup_function(function):
print "setup", function.__name__
def test_func1():
print "in func1"
assert 0
def test_func2():
print "in func2"
assert 0
""")
result = testdir.runpytest(p)
result.stdout.fnmatch_lines([
"setup module*",
"setup test_func1*",
"in func1*",
"setup test_func2*",
"in func2*",
])
def test_no_carry_over(self, testdir):
p = testdir.makepyfile("""
def test_func1():
print "in func1"
def test_func2():
print "in func2"
assert 0
""")
result = testdir.runpytest(p)
s = result.stdout.str()
assert "in func1" not in s
assert "in func2" in s
def test_teardown_capturing(self, testdir):
p = testdir.makepyfile("""
def setup_function(function):
print "setup func1"
def teardown_function(function):
print "teardown func1"
assert 0
def test_func1():
print "in func1"
pass
""")
result = testdir.runpytest(p)
assert result.stdout.fnmatch_lines([
'*teardown_function*',
'*Captured stdout*',
"setup func1*",
"in func1*",
"teardown func1*",
#"*1 fixture failure*"
])
def test_teardown_final_capturing(self, testdir):
p = testdir.makepyfile("""
def teardown_module(mod):
print "teardown module"
assert 0
def test_func():
pass
""")
result = testdir.runpytest(p)
assert result.stdout.fnmatch_lines([
"*def teardown_module(mod):*",
"*Captured stdout*",
"*teardown module*",
"*1 error*",
])
def test_capturing_outerr(self, testdir):
p1 = testdir.makepyfile("""
import sys
def test_capturing():
print 42
print >>sys.stderr, 23
def test_capturing_error():
print 1
print >>sys.stderr, 2
raise ValueError
""")
result = testdir.runpytest(p1)
result.stdout.fnmatch_lines([
"*test_capturing_outerr.py .F",
"====* FAILURES *====",
"____*____",
"*test_capturing_outerr.py:8: ValueError",
"*--- Captured stdout ---*",
"1",
"*--- Captured stderr ---*",
"2",
])
class TestLoggingInteraction:
def test_logging_stream_ownership(self, testdir):
p = testdir.makepyfile("""
def test_logging():
import logging
import StringIO
stream = StringIO.StringIO()
logging.basicConfig(stream=stream)
stream.close() # to free memory/release resources
""")
result = testdir.runpytest(p)
result.stderr.str().find("atexit") == -1
def test_capturing_and_logging_fundamentals(self, testdir):
# here we check a fundamental feature
rootdir = str(py.path.local(py.__file__).dirpath().dirpath())
p = testdir.makepyfile("""
import sys
sys.path.insert(0, %r)
import py, logging
cap = py.io.StdCaptureFD(out=False, in_=False)
logging.warn("hello1")
outerr = cap.suspend()
print "suspeneded and captured", outerr
logging.warn("hello2")
cap.resume()
logging.warn("hello3")
outerr = cap.suspend()
print "suspend2 and captured", outerr
""" % rootdir)
result = testdir.runpython(p)
assert result.stdout.fnmatch_lines([
"suspeneded and captured*hello1*",
"suspend2 and captured*hello2*WARNING:root:hello3*",
])
assert "atexit" not in result.stderr.str()
def test_logging_and_immediate_setupteardown(self, testdir):
p = testdir.makepyfile("""
import logging
def setup_function(function):
logging.warn("hello1")
def test_logging():
logging.warn("hello2")
assert 0
def teardown_function(function):
logging.warn("hello3")
assert 0
""")
for optargs in (('--capture=sys',), ('--capture=fd',)):
print optargs
result = testdir.runpytest(p, *optargs)
s = result.stdout.str()
result.stdout.fnmatch_lines([
"*WARN*hello3", # errors show first!
"*WARN*hello1",
"*WARN*hello2",
])
# verify proper termination
assert "closed" not in s
@py.test.mark.xfail
def test_logging_and_crossscope_fixtures(self, testdir):
# XXX also needs final teardown reporting to work!
p = testdir.makepyfile("""
import logging
def setup_module(function):
logging.warn("hello1")
def test_logging():
logging.warn("hello2")
assert 0
def teardown_module(function):
logging.warn("hello3")
assert 0
""")
for optargs in (('--iocapture=sys',), ('--iocapture=fd',)):
print optargs
result = testdir.runpytest(p, *optargs)
s = result.stdout.str()
result.stdout.fnmatch_lines([
"*WARN*hello1",
"*WARN*hello2",
"*WARN*hello3",
])
# verify proper termination
assert "closed" not in s
class TestCaptureFuncarg:
def test_std_functional(self, testdir):
reprec = testdir.inline_runsource("""
def test_hello(capsys):
print 42
out, err = capsys.readouterr()
assert out.startswith("42")
""")
reprec.assertoutcome(passed=1)
def test_stdfd_functional(self, testdir):
reprec = testdir.inline_runsource("""
def test_hello(capfd):
import os
os.write(1, "42")
out, err = capfd.readouterr()
assert out.startswith("42")
capfd.close()
""")
reprec.assertoutcome(passed=1)
def test_partial_setup_failure(self, testdir):
p = testdir.makepyfile("""
def test_hello(capfd, missingarg):
pass
""")
result = testdir.runpytest(p)
assert result.stdout.fnmatch_lines([
"*test_partial_setup_failure*",
"*1 error*",
])
def test_keyboardinterrupt_disables_capturing(self, testdir):
p = testdir.makepyfile("""
def test_hello(capfd):
import os
os.write(1, "42")
raise KeyboardInterrupt()
""")
result = testdir.runpytest(p)
result.stdout.fnmatch_lines([
"*KEYBOARD INTERRUPT*"
])
assert result.ret == 2

View File

@ -126,7 +126,7 @@ class BaseFunctionalTests:
testdir.makepyfile(conftest=""" testdir.makepyfile(conftest="""
import py import py
class Function(py.test.collect.Function): class Function(py.test.collect.Function):
def repr_failure(self, excinfo, outerr): def repr_failure(self, excinfo):
return "hello" return "hello"
""") """)
reports = testdir.runitem(""" reports = testdir.runitem("""
@ -143,7 +143,7 @@ class BaseFunctionalTests:
#assert rep.failed.where.path.basename == "test_func.py" #assert rep.failed.where.path.basename == "test_func.py"
#assert rep.failed.failurerepr == "hello" #assert rep.failed.failurerepr == "hello"
def test_failure_in_setup_function_ignores_custom_failure_repr(self, testdir): def test_failure_in_setup_function_ignores_custom_repr(self, testdir):
testdir.makepyfile(conftest=""" testdir.makepyfile(conftest="""
import py import py
class Function(py.test.collect.Function): class Function(py.test.collect.Function):
@ -168,21 +168,6 @@ class BaseFunctionalTests:
#assert rep.outcome.where.path.basename == "test_func.py" #assert rep.outcome.where.path.basename == "test_func.py"
#assert instanace(rep.failed.failurerepr, PythonFailureRepr) #assert instanace(rep.failed.failurerepr, PythonFailureRepr)
def test_capture_in_func(self, testdir):
reports = testdir.runitem("""
import sys
def setup_function(func):
print "in setup"
def test_func():
print "in function"
assert 0
def teardown_function(func):
print "in teardown"
""")
assert reports[0].outerr[0] == "in setup\n"
assert reports[1].outerr[0] == "in function\n"
assert reports[2].outerr[0] == "in teardown\n"
def test_systemexit_does_not_bail_out(self, testdir): def test_systemexit_does_not_bail_out(self, testdir):
try: try:
reports = testdir.runitem(""" reports = testdir.runitem("""
@ -208,6 +193,23 @@ class BaseFunctionalTests:
else: else:
py.test.fail("did not raise") py.test.fail("did not raise")
@py.test.mark.xfail
def test_capture_per_func(self, testdir):
reports = testdir.runitem("""
import sys
def setup_function(func):
print "in setup"
def test_func():
print "in function"
assert 0
def teardown_function(func):
print "in teardown"
""")
assert reports[0].outerr[0] == "in setup\n"
assert reports[1].outerr[0] == "in function\n"
assert reports[2].outerr[0] == "in teardown\n"
class TestExecutionNonForked(BaseFunctionalTests): class TestExecutionNonForked(BaseFunctionalTests):
def getrunner(self): def getrunner(self):
@ -287,16 +289,3 @@ def test_functional_boxed(testdir):
"*1 failed*" "*1 failed*"
]) ])
def test_logging_interaction(testdir):
p = testdir.makepyfile("""
def test_logging():
import logging
import StringIO
stream = StringIO.StringIO()
logging.basicConfig(stream=stream)
stream.close() # to free memory/release resources
""")
result = testdir.runpytest(p)
assert result.stderr.str().find("atexit") == -1

View File

@ -89,9 +89,10 @@ class TestTerminal:
p = testdir.makepyfile("import xyz") p = testdir.makepyfile("import xyz")
result = testdir.runpytest(*option._getcmdargs()) result = testdir.runpytest(*option._getcmdargs())
result.stdout.fnmatch_lines([ result.stdout.fnmatch_lines([
"*test_collect_fail.py F*", "*test_collect_fail.py E*",
"> import xyz", "> import xyz",
"E ImportError: No module named xyz", "E ImportError: No module named xyz",
"*1 error*",
]) ])
def test_internalerror(self, testdir, linecomp): def test_internalerror(self, testdir, linecomp):
@ -357,3 +358,62 @@ def test_repr_python_version(monkeypatch):
py.std.sys.version_info = x = (2,3) py.std.sys.version_info = x = (2,3)
assert repr_pythonversion() == str(x) assert repr_pythonversion() == str(x)
class TestFixtureReporting:
def test_setup_fixture_error(self, testdir):
p = testdir.makepyfile("""
def setup_function(function):
print "setup func"
assert 0
def test_nada():
pass
""")
result = testdir.runpytest()
result.stdout.fnmatch_lines([
"*ERROR at setup of test_nada*",
"*setup_function(function):*",
"*setup func*",
"*assert 0*",
"*1 error*",
])
assert result.ret != 0
def test_teardown_fixture_error(self, testdir):
p = testdir.makepyfile("""
def test_nada():
pass
def teardown_function(function):
print "teardown func"
assert 0
""")
result = testdir.runpytest()
result.stdout.fnmatch_lines([
"*ERROR at teardown*",
"*teardown_function(function):*",
"*assert 0*",
"*Captured stdout*",
"*teardown func*",
"*1 passed*1 error*",
])
def test_teardown_fixture_error_and_test_failure(self, testdir):
p = testdir.makepyfile("""
def test_fail():
assert 0, "failingfunc"
def teardown_function(function):
print "teardown func"
assert False
""")
result = testdir.runpytest()
result.stdout.fnmatch_lines([
"*ERROR at teardown of test_fail*",
"*teardown_function(function):*",
"*assert False*",
"*Captured stdout*",
"*teardown func*",
"*test_fail*",
"*def test_fail():",
"*failingfunc*",
"*1 failed*1 error*",
])

View File

@ -271,8 +271,9 @@ class FunctionMixin(PyobjMixin):
traceback = ntraceback.filter() traceback = ntraceback.filter()
return traceback return traceback
def repr_failure(self, excinfo, outerr): def repr_failure(self, excinfo, outerr=None):
return self._repr_failure_py(excinfo, outerr) assert outerr is None, "XXX outerr usage is deprecated"
return self._repr_failure_py(excinfo)
shortfailurerepr = "F" shortfailurerepr = "F"

View File

@ -227,28 +227,6 @@ class TestGeneralUsage:
"*test_traceback_failure.py:4: AssertionError" "*test_traceback_failure.py:4: AssertionError"
]) ])
def test_capturing_outerr(self, testdir):
p1 = testdir.makepyfile("""
import sys
def test_capturing():
print 42
print >>sys.stderr, 23
def test_capturing_error():
print 1
print >>sys.stderr, 2
raise ValueError
""")
result = testdir.runpytest(p1)
result.stdout.fnmatch_lines([
"*test_capturing_outerr.py .F",
"====* FAILURES *====",
"____*____",
"*test_capturing_outerr.py:8: ValueError",
"*--- Captured stdout ---*",
"1",
"*--- Captured stderr ---*",
"2",
])
def test_showlocals(self, testdir): def test_showlocals(self, testdir):
p1 = testdir.makepyfile(""" p1 = testdir.makepyfile("""

View File

@ -212,38 +212,6 @@ class TestConfigApi_getcolitems:
for col in col.listchain(): for col in col.listchain():
assert col.config is config assert col.config is config
class TestGuardedCall:
def test_guardedcall_ok(self, testdir):
config = testdir.parseconfig()
def myfunc(x):
print x
print >>py.std.sys.stderr, "hello"
return 7
call = config.guardedcall(lambda: myfunc(3))
assert call.excinfo is None
assert call.result == 7
assert call.stdout.startswith("3")
assert call.stderr.startswith("hello")
def test_guardedcall_fail(self, testdir):
config = testdir.parseconfig()
def myfunc(x):
print x
raise ValueError(17)
call = config.guardedcall(lambda: myfunc(3))
assert call.excinfo
assert call.excinfo.type == ValueError
assert not hasattr(call, 'result')
assert call.stdout.startswith("3")
assert not call.stderr
def test_guardedcall_keyboardinterrupt(self, testdir):
config = testdir.parseconfig()
def myfunc():
raise KeyboardInterrupt
py.test.raises(KeyboardInterrupt, config.guardedcall, myfunc)
class TestOptionEffects: class TestOptionEffects:
def test_boxed_option_default(self, testdir): def test_boxed_option_default(self, testdir):
tmpdir = testdir.tmpdir.ensure("subdir", dir=1) tmpdir = testdir.tmpdir.ensure("subdir", dir=1)
@ -258,29 +226,6 @@ class TestOptionEffects:
config = py.test.config._reparse([testdir.tmpdir]) config = py.test.config._reparse([testdir.tmpdir])
assert not config.option.boxed assert not config.option.boxed
def test_config_iocapturing(self, testdir):
config = testdir.parseconfig(testdir.tmpdir)
assert config.getvalue("iocapture")
tmpdir = testdir.tmpdir.ensure("sub-with-conftest", dir=1)
tmpdir.join("conftest.py").write(py.code.Source("""
pytest_option_iocapture = "no"
"""))
config = py.test.config._reparse([tmpdir])
assert config.getvalue("iocapture") == "no"
capture = config._getcapture()
assert isinstance(capture, py.io.StdCapture)
assert not capture._out
assert not capture._err
assert not capture._in
assert isinstance(capture, py.io.StdCapture)
for opt, cls in (("sys", py.io.StdCapture),
("fd", py.io.StdCaptureFD),
):
config.option.iocapture = opt
capture = config._getcapture()
assert isinstance(capture, cls)
class TestConfig_gettopdir: class TestConfig_gettopdir:
def test_gettopdir(self, testdir): def test_gettopdir(self, testdir):
from py.__.test.config import gettopdir from py.__.test.config import gettopdir

View File

@ -194,7 +194,7 @@ class TestRequest:
""") """)
result = testdir.runpytest(p) result = testdir.runpytest(p)
assert result.stdout.fnmatch_lines([ assert result.stdout.fnmatch_lines([
"*1 failed*1 passed*" "*1 passed*1 error*"
]) ])
def test_request_getmodulepath(self, testdir): def test_request_getmodulepath(self, testdir):

View File

@ -1,9 +1,16 @@
import py import py
def test_make_sdist_and_run_it(py_setup, venv): def test_make_sdist_and_run_it(capfd, py_setup, venv):
try:
sdist = py_setup.make_sdist(venv.path) sdist = py_setup.make_sdist(venv.path)
venv.easy_install(str(sdist)) venv.easy_install(str(sdist))
gw = venv.makegateway() gw = venv.makegateway()
ch = gw.remote_exec("import py ; channel.send(py.__version__)") ch = gw.remote_exec("import py ; channel.send(py.__version__)")
version = ch.receive() version = ch.receive()
assert version == py.__version__ assert version == py.__version__
except KeyboardInterrupt:
raise
except:
print capfd.readouterr()
raise
capfd.close()

View File

@ -1,13 +1,11 @@
import py import py
from py.__.test import parseopt from py.__.test import parseopt
pytest_plugins = 'pytest_iocapture'
class TestParser: class TestParser:
def test_init(self, capsys): def test_init(self, capsys):
parser = parseopt.Parser(usage="xyz") parser = parseopt.Parser(usage="xyz")
py.test.raises(SystemExit, 'parser.parse(["-h"])') py.test.raises(SystemExit, 'parser.parse(["-h"])')
out, err = capsys.reset() out, err = capsys.readouterr()
assert out.find("xyz") != -1 assert out.find("xyz") != -1
def test_group_add_and_get(self): def test_group_add_and_get(self):

View File

@ -31,7 +31,7 @@ def main():
name='py', name='py',
description='py.test and pylib: advanced testing tool and networking lib', description='py.test and pylib: advanced testing tool and networking lib',
long_description = long_description, long_description = long_description,
version= trunk or '1.0.0b8', version= trunk or '1.0.0b9',
url='http://pylib.org', url='http://pylib.org',
license='MIT license', license='MIT license',
platforms=['unix', 'linux', 'osx', 'cygwin', 'win32'], platforms=['unix', 'linux', 'osx', 'cygwin', 'win32'],