test_ok2/_pytest/main.py

797 lines
29 KiB
Python
Raw Normal View History

""" core implementation of testing process: init, session, runtest loop. """
import py
import pytest, _pytest
import inspect
import os, sys, imp
from _pytest.monkeypatch import monkeypatch
from py._code.code import TerminalRepr
tracebackcutdir = py.path.local(_pytest.__file__).dirpath()
# exitcodes for the command line
EXIT_OK = 0
EXIT_TESTSFAILED = 1
EXIT_INTERRUPTED = 2
EXIT_INTERNALERROR = 3
EXIT_USAGEERROR = 4
name_re = py.std.re.compile("^[a-zA-Z_]\w*$")
def pytest_addoption(parser):
parser.addini("norecursedirs", "directory patterns to avoid for recursion",
type="args", default=('.*', 'CVS', '_darcs', '{arch}'))
#parser.addini("dirpatterns",
# "patterns specifying possible locations of test files",
# type="linelist", default=["**/test_*.txt",
# "**/test_*.py", "**/*_test.py"]
#)
group = parser.getgroup("general", "running and selection options")
group._addoption('-x', '--exitfirst', action="store_true", default=False,
dest="exitfirst",
help="exit instantly on first error or failed test."),
group._addoption('--maxfail', metavar="num",
action="store", type="int", dest="maxfail", default=0,
help="exit after first num failures or errors.")
group._addoption('--strict', action="store_true",
help="run pytest in strict mode, warnings become errors.")
group = parser.getgroup("collect", "collection")
group.addoption('--collectonly',
action="store_true", dest="collectonly",
help="only collect tests, don't execute them."),
group.addoption('--pyargs', action="store_true",
help="try to interpret all arguments as python packages.")
group.addoption("--ignore", action="append", metavar="path",
help="ignore path during collection (multi-allowed).")
group.addoption('--confcutdir', dest="confcutdir", default=None,
metavar="dir",
help="only load conftest.py's relative to specified dir.")
group = parser.getgroup("debugconfig",
"test session debugging and configuration")
group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir",
help="base temporary directory for this test run.")
def pytest_namespace():
2011-05-27 07:58:31 +08:00
collect = dict(Item=Item, Collector=Collector, File=File, Session=Session)
return dict(collect=collect)
def pytest_configure(config):
py.test.config = config # compatibiltiy
if config.option.exitfirst:
config.option.maxfail = 1
def wrap_session(config, doit):
"""Skeleton command line program"""
session = Session(config)
session.exitstatus = EXIT_OK
initstate = 0
try:
try:
config.pluginmanager.do_configure(config)
initstate = 1
config.hook.pytest_sessionstart(session=session)
initstate = 2
doit(config, session)
except pytest.UsageError:
msg = sys.exc_info()[1].args[0]
sys.stderr.write("ERROR: %s\n" %(msg,))
session.exitstatus = EXIT_USAGEERROR
except KeyboardInterrupt:
excinfo = py.code.ExceptionInfo()
config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
session.exitstatus = EXIT_INTERRUPTED
except:
excinfo = py.code.ExceptionInfo()
config.pluginmanager.notify_exception(excinfo, config.option)
session.exitstatus = EXIT_INTERNALERROR
if excinfo.errisinstance(SystemExit):
sys.stderr.write("mainloop: caught Spurious SystemExit!\n")
finally:
if initstate >= 2:
config.hook.pytest_sessionfinish(session=session,
exitstatus=session.exitstatus or (session._testsfailed and 1))
if not session.exitstatus and session._testsfailed:
session.exitstatus = EXIT_TESTSFAILED
if initstate >= 1:
config.pluginmanager.do_unconfigure(config)
return session.exitstatus
def pytest_cmdline_main(config):
return wrap_session(config, _main)
def _main(config, session):
""" default command line protocol for initialization, session,
running tests and reporting. """
config.hook.pytest_collection(session=session)
config.hook.pytest_runtestloop(session=session)
def pytest_collection(session):
return session.perform_collect()
def pytest_runtestloop(session):
if session.config.option.collectonly:
return True
for i, item in enumerate(session.items):
try:
nextitem = session.items[i+1]
except IndexError:
nextitem = None
item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
if session.shouldstop:
raise session.Interrupted(session.shouldstop)
return True
def pytest_ignore_collect(path, config):
p = path.dirpath()
ignore_paths = config._getconftest_pathlist("collect_ignore", path=p)
ignore_paths = ignore_paths or []
excludeopt = config.getvalue("ignore")
if excludeopt:
ignore_paths.extend([py.path.local(x) for x in excludeopt])
return path in ignore_paths
class HookProxy:
def __init__(self, fspath, config):
self.fspath = fspath
self.config = config
def __getattr__(self, name):
hookmethod = getattr(self.config.hook, name)
def call_matching_hooks(**kwargs):
plugins = self.config._getmatchingplugins(self.fspath)
return hookmethod.pcall(plugins, **kwargs)
return call_matching_hooks
def compatproperty(name):
def fget(self):
# deprecated - use pytest.name
return getattr(pytest, name)
return property(fget)
class Node(object):
""" base class for Collector and Item the test collection tree.
Collector subclasses have children, Items are terminal nodes."""
def __init__(self, name, parent=None, config=None, session=None):
#: a unique name within the scope of the parent node
self.name = name
#: the parent collector node.
self.parent = parent
#: the pytest config object
self.config = config or parent.config
#: the session this node is part of
self.session = session or parent.session
#: filesystem path where this node was collected from (can be None)
self.fspath = getattr(parent, 'fspath', None)
#: keywords on this node (node name is always contained)
self.keywords = {self.name: True}
#: fspath sensitive hook proxy used to call pytest hooks
self.ihook = self.session.gethookproxy(self.fspath)
#self.extrainit()
#def extrainit(self):
# """"extra initialization after Node is initialized. Implemented
# by some subclasses. """
Module = compatproperty("Module")
Class = compatproperty("Class")
Instance = compatproperty("Instance")
Function = compatproperty("Function")
File = compatproperty("File")
Item = compatproperty("Item")
def _getcustomclass(self, name):
cls = getattr(self, name)
if cls != getattr(pytest, name):
py.log._apiwarn("2.0", "use of node.%s is deprecated, "
"use pytest_pycollect_makeitem(...) to create custom "
"collection nodes" % name)
return cls
def __repr__(self):
return "<%s %r>" %(self.__class__.__name__,
getattr(self, 'name', None))
# methods for ordering nodes
@property
def nodeid(self):
""" a ::-separated string denoting its collection tree address. """
try:
return self._nodeid
except AttributeError:
self._nodeid = x = self._makeid()
return x
def _makeid(self):
return self.parent.nodeid + "::" + self.name
def __eq__(self, other):
if not isinstance(other, Node):
return False
return (self.__class__ == other.__class__ and
self.name == other.name and self.parent == other.parent)
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash((self.name, self.parent))
def setup(self):
pass
def teardown(self):
pass
def _memoizedcall(self, attrname, function):
exattrname = "_ex_" + attrname
failure = getattr(self, exattrname, None)
if failure is not None:
py.builtin._reraise(failure[0], failure[1], failure[2])
if hasattr(self, attrname):
return getattr(self, attrname)
try:
res = function()
except py.builtin._sysex:
raise
except:
failure = py.std.sys.exc_info()
setattr(self, exattrname, failure)
raise
setattr(self, attrname, res)
return res
def listchain(self):
""" return list of all parent collectors up to self,
starting from root of collection tree. """
2011-12-02 02:36:44 +08:00
chain = []
item = self
while item is not None:
chain.append(item)
item = item.parent
chain.reverse()
return chain
def listnames(self):
return [x.name for x in self.listchain()]
2010-11-25 05:22:52 +08:00
def getplugins(self):
return self.config._getmatchingplugins(self.fspath)
def getparent(self, cls):
current = self
while current and not isinstance(current, cls):
current = current.parent
return current
def _prunetraceback(self, excinfo):
pass
def _repr_failure_py(self, excinfo, style=None):
LE = self.session.funcargmanager.FuncargLookupError
if excinfo.errisinstance(LE):
function = excinfo.value.function
if function is not None:
fspath, lineno = getfslineno(function)
lines, _ = inspect.getsourcelines(function)
for i, line in enumerate(lines):
if line.strip().startswith('def'):
return FuncargLookupErrorRepr(fspath,
lineno, lines[:i+1],
str(excinfo.value.msg))
if self.config.option.fulltrace:
style="long"
else:
self._prunetraceback(excinfo)
# XXX should excinfo.getrepr record all data and toterminal()
# process it?
if style is None:
if self.config.option.tbstyle == "short":
style = "short"
else:
style = "long"
return excinfo.getrepr(funcargs=True,
showlocals=self.config.option.showlocals,
style=style)
repr_failure = _repr_failure_py
class Collector(Node):
""" Collector instances create children through collect()
and thus iteratively build a tree.
"""
class CollectError(Exception):
""" an error during collection, contains a custom message. """
def collect(self):
""" returns a list of children (items and collectors)
for this collection node.
"""
raise NotImplementedError("abstract")
def repr_failure(self, excinfo):
""" represent a collection failure. """
if excinfo.errisinstance(self.CollectError):
exc = excinfo.value
return str(exc.args[0])
return self._repr_failure_py(excinfo, style="short")
def _memocollect(self):
""" internal helper method to cache results of calling collect(). """
return self._memoizedcall('_collected', lambda: list(self.collect()))
def _prunetraceback(self, excinfo):
if hasattr(self, 'fspath'):
path = self.fspath
traceback = excinfo.traceback
ntraceback = traceback.cut(path=self.fspath)
if ntraceback == traceback:
ntraceback = ntraceback.cut(excludepath=tracebackcutdir)
excinfo.traceback = ntraceback.filter()
class FSCollector(Collector):
def __init__(self, fspath, parent=None, config=None, session=None):
fspath = py.path.local(fspath) # xxx only for test_resultlog.py?
name = fspath.basename
if parent is not None:
rel = fspath.relto(parent.fspath)
if rel:
name = rel
name = name.replace(os.sep, "/")
super(FSCollector, self).__init__(name, parent, config, session)
self.fspath = fspath
def _makeid(self):
if self == self.session:
return "."
relpath = self.session.fspath.bestrelpath(self.fspath)
if os.sep != "/":
2010-11-07 08:13:40 +08:00
relpath = relpath.replace(os.sep, "/")
return relpath
class File(FSCollector):
""" base class for collecting tests from a file. """
class Item(Node):
""" a basic test invocation item. Note that for a single function
there might be multiple test invocation items.
"""
nextitem = None
def reportinfo(self):
return self.fspath, None, ""
def applymarker(self, marker):
""" Apply a marker to this item. This method is
useful if you have several parametrized function
and want to mark a single one of them.
:arg marker: a :py:class:`_pytest.mark.MarkDecorator` object
created by a call to ``py.test.mark.NAME(...)``.
"""
if not isinstance(marker, pytest.mark.XYZ.__class__):
raise ValueError("%r is not a py.test.mark.* object")
self.keywords[marker.markname] = marker
@property
def location(self):
try:
return self._location
except AttributeError:
location = self.reportinfo()
# bestrelpath is a quite slow function
cache = self.config.__dict__.setdefault("_bestrelpathcache", {})
try:
fspath = cache[location[0]]
except KeyError:
fspath = self.session.fspath.bestrelpath(location[0])
cache[location[0]] = fspath
location = (fspath, location[1], str(location[2]))
self._location = location
return location
class FuncargLookupError(LookupError):
""" could not find a factory. """
def __init__(self, function, msg):
self.function = function
self.msg = msg
class FuncargManager:
_argprefix = "pytest_funcarg__"
FuncargLookupError = FuncargLookupError
def __init__(self, session):
self.session = session
self.config = session.config
self.arg2facspec = {}
session.config.pluginmanager.register(self, "funcmanage")
self._holderobjseen = set()
### XXX this hook should be called for historic events like pytest_configure
### so that we don't have to do the below pytest_collection hook
def pytest_plugin_registered(self, plugin):
#print "plugin_registered", plugin
nodeid = ""
try:
p = py.path.local(plugin.__file__)
except AttributeError:
pass
else:
if p.basename.startswith("conftest.py"):
nodeid = p.dirpath().relto(self.session.fspath)
self._parsefactories(plugin, nodeid)
@pytest.mark.tryfirst
def pytest_collection(self, session):
plugins = session.config.pluginmanager.getplugins()
for plugin in plugins:
self.pytest_plugin_registered(plugin)
def pytest_generate_tests(self, metafunc):
funcargnames = list(metafunc.funcargnames)
seen = set()
while funcargnames:
argname = funcargnames.pop(0)
if argname in seen:
continue
seen.add(argname)
faclist = self.getfactorylist(argname, metafunc.parentid,
metafunc.function, raising=False)
if faclist is None:
continue # will raise FuncargLookupError at setup time
for fac in faclist:
marker = getattr(fac, "funcarg", None)
if marker is not None:
params = marker.kwargs.get("params")
if params is not None:
metafunc.parametrize(argname, params, indirect=True)
newfuncargnames = getfuncargnames(fac)
newfuncargnames.remove("request")
funcargnames.extend(newfuncargnames)
def _parsefactories(self, holderobj, nodeid):
if holderobj in self._holderobjseen:
return
#print "parsefactories", holderobj
self._holderobjseen.add(holderobj)
for name in dir(holderobj):
#print "check", holderobj, name
obj = getattr(holderobj, name)
# funcarg factories either have a pytest_funcarg__ prefix
# or are "funcarg" marked
if hasattr(obj, "funcarg"):
if name.startswith(self._argprefix):
argname = name[len(self._argprefix):]
else:
argname = name
elif name.startswith(self._argprefix):
argname = name[len(self._argprefix):]
else:
continue
faclist = self.arg2facspec.setdefault(argname, [])
faclist.append((nodeid, obj))
def getfactorylist(self, argname, nodeid, function, raising=True):
try:
factorydef = self.arg2facspec[argname]
except KeyError:
if raising:
self._raiselookupfailed(argname, function, nodeid)
else:
return self._matchfactories(factorydef, nodeid)
def _matchfactories(self, factorydef, nodeid):
l = []
for baseid, factory in factorydef:
#print "check", basepath, nodeid
if nodeid.startswith(baseid):
l.append(factory)
return l
def _raiselookupfailed(self, argname, function, nodeid):
available = []
for name, facdef in self.arg2facspec.items():
faclist = self._matchfactories(facdef, nodeid)
if faclist:
available.append(name)
msg = "LookupError: no factory found for argument %r" % (argname,)
msg += "\n available funcargs: %s" %(", ".join(available),)
msg += "\n use 'py.test --funcargs [testpath]' for help on them."
raise FuncargLookupError(function, msg)
class NoMatch(Exception):
""" raised if matching cannot locate a matching names. """
class Session(FSCollector):
class Interrupted(KeyboardInterrupt):
""" signals an interrupted test run. """
__module__ = 'builtins' # for py3
def __init__(self, config):
FSCollector.__init__(self, py.path.local(), parent=None,
config=config, session=self)
self.config.pluginmanager.register(self, name="session", prepend=True)
self._testsfailed = 0
self.shouldstop = False
self.trace = config.trace.root.get("collection")
self._norecursepatterns = config.getini("norecursedirs")
self.funcargmanager = FuncargManager(self)
def pytest_collectstart(self):
if self.shouldstop:
raise self.Interrupted(self.shouldstop)
def pytest_runtest_logreport(self, report):
if report.failed and not hasattr(report, 'wasxfail'):
self._testsfailed += 1
maxfail = self.config.getvalue("maxfail")
if maxfail and self._testsfailed >= maxfail:
self.shouldstop = "stopping after %d failures" % (
self._testsfailed)
pytest_collectreport = pytest_runtest_logreport
def isinitpath(self, path):
return path in self._initialpaths
def gethookproxy(self, fspath):
return HookProxy(fspath, self.config)
def perform_collect(self, args=None, genitems=True):
hook = self.config.hook
try:
items = self._perform_collect(args, genitems)
hook.pytest_collection_modifyitems(session=self,
config=self.config, items=items)
finally:
hook.pytest_collection_finish(session=self)
return items
def _perform_collect(self, args, genitems):
if args is None:
args = self.config.args
self.trace("perform_collect", self, args)
self.trace.root.indent += 1
self._notfound = []
self._initialpaths = set()
self._initialparts = []
self.items = items = []
for arg in args:
parts = self._parsearg(arg)
self._initialparts.append(parts)
self._initialpaths.add(parts[0])
self.ihook.pytest_collectstart(collector=self)
2011-05-27 08:43:02 +08:00
rep = self.ihook.pytest_make_collect_report(collector=self)
self.ihook.pytest_collectreport(report=rep)
self.trace.root.indent -= 1
if self._notfound:
for arg, exc in self._notfound:
line = "(no name %r in any of %r)" % (arg, exc.args[0])
raise pytest.UsageError("not found: %s\n%s" %(arg, line))
if not genitems:
return rep.result
else:
if rep.passed:
for node in rep.result:
self.items.extend(self.genitems(node))
return items
def collect(self):
for parts in self._initialparts:
arg = "::".join(map(str, parts))
self.trace("processing argument", arg)
self.trace.root.indent += 1
try:
for x in self._collect(arg):
yield x
except NoMatch:
# we are inside a make_report hook so
# we cannot directly pass through the exception
self._notfound.append((arg, sys.exc_info()[1]))
self.trace.root.indent -= 1
break
self.trace.root.indent -= 1
def _collect(self, arg):
names = self._parsearg(arg)
path = names.pop(0)
if path.check(dir=1):
assert not names, "invalid arg %r" %(arg,)
for path in path.visit(fil=lambda x: x.check(file=1),
rec=self._recurse, bf=True, sort=True):
for x in self._collectfile(path):
yield x
else:
assert path.check(file=1)
for x in self.matchnodes(self._collectfile(path), names):
yield x
def _collectfile(self, path):
ihook = self.gethookproxy(path)
if not self.isinitpath(path):
if ihook.pytest_ignore_collect(path=path, config=self.config):
return ()
return ihook.pytest_collect_file(path=path, parent=self)
def _recurse(self, path):
ihook = self.gethookproxy(path.dirpath())
if ihook.pytest_ignore_collect(path=path, config=self.config):
return
for pat in self._norecursepatterns:
if path.check(fnmatch=pat):
return False
ihook = self.gethookproxy(path)
ihook.pytest_collect_directory(path=path, parent=self)
return True
def _tryconvertpyarg(self, x):
mod = None
path = [os.path.abspath('.')] + sys.path
for name in x.split('.'):
2011-10-27 13:38:44 +08:00
# ignore anything that's not a proper name here
# else something like --pyargs will mess up '.'
2011-10-27 13:38:44 +08:00
# since imp.find_module will actually sometimes work for it
# but it's supposed to be considered a filesystem path
# not a package
if name_re.match(name) is None:
return x
try:
fd, mod, type_ = imp.find_module(name, path)
except ImportError:
return x
else:
if fd is not None:
fd.close()
if type_[2] != imp.PKG_DIRECTORY:
path = [os.path.dirname(mod)]
else:
path = [mod]
return mod
def _parsearg(self, arg):
""" return (fspath, names) tuple after checking the file exists. """
arg = str(arg)
if self.config.option.pyargs:
arg = self._tryconvertpyarg(arg)
parts = str(arg).split("::")
relpath = parts[0].replace("/", os.sep)
path = self.fspath.join(relpath, abs=True)
if not path.check():
if self.config.option.pyargs:
msg = "file or package not found: "
else:
msg = "file not found: "
raise pytest.UsageError(msg + arg)
parts[0] = path
return parts
def matchnodes(self, matching, names):
self.trace("matchnodes", matching, names)
self.trace.root.indent += 1
nodes = self._matchnodes(matching, names)
num = len(nodes)
self.trace("matchnodes finished -> ", num, "nodes")
self.trace.root.indent -= 1
if num == 0:
raise NoMatch(matching, names[:1])
return nodes
def _matchnodes(self, matching, names):
if not matching or not names:
return matching
name = names[0]
assert name
nextnames = names[1:]
resultnodes = []
for node in matching:
if isinstance(node, pytest.Item):
if not names:
resultnodes.append(node)
continue
assert isinstance(node, pytest.Collector)
node.ihook.pytest_collectstart(collector=node)
rep = node.ihook.pytest_make_collect_report(collector=node)
if rep.passed:
has_matched = False
for x in rep.result:
if x.name == name:
resultnodes.extend(self.matchnodes([x], nextnames))
has_matched = True
# XXX accept IDs that don't have "()" for class instances
if not has_matched and len(rep.result) == 1 and x.name == "()":
nextnames.insert(0, name)
resultnodes.extend(self.matchnodes([x], nextnames))
node.ihook.pytest_collectreport(report=rep)
return resultnodes
def genitems(self, node):
self.trace("genitems", node)
if isinstance(node, pytest.Item):
node.ihook.pytest_itemcollected(item=node)
yield node
else:
assert isinstance(node, pytest.Collector)
node.ihook.pytest_collectstart(collector=node)
rep = node.ihook.pytest_make_collect_report(collector=node)
if rep.passed:
for subnode in rep.result:
for x in self.genitems(subnode):
yield x
node.ihook.pytest_collectreport(report=rep)
2012-07-16 16:47:00 +08:00
2012-07-16 17:11:26 +08:00
# XXX not used yet
2012-07-16 16:47:00 +08:00
def register_resource_factory(self, name, factoryfunc,
matchscope=None,
cachescope=None):
""" register a factory function for the given name.
:param name: the name which can be used to retrieve a value constructed
by the factory function later.
:param factoryfunc: a function accepting (name, reqnode) parameters
and returning a value.
:param matchscope: denotes visibility of the factory func.
Pass a particular Node instance if you want to
restrict factory function visilbility to its descendants.
Pass None if you want the factory func to be globally
availabile.
:param cachescope: denotes caching scope. If you pass a node instance
the value returned by getresource() will be reused
for all descendants of that node. Pass None (the default)
if you want no caching. Pass "session" if you want to
to cache on a per-session level.
"""
def getfslineno(obj):
# xxx let decorators etc specify a sane ordering
if hasattr(obj, 'place_as'):
obj = obj.place_as
fslineno = py.code.getfslineno(obj)
assert isinstance(fslineno[1], int), obj
return fslineno
2012-07-16 16:47:00 +08:00
class FuncargLookupErrorRepr(TerminalRepr):
def __init__(self, filename, firstlineno, deflines, errorstring):
self.deflines = deflines
self.errorstring = errorstring
self.filename = filename
self.firstlineno = firstlineno
2012-07-16 16:47:00 +08:00
def toterminal(self, tw):
tw.line()
for line in self.deflines:
tw.line(" " + line.strip())
for line in self.errorstring.split("\n"):
tw.line(" " + line.strip(), red=True)
tw.line()
tw.line("%s:%d" % (self.filename, self.firstlineno+1))
def getfuncargnames(function, startindex=None):
# XXX merge with main.py's varnames
argnames = py.std.inspect.getargs(py.code.getrawcode(function))[0]
if startindex is None:
startindex = py.std.inspect.ismethod(function) and 1 or 0
defaults = getattr(function, 'func_defaults',
getattr(function, '__defaults__', None)) or ()
numdefaults = len(defaults)
if numdefaults:
return argnames[startindex:-numdefaults]
return argnames[startindex:]