test_ok1/_pytest/pytester.py

621 lines
21 KiB
Python
Raw Normal View History

""" (disabled by default) support for testing pytest and pytest plugins. """
2014-08-01 06:13:40 +08:00
import sys
import os
import codecs
import re
import time
2014-08-01 06:13:40 +08:00
import platform
from fnmatch import fnmatch
2014-08-01 06:13:40 +08:00
import subprocess
import py
import pytest
from py.builtin import print_
from _pytest.core import HookCaller, add_method_wrapper
2014-08-01 06:13:40 +08:00
from _pytest.main import Session, EXIT_OK
def get_public_names(l):
"""Only return names from iterator l without a leading underscore."""
return [x for x in l if x[0] != "_"]
def pytest_addoption(parser):
group = parser.getgroup("pylib")
group.addoption('--no-tools-on-path',
action="store_true", dest="notoolsonpath", default=False,
help=("discover tools on PATH instead of going through py.cmdline.")
)
def pytest_configure(config):
# This might be called multiple times. Only take the first.
global _pytest_fullpath
try:
_pytest_fullpath
except NameError:
_pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
_pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py")
class ParsedCall:
2014-10-04 21:49:31 +08:00
def __init__(self, name, kwargs):
self.__dict__.update(kwargs)
self._name = name
def __repr__(self):
d = self.__dict__.copy()
del d['_name']
return "<ParsedCall %r(**%r)>" %(self._name, d)
2014-10-04 21:49:31 +08:00
class HookRecorder:
def __init__(self, pluginmanager):
self._pluginmanager = pluginmanager
self.calls = []
2014-10-06 19:37:57 +08:00
2014-10-04 21:49:31 +08:00
def _docall(hookcaller, methods, kwargs):
self.calls.append(ParsedCall(hookcaller.name, kwargs))
yield
self._undo_wrapping = add_method_wrapper(HookCaller, _docall)
2014-10-06 19:37:57 +08:00
pluginmanager.add_shutdown(self._undo_wrapping)
def finish_recording(self):
2014-10-06 19:37:57 +08:00
self._undo_wrapping()
def getcalls(self, names):
if isinstance(names, str):
names = names.split()
2014-10-04 21:49:31 +08:00
return [call for call in self.calls if call._name in names]
2014-10-06 19:37:57 +08:00
def assert_contains(self, entries):
__tracebackhide__ = True
i = 0
entries = list(entries)
2014-08-01 06:13:40 +08:00
backlocals = sys._getframe(1).f_locals
while entries:
name, check = entries.pop(0)
for ind, call in enumerate(self.calls[i:]):
if call._name == name:
print_("NAMEMATCH", name, call)
if eval(check, backlocals, call.__dict__):
print_("CHECKERMATCH", repr(check), "->", call)
else:
print_("NOCHECKERMATCH", repr(check), "-", call)
continue
i += ind + 1
break
print_("NONAMEMATCH", name, "with", call)
else:
pytest.fail("could not find %r check %r" % (name, check))
def popcall(self, name):
2010-11-22 22:20:18 +08:00
__tracebackhide__ = True
for i, call in enumerate(self.calls):
if call._name == name:
del self.calls[i]
return call
2010-11-22 22:20:18 +08:00
lines = ["could not find call %r, in:" % (name,)]
lines.extend([" %s" % str(x) for x in self.calls])
pytest.fail("\n".join(lines))
def getcall(self, name):
l = self.getcalls(name)
assert len(l) == 1, (name, l)
return l[0]
2014-10-06 19:37:57 +08:00
# functionality for test reports
def getreports(self,
names="pytest_runtest_logreport pytest_collectreport"):
return [x.report for x in self.getcalls(names)]
def matchreport(self, inamepart="",
names="pytest_runtest_logreport pytest_collectreport", when=None):
""" return a testreport whose dotted import path matches """
l = []
for rep in self.getreports(names=names):
try:
if not when and rep.when != "call" and rep.passed:
# setup/teardown passing reports - let's ignore those
continue
except AttributeError:
pass
if when and getattr(rep, 'when', None) != when:
continue
if not inamepart or inamepart in rep.nodeid.split("::"):
l.append(rep)
if not l:
raise ValueError("could not find test report matching %r: "
"no test reports at all!" % (inamepart,))
if len(l) > 1:
raise ValueError(
"found 2 or more testreports matching %r: %s" %(inamepart, l))
return l[0]
def getfailures(self,
names='pytest_runtest_logreport pytest_collectreport'):
return [rep for rep in self.getreports(names) if rep.failed]
def getfailedcollections(self):
return self.getfailures('pytest_collectreport')
def listoutcomes(self):
passed = []
skipped = []
failed = []
for rep in self.getreports(
"pytest_collectreport pytest_runtest_logreport"):
if rep.passed:
if getattr(rep, "when", None) == "call":
passed.append(rep)
elif rep.skipped:
skipped.append(rep)
elif rep.failed:
failed.append(rep)
return passed, skipped, failed
def countoutcomes(self):
return [len(x) for x in self.listoutcomes()]
def assertoutcome(self, passed=0, skipped=0, failed=0):
realpassed, realskipped, realfailed = self.listoutcomes()
assert passed == len(realpassed)
assert skipped == len(realskipped)
assert failed == len(realfailed)
def clear(self):
self.calls[:] = []
def pytest_funcarg__linecomp(request):
return LineComp()
def pytest_funcarg__LineMatcher(request):
return LineMatcher
def pytest_funcarg__testdir(request):
tmptestdir = TmpTestdir(request)
return tmptestdir
rex_outcome = re.compile("(\d+) (\w+)")
class RunResult:
def __init__(self, ret, outlines, errlines, duration):
self.ret = ret
self.outlines = outlines
self.errlines = errlines
self.stdout = LineMatcher(outlines)
self.stderr = LineMatcher(errlines)
self.duration = duration
def parseoutcomes(self):
for line in reversed(self.outlines):
if 'seconds' in line:
outcomes = rex_outcome.findall(line)
if outcomes:
d = {}
for num, cat in outcomes:
d[cat] = int(num)
return d
class TmpTestdir:
def __init__(self, request):
self.request = request
self.Config = request.config.__class__
# XXX remove duplication with tmpdir plugin
basetmp = request.config._tmpdirhandler.ensuretemp("testdir")
name = request.function.__name__
for i in range(100):
try:
tmpdir = basetmp.mkdir(name + str(i))
except py.error.EEXIST:
continue
break
2013-02-04 03:47:39 +08:00
self.tmpdir = tmpdir
self.plugins = []
self._syspathremove = []
self.chdir() # always chdir
self.request.addfinalizer(self.finalize)
def __repr__(self):
return "<TmpTestdir %r>" % (self.tmpdir,)
def finalize(self):
for p in self._syspathremove:
2014-08-01 06:13:40 +08:00
sys.path.remove(p)
if hasattr(self, '_olddir'):
self._olddir.chdir()
# delete modules that have been loaded from tmpdir
for name, mod in list(sys.modules.items()):
if mod:
fn = getattr(mod, '__file__', None)
if fn and fn.startswith(str(self.tmpdir)):
del sys.modules[name]
2014-10-06 19:37:57 +08:00
def make_hook_recorder(self, pluginmanager):
assert not hasattr(pluginmanager, "reprec")
pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
self.request.addfinalizer(reprec.finish_recording)
return reprec
def chdir(self):
old = self.tmpdir.chdir()
if not hasattr(self, '_olddir'):
self._olddir = old
def _makefile(self, ext, args, kwargs):
items = list(kwargs.items())
if args:
source = py.builtin._totext("\n").join(
map(py.builtin._totext, args)) + py.builtin._totext("\n")
basename = self.request.function.__name__
items.insert(0, (basename, source))
ret = None
for name, value in items:
p = self.tmpdir.join(name).new(ext=ext)
source = py.code.Source(value)
def my_totext(s, encoding="utf-8"):
if py.builtin._isbytes(s):
s = py.builtin._totext(s, encoding=encoding)
return s
source_unicode = "\n".join([my_totext(line) for line in source.lines])
source = py.builtin._totext(source_unicode)
content = source.strip().encode("utf-8") # + "\n"
#content = content.rstrip() + "\n"
p.write(content, "wb")
if ret is None:
ret = p
return ret
def makefile(self, ext, *args, **kwargs):
return self._makefile(ext, args, kwargs)
def makeconftest(self, source):
return self.makepyfile(conftest=source)
def makeini(self, source):
return self.makefile('.ini', tox=source)
def getinicfg(self, source):
p = self.makeini(source)
return py.iniconfig.IniConfig(p)['pytest']
def makepyfile(self, *args, **kwargs):
return self._makefile('.py', args, kwargs)
def maketxtfile(self, *args, **kwargs):
return self._makefile('.txt', args, kwargs)
def syspathinsert(self, path=None):
if path is None:
path = self.tmpdir
2014-08-01 06:13:40 +08:00
sys.path.insert(0, str(path))
self._syspathremove.append(str(path))
def mkdir(self, name):
return self.tmpdir.mkdir(name)
def mkpydir(self, name):
p = self.mkdir(name)
p.ensure("__init__.py")
return p
Session = Session
def getnode(self, config, arg):
session = Session(config)
assert '::' not in str(arg)
p = py.path.local(arg)
x = session.fspath.bestrelpath(p)
config.hook.pytest_sessionstart(session=session)
res = session.perform_collect([x], genitems=False)[0]
config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
return res
def getpathnode(self, path):
2011-05-27 07:37:04 +08:00
config = self.parseconfigure(path)
session = Session(config)
x = session.fspath.bestrelpath(path)
config.hook.pytest_sessionstart(session=session)
res = session.perform_collect([x], genitems=False)[0]
config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
return res
def genitems(self, colitems):
session = colitems[0].session
result = []
for colitem in colitems:
result.extend(session.genitems(colitem))
return result
def runitem(self, source):
# used from runner functional tests
item = self.getitem(source)
# the test class where we are called from wants to provide the runner
testclassinstance = self.request.instance
runner = testclassinstance.getrunner()
return runner(item)
def inline_runsource(self, source, *cmdlineargs):
p = self.makepyfile(source)
l = list(cmdlineargs) + [p]
return self.inline_run(*l)
def inline_runsource1(self, *args):
args = list(args)
source = args.pop()
p = self.makepyfile(source)
l = list(args) + [p]
reprec = self.inline_run(*l)
reports = reprec.getreports("pytest_runtest_logreport")
assert len(reports) == 3, reports # setup/call/teardown
return reports[1]
def inline_genitems(self, *args):
return self.inprocess_run(list(args) + ['--collectonly'])
2014-10-06 19:37:57 +08:00
def inprocess_run(self, args, plugins=()):
rec = self.inline_run(*args, plugins=plugins)
items = [x.item for x in rec.getcalls("pytest_itemcollected")]
return items, rec
2014-10-06 19:37:57 +08:00
def inline_run(self, *args, **kwargs):
rec = []
class Collect:
def pytest_configure(x, config):
2014-10-06 19:37:57 +08:00
rec.append(self.make_hook_recorder(config.pluginmanager))
plugins = kwargs.get("plugins") or []
plugins.append(Collect())
ret = pytest.main(list(args), plugins=plugins)
2014-10-06 19:37:57 +08:00
assert len(rec) == 1
reprec = rec[0]
reprec.ret = ret
2014-10-06 19:37:57 +08:00
return reprec
def parseconfig(self, *args):
2011-11-08 02:28:30 +08:00
args = [str(x) for x in args]
for x in args:
if str(x).startswith('--basetemp'):
break
else:
args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp'))
import _pytest.config
config = _pytest.config._prepareconfig(args, self.plugins)
# we don't know what the test will do with this half-setup config
# object and thus we make sure it gets unconfigured properly in any
# case (otherwise capturing could still be active, for example)
def ensure_unconfigure():
if hasattr(config.pluginmanager, "_config"):
config.pluginmanager.do_unconfigure(config)
config.pluginmanager.ensure_shutdown()
self.request.addfinalizer(ensure_unconfigure)
return config
def parseconfigure(self, *args):
config = self.parseconfig(*args)
config.do_configure()
self.request.addfinalizer(lambda:
config.do_unconfigure())
return config
def getitem(self, source, funcname="test_func"):
items = self.getitems(source)
for item in items:
if item.name == funcname:
return item
assert 0, "%r item not found in module:\n%s\nitems: %s" %(
funcname, source, items)
def getitems(self, source):
modcol = self.getmodulecol(source)
return self.genitems([modcol])
def getmodulecol(self, source, configargs=(), withinit=False):
kw = {self.request.function.__name__: py.code.Source(source).strip()}
path = self.makepyfile(**kw)
if withinit:
self.makepyfile(__init__ = "#")
self.config = config = self.parseconfigure(path, *configargs)
node = self.getnode(config, path)
return node
def collect_by_name(self, modcol, name):
for colitem in modcol._memocollect():
if colitem.name == name:
return colitem
def popen(self, cmdargs, stdout, stderr, **kw):
2009-04-29 05:49:03 +08:00
env = os.environ.copy()
env['PYTHONPATH'] = os.pathsep.join(filter(None, [
str(os.getcwd()), env.get('PYTHONPATH', '')]))
2009-04-29 05:49:03 +08:00
kw['env'] = env
2014-08-01 06:13:40 +08:00
return subprocess.Popen(cmdargs,
stdout=stdout, stderr=stderr, **kw)
def run(self, *cmdargs):
return self._run(*cmdargs)
def _run(self, *cmdargs):
cmdargs = [str(x) for x in cmdargs]
p1 = self.tmpdir.join("stdout")
p2 = self.tmpdir.join("stderr")
print_("running", cmdargs, "curdir=", py.path.local())
f1 = codecs.open(str(p1), "w", encoding="utf8")
f2 = codecs.open(str(p2), "w", encoding="utf8")
try:
now = time.time()
popen = self.popen(cmdargs, stdout=f1, stderr=f2,
close_fds=(sys.platform != "win32"))
ret = popen.wait()
finally:
f1.close()
f2.close()
f1 = codecs.open(str(p1), "r", encoding="utf8")
f2 = codecs.open(str(p2), "r", encoding="utf8")
try:
out = f1.read().splitlines()
err = f2.read().splitlines()
finally:
f1.close()
f2.close()
self._dump_lines(out, sys.stdout)
self._dump_lines(err, sys.stderr)
return RunResult(ret, out, err, time.time()-now)
def _dump_lines(self, lines, fp):
try:
for line in lines:
py.builtin.print_(line, file=fp)
except UnicodeEncodeError:
print("couldn't print to %s because of encoding" % (fp,))
def runpybin(self, scriptname, *args):
fullargs = self._getpybinargs(scriptname) + args
return self.run(*fullargs)
def _getpybinargs(self, scriptname):
if not self.request.config.getvalue("notoolsonpath"):
2014-01-18 19:39:16 +08:00
# XXX we rely on script referring to the correct environment
2014-08-01 06:13:40 +08:00
# we cannot use "(sys.executable,script)"
2014-01-18 19:39:16 +08:00
# because on windows the script is e.g. a py.test.exe
2014-08-01 06:13:40 +08:00
return (sys.executable, _pytest_fullpath,) # noqa
else:
pytest.skip("cannot run %r with --no-tools-on-path" % scriptname)
def runpython(self, script, prepend=True):
if prepend:
s = self._getsysprepend()
if s:
script.write(s + "\n" + script.read())
return self.run(sys.executable, script)
def _getsysprepend(self):
if self.request.config.getvalue("notoolsonpath"):
s = "import sys;sys.path.insert(0,%r);" % str(py._pydir.dirpath())
else:
s = ""
return s
def runpython_c(self, command):
command = self._getsysprepend() + command
2014-08-01 06:13:40 +08:00
return self.run(sys.executable, "-c", command)
def runpytest(self, *args):
p = py.path.local.make_numbered_dir(prefix="runpytest-",
keep=None, rootdir=self.tmpdir)
args = ('--basetemp=%s' % p, ) + args
#for x in args:
# if '--confcutdir' in str(x):
# break
#else:
# pass
# args = ('--confcutdir=.',) + args
plugins = [x for x in self.plugins if isinstance(x, str)]
if plugins:
args = ('-p', plugins[0]) + args
return self.runpybin("py.test", *args)
def spawn_pytest(self, string, expect_timeout=10.0):
if self.request.config.getvalue("notoolsonpath"):
pytest.skip("--no-tools-on-path prevents running pexpect-spawn tests")
basetemp = self.tmpdir.mkdir("pexpect")
invoke = " ".join(map(str, self._getpybinargs("py.test")))
cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
return self.spawn(cmd, expect_timeout=expect_timeout)
def spawn(self, cmd, expect_timeout=10.0):
pexpect = pytest.importorskip("pexpect", "3.0")
2014-08-01 06:13:40 +08:00
if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
2010-11-27 23:40:52 +08:00
pytest.skip("pypy-64 bit not supported")
2011-11-12 05:33:45 +08:00
if sys.platform == "darwin":
pytest.xfail("pexpect does not work reliably on darwin?!")
2012-10-19 21:59:29 +08:00
if sys.platform.startswith("freebsd"):
pytest.xfail("pexpect does not work reliably on freebsd")
logfile = self.tmpdir.join("spawn.out").open("wb")
child = pexpect.spawn(cmd, logfile=logfile)
self.request.addfinalizer(logfile.close)
child.timeout = expect_timeout
return child
def getdecoded(out):
try:
return out.decode("utf-8")
except UnicodeDecodeError:
return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
py.io.saferepr(out),)
class LineComp:
def __init__(self):
self.stringio = py.io.TextIO()
def assert_contains_lines(self, lines2):
""" assert that lines2 are contained (linearly) in lines1.
return a list of extralines found.
"""
__tracebackhide__ = True
val = self.stringio.getvalue()
self.stringio.truncate(0)
self.stringio.seek(0)
lines1 = val.split("\n")
return LineMatcher(lines1).fnmatch_lines(lines2)
class LineMatcher:
def __init__(self, lines):
self.lines = lines
def str(self):
return "\n".join(self.lines)
def _getlines(self, lines2):
if isinstance(lines2, str):
lines2 = py.code.Source(lines2)
if isinstance(lines2, py.code.Source):
lines2 = lines2.strip().lines
return lines2
def fnmatch_lines_random(self, lines2):
lines2 = self._getlines(lines2)
for line in lines2:
for x in self.lines:
if line == x or fnmatch(x, line):
print_("matched: ", repr(line))
break
else:
raise ValueError("line %r not found in output" % line)
def get_lines_after(self, fnline):
for i, line in enumerate(self.lines):
if fnline == line or fnmatch(line, fnline):
return self.lines[i+1:]
raise ValueError("line %r not found in output" % fnline)
def fnmatch_lines(self, lines2):
def show(arg1, arg2):
2014-08-01 06:13:40 +08:00
py.builtin.print_(arg1, arg2, file=sys.stderr)
lines2 = self._getlines(lines2)
lines1 = self.lines[:]
nextline = None
extralines = []
__tracebackhide__ = True
for line in lines2:
nomatchprinted = False
while lines1:
nextline = lines1.pop(0)
if line == nextline:
show("exact match:", repr(line))
break
elif fnmatch(nextline, line):
show("fnmatch:", repr(line))
show(" with:", repr(nextline))
break
else:
if not nomatchprinted:
show("nomatch:", repr(line))
nomatchprinted = True
show(" and:", repr(nextline))
extralines.append(nextline)
else:
pytest.fail("remains unmatched: %r, see stderr" % (line,))