""" core implementation of testing process: init, session, runtest loop. """ import py import pytest, _pytest import inspect import os, sys, imp from _pytest.monkeypatch import monkeypatch from py._code.code import TerminalRepr tracebackcutdir = py.path.local(_pytest.__file__).dirpath() # exitcodes for the command line EXIT_OK = 0 EXIT_TESTSFAILED = 1 EXIT_INTERRUPTED = 2 EXIT_INTERNALERROR = 3 EXIT_USAGEERROR = 4 name_re = py.std.re.compile("^[a-zA-Z_]\w*$") def pytest_addoption(parser): parser.addini("norecursedirs", "directory patterns to avoid for recursion", type="args", default=('.*', 'CVS', '_darcs', '{arch}')) #parser.addini("dirpatterns", # "patterns specifying possible locations of test files", # type="linelist", default=["**/test_*.txt", # "**/test_*.py", "**/*_test.py"] #) group = parser.getgroup("general", "running and selection options") group._addoption('-x', '--exitfirst', action="store_true", default=False, dest="exitfirst", help="exit instantly on first error or failed test."), group._addoption('--maxfail', metavar="num", action="store", type="int", dest="maxfail", default=0, help="exit after first num failures or errors.") group._addoption('--strict', action="store_true", help="run pytest in strict mode, warnings become errors.") group = parser.getgroup("collect", "collection") group.addoption('--collectonly', action="store_true", dest="collectonly", help="only collect tests, don't execute them."), group.addoption('--pyargs', action="store_true", help="try to interpret all arguments as python packages.") group.addoption("--ignore", action="append", metavar="path", help="ignore path during collection (multi-allowed).") group.addoption('--confcutdir', dest="confcutdir", default=None, metavar="dir", help="only load conftest.py's relative to specified dir.") group = parser.getgroup("debugconfig", "test session debugging and configuration") group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir", help="base temporary directory for this test run.") def pytest_namespace(): collect = dict(Item=Item, Collector=Collector, File=File, Session=Session) return dict(collect=collect) def pytest_configure(config): py.test.config = config # compatibiltiy if config.option.exitfirst: config.option.maxfail = 1 def wrap_session(config, doit): """Skeleton command line program""" session = Session(config) session.exitstatus = EXIT_OK initstate = 0 try: try: config.pluginmanager.do_configure(config) initstate = 1 config.hook.pytest_sessionstart(session=session) initstate = 2 doit(config, session) except pytest.UsageError: msg = sys.exc_info()[1].args[0] sys.stderr.write("ERROR: %s\n" %(msg,)) session.exitstatus = EXIT_USAGEERROR except KeyboardInterrupt: excinfo = py.code.ExceptionInfo() config.hook.pytest_keyboard_interrupt(excinfo=excinfo) session.exitstatus = EXIT_INTERRUPTED except: excinfo = py.code.ExceptionInfo() config.pluginmanager.notify_exception(excinfo, config.option) session.exitstatus = EXIT_INTERNALERROR if excinfo.errisinstance(SystemExit): sys.stderr.write("mainloop: caught Spurious SystemExit!\n") finally: if initstate >= 2: config.hook.pytest_sessionfinish(session=session, exitstatus=session.exitstatus or (session._testsfailed and 1)) if not session.exitstatus and session._testsfailed: session.exitstatus = EXIT_TESTSFAILED if initstate >= 1: config.pluginmanager.do_unconfigure(config) return session.exitstatus def pytest_cmdline_main(config): return wrap_session(config, _main) def _main(config, session): """ default command line protocol for initialization, session, running tests and reporting. """ config.hook.pytest_collection(session=session) config.hook.pytest_runtestloop(session=session) def pytest_collection(session): return session.perform_collect() def pytest_runtestloop(session): if session.config.option.collectonly: return True for i, item in enumerate(session.items): try: nextitem = session.items[i+1] except IndexError: nextitem = None item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) if session.shouldstop: raise session.Interrupted(session.shouldstop) return True def pytest_ignore_collect(path, config): p = path.dirpath() ignore_paths = config._getconftest_pathlist("collect_ignore", path=p) ignore_paths = ignore_paths or [] excludeopt = config.getvalue("ignore") if excludeopt: ignore_paths.extend([py.path.local(x) for x in excludeopt]) return path in ignore_paths class HookProxy: def __init__(self, fspath, config): self.fspath = fspath self.config = config def __getattr__(self, name): hookmethod = getattr(self.config.hook, name) def call_matching_hooks(**kwargs): plugins = self.config._getmatchingplugins(self.fspath) return hookmethod.pcall(plugins, **kwargs) return call_matching_hooks def compatproperty(name): def fget(self): # deprecated - use pytest.name return getattr(pytest, name) return property(fget) class Node(object): """ base class for Collector and Item the test collection tree. Collector subclasses have children, Items are terminal nodes.""" def __init__(self, name, parent=None, config=None, session=None): #: a unique name within the scope of the parent node self.name = name #: the parent collector node. self.parent = parent #: the pytest config object self.config = config or parent.config #: the session this node is part of self.session = session or parent.session #: filesystem path where this node was collected from (can be None) self.fspath = getattr(parent, 'fspath', None) #: keywords on this node (node name is always contained) self.keywords = {self.name: True} #: fspath sensitive hook proxy used to call pytest hooks self.ihook = self.session.gethookproxy(self.fspath) #self.extrainit() #def extrainit(self): # """"extra initialization after Node is initialized. Implemented # by some subclasses. """ Module = compatproperty("Module") Class = compatproperty("Class") Instance = compatproperty("Instance") Function = compatproperty("Function") File = compatproperty("File") Item = compatproperty("Item") def _getcustomclass(self, name): cls = getattr(self, name) if cls != getattr(pytest, name): py.log._apiwarn("2.0", "use of node.%s is deprecated, " "use pytest_pycollect_makeitem(...) to create custom " "collection nodes" % name) return cls def __repr__(self): return "<%s %r>" %(self.__class__.__name__, getattr(self, 'name', None)) # methods for ordering nodes @property def nodeid(self): """ a ::-separated string denoting its collection tree address. """ try: return self._nodeid except AttributeError: self._nodeid = x = self._makeid() return x def _makeid(self): return self.parent.nodeid + "::" + self.name def __eq__(self, other): if not isinstance(other, Node): return False return (self.__class__ == other.__class__ and self.name == other.name and self.parent == other.parent) def __ne__(self, other): return not self == other def __hash__(self): return hash((self.name, self.parent)) def setup(self): pass def teardown(self): pass def _memoizedcall(self, attrname, function): exattrname = "_ex_" + attrname failure = getattr(self, exattrname, None) if failure is not None: py.builtin._reraise(failure[0], failure[1], failure[2]) if hasattr(self, attrname): return getattr(self, attrname) try: res = function() except py.builtin._sysex: raise except: failure = py.std.sys.exc_info() setattr(self, exattrname, failure) raise setattr(self, attrname, res) return res def listchain(self): """ return list of all parent collectors up to self, starting from root of collection tree. """ chain = [] item = self while item is not None: chain.append(item) item = item.parent chain.reverse() return chain def listnames(self): return [x.name for x in self.listchain()] def getplugins(self): return self.config._getmatchingplugins(self.fspath) def getparent(self, cls): current = self while current and not isinstance(current, cls): current = current.parent return current def _prunetraceback(self, excinfo): pass def _repr_failure_py(self, excinfo, style=None): LE = self.session.funcargmanager.FuncargLookupError if excinfo.errisinstance(LE): function = excinfo.value.function if function is not None: fspath, lineno = getfslineno(function) lines, _ = inspect.getsourcelines(function) for i, line in enumerate(lines): if line.strip().startswith('def'): return FuncargLookupErrorRepr(fspath, lineno, lines[:i+1], str(excinfo.value.msg)) if self.config.option.fulltrace: style="long" else: self._prunetraceback(excinfo) # XXX should excinfo.getrepr record all data and toterminal() # process it? if style is None: if self.config.option.tbstyle == "short": style = "short" else: style = "long" return excinfo.getrepr(funcargs=True, showlocals=self.config.option.showlocals, style=style) repr_failure = _repr_failure_py class Collector(Node): """ Collector instances create children through collect() and thus iteratively build a tree. """ class CollectError(Exception): """ an error during collection, contains a custom message. """ def collect(self): """ returns a list of children (items and collectors) for this collection node. """ raise NotImplementedError("abstract") def repr_failure(self, excinfo): """ represent a collection failure. """ if excinfo.errisinstance(self.CollectError): exc = excinfo.value return str(exc.args[0]) return self._repr_failure_py(excinfo, style="short") def _memocollect(self): """ internal helper method to cache results of calling collect(). """ return self._memoizedcall('_collected', lambda: list(self.collect())) def _prunetraceback(self, excinfo): if hasattr(self, 'fspath'): path = self.fspath traceback = excinfo.traceback ntraceback = traceback.cut(path=self.fspath) if ntraceback == traceback: ntraceback = ntraceback.cut(excludepath=tracebackcutdir) excinfo.traceback = ntraceback.filter() class FSCollector(Collector): def __init__(self, fspath, parent=None, config=None, session=None): fspath = py.path.local(fspath) # xxx only for test_resultlog.py? name = fspath.basename if parent is not None: rel = fspath.relto(parent.fspath) if rel: name = rel name = name.replace(os.sep, "/") super(FSCollector, self).__init__(name, parent, config, session) self.fspath = fspath def _makeid(self): if self == self.session: return "." relpath = self.session.fspath.bestrelpath(self.fspath) if os.sep != "/": relpath = relpath.replace(os.sep, "/") return relpath class File(FSCollector): """ base class for collecting tests from a file. """ class Item(Node): """ a basic test invocation item. Note that for a single function there might be multiple test invocation items. """ nextitem = None def reportinfo(self): return self.fspath, None, "" def applymarker(self, marker): """ Apply a marker to this item. This method is useful if you have several parametrized function and want to mark a single one of them. :arg marker: a :py:class:`_pytest.mark.MarkDecorator` object created by a call to ``py.test.mark.NAME(...)``. """ if not isinstance(marker, pytest.mark.XYZ.__class__): raise ValueError("%r is not a py.test.mark.* object") self.keywords[marker.markname] = marker @property def location(self): try: return self._location except AttributeError: location = self.reportinfo() # bestrelpath is a quite slow function cache = self.config.__dict__.setdefault("_bestrelpathcache", {}) try: fspath = cache[location[0]] except KeyError: fspath = self.session.fspath.bestrelpath(location[0]) cache[location[0]] = fspath location = (fspath, location[1], str(location[2])) self._location = location return location class FuncargLookupError(LookupError): """ could not find a factory. """ def __init__(self, function, msg): self.function = function self.msg = msg class FuncargManager: _argprefix = "pytest_funcarg__" FuncargLookupError = FuncargLookupError def __init__(self, session): self.session = session self.config = session.config self.arg2facspec = {} session.config.pluginmanager.register(self, "funcmanage") self._holderobjseen = set() ### XXX this hook should be called for historic events like pytest_configure ### so that we don't have to do the below pytest_collection hook def pytest_plugin_registered(self, plugin): #print "plugin_registered", plugin nodeid = "" try: p = py.path.local(plugin.__file__) except AttributeError: pass else: if p.basename.startswith("conftest.py"): nodeid = p.dirpath().relto(self.session.fspath) self._parsefactories(plugin, nodeid) @pytest.mark.tryfirst def pytest_collection(self, session): plugins = session.config.pluginmanager.getplugins() for plugin in plugins: self.pytest_plugin_registered(plugin) def pytest_generate_tests(self, metafunc): for argname in metafunc.funcargnames: faclist = self.getfactorylist(argname, metafunc.parentid, metafunc.function, raising=False) if faclist is None: continue # will raise at setup time for fac in faclist: marker = getattr(fac, "funcarg", None) if marker is not None: params = marker.kwargs.get("params") if params is not None: metafunc.parametrize(argname, params, indirect=True) def _parsefactories(self, holderobj, nodeid): if holderobj in self._holderobjseen: return #print "parsefactories", holderobj self._holderobjseen.add(holderobj) for name in dir(holderobj): #print "check", holderobj, name if name.startswith(self._argprefix): fname = name[len(self._argprefix):] faclist = self.arg2facspec.setdefault(fname, []) obj = getattr(holderobj, name) faclist.append((nodeid, obj)) def getfactorylist(self, argname, nodeid, function, raising=True): try: factorydef = self.arg2facspec[argname] except KeyError: if raising: self._raiselookupfailed(argname, function, nodeid) else: return self._matchfactories(factorydef, nodeid) def _matchfactories(self, factorydef, nodeid): l = [] for baseid, factory in factorydef: #print "check", basepath, nodeid if nodeid.startswith(baseid): l.append(factory) return l def _raiselookupfailed(self, argname, function, nodeid): available = [] for name, facdef in self.arg2facspec.items(): faclist = self._matchfactories(facdef, nodeid) if faclist: available.append(name) msg = "LookupError: no factory found for argument %r" % (argname,) msg += "\n available funcargs: %s" %(", ".join(available),) msg += "\n use 'py.test --funcargs [testpath]' for help on them." raise FuncargLookupError(function, msg) class NoMatch(Exception): """ raised if matching cannot locate a matching names. """ class Session(FSCollector): class Interrupted(KeyboardInterrupt): """ signals an interrupted test run. """ __module__ = 'builtins' # for py3 def __init__(self, config): FSCollector.__init__(self, py.path.local(), parent=None, config=config, session=self) self.config.pluginmanager.register(self, name="session", prepend=True) self._testsfailed = 0 self.shouldstop = False self.trace = config.trace.root.get("collection") self._norecursepatterns = config.getini("norecursedirs") self.funcargmanager = FuncargManager(self) def pytest_collectstart(self): if self.shouldstop: raise self.Interrupted(self.shouldstop) def pytest_runtest_logreport(self, report): if report.failed and not hasattr(report, 'wasxfail'): self._testsfailed += 1 maxfail = self.config.getvalue("maxfail") if maxfail and self._testsfailed >= maxfail: self.shouldstop = "stopping after %d failures" % ( self._testsfailed) pytest_collectreport = pytest_runtest_logreport def isinitpath(self, path): return path in self._initialpaths def gethookproxy(self, fspath): return HookProxy(fspath, self.config) def perform_collect(self, args=None, genitems=True): hook = self.config.hook try: items = self._perform_collect(args, genitems) hook.pytest_collection_modifyitems(session=self, config=self.config, items=items) finally: hook.pytest_collection_finish(session=self) return items def _perform_collect(self, args, genitems): if args is None: args = self.config.args self.trace("perform_collect", self, args) self.trace.root.indent += 1 self._notfound = [] self._initialpaths = set() self._initialparts = [] self.items = items = [] for arg in args: parts = self._parsearg(arg) self._initialparts.append(parts) self._initialpaths.add(parts[0]) self.ihook.pytest_collectstart(collector=self) rep = self.ihook.pytest_make_collect_report(collector=self) self.ihook.pytest_collectreport(report=rep) self.trace.root.indent -= 1 if self._notfound: for arg, exc in self._notfound: line = "(no name %r in any of %r)" % (arg, exc.args[0]) raise pytest.UsageError("not found: %s\n%s" %(arg, line)) if not genitems: return rep.result else: if rep.passed: for node in rep.result: self.items.extend(self.genitems(node)) return items def collect(self): for parts in self._initialparts: arg = "::".join(map(str, parts)) self.trace("processing argument", arg) self.trace.root.indent += 1 try: for x in self._collect(arg): yield x except NoMatch: # we are inside a make_report hook so # we cannot directly pass through the exception self._notfound.append((arg, sys.exc_info()[1])) self.trace.root.indent -= 1 break self.trace.root.indent -= 1 def _collect(self, arg): names = self._parsearg(arg) path = names.pop(0) if path.check(dir=1): assert not names, "invalid arg %r" %(arg,) for path in path.visit(fil=lambda x: x.check(file=1), rec=self._recurse, bf=True, sort=True): for x in self._collectfile(path): yield x else: assert path.check(file=1) for x in self.matchnodes(self._collectfile(path), names): yield x def _collectfile(self, path): ihook = self.gethookproxy(path) if not self.isinitpath(path): if ihook.pytest_ignore_collect(path=path, config=self.config): return () return ihook.pytest_collect_file(path=path, parent=self) def _recurse(self, path): ihook = self.gethookproxy(path.dirpath()) if ihook.pytest_ignore_collect(path=path, config=self.config): return for pat in self._norecursepatterns: if path.check(fnmatch=pat): return False ihook = self.gethookproxy(path) ihook.pytest_collect_directory(path=path, parent=self) return True def _tryconvertpyarg(self, x): mod = None path = [os.path.abspath('.')] + sys.path for name in x.split('.'): # ignore anything that's not a proper name here # else something like --pyargs will mess up '.' # since imp.find_module will actually sometimes work for it # but it's supposed to be considered a filesystem path # not a package if name_re.match(name) is None: return x try: fd, mod, type_ = imp.find_module(name, path) except ImportError: return x else: if fd is not None: fd.close() if type_[2] != imp.PKG_DIRECTORY: path = [os.path.dirname(mod)] else: path = [mod] return mod def _parsearg(self, arg): """ return (fspath, names) tuple after checking the file exists. """ arg = str(arg) if self.config.option.pyargs: arg = self._tryconvertpyarg(arg) parts = str(arg).split("::") relpath = parts[0].replace("/", os.sep) path = self.fspath.join(relpath, abs=True) if not path.check(): if self.config.option.pyargs: msg = "file or package not found: " else: msg = "file not found: " raise pytest.UsageError(msg + arg) parts[0] = path return parts def matchnodes(self, matching, names): self.trace("matchnodes", matching, names) self.trace.root.indent += 1 nodes = self._matchnodes(matching, names) num = len(nodes) self.trace("matchnodes finished -> ", num, "nodes") self.trace.root.indent -= 1 if num == 0: raise NoMatch(matching, names[:1]) return nodes def _matchnodes(self, matching, names): if not matching or not names: return matching name = names[0] assert name nextnames = names[1:] resultnodes = [] for node in matching: if isinstance(node, pytest.Item): if not names: resultnodes.append(node) continue assert isinstance(node, pytest.Collector) node.ihook.pytest_collectstart(collector=node) rep = node.ihook.pytest_make_collect_report(collector=node) if rep.passed: has_matched = False for x in rep.result: if x.name == name: resultnodes.extend(self.matchnodes([x], nextnames)) has_matched = True # XXX accept IDs that don't have "()" for class instances if not has_matched and len(rep.result) == 1 and x.name == "()": nextnames.insert(0, name) resultnodes.extend(self.matchnodes([x], nextnames)) node.ihook.pytest_collectreport(report=rep) return resultnodes def genitems(self, node): self.trace("genitems", node) if isinstance(node, pytest.Item): node.ihook.pytest_itemcollected(item=node) yield node else: assert isinstance(node, pytest.Collector) node.ihook.pytest_collectstart(collector=node) rep = node.ihook.pytest_make_collect_report(collector=node) if rep.passed: for subnode in rep.result: for x in self.genitems(subnode): yield x node.ihook.pytest_collectreport(report=rep) # XXX not used yet def register_resource_factory(self, name, factoryfunc, matchscope=None, cachescope=None): """ register a factory function for the given name. :param name: the name which can be used to retrieve a value constructed by the factory function later. :param factoryfunc: a function accepting (name, reqnode) parameters and returning a value. :param matchscope: denotes visibility of the factory func. Pass a particular Node instance if you want to restrict factory function visilbility to its descendants. Pass None if you want the factory func to be globally availabile. :param cachescope: denotes caching scope. If you pass a node instance the value returned by getresource() will be reused for all descendants of that node. Pass None (the default) if you want no caching. Pass "session" if you want to to cache on a per-session level. """ def getfslineno(obj): # xxx let decorators etc specify a sane ordering if hasattr(obj, 'place_as'): obj = obj.place_as fslineno = py.code.getfslineno(obj) assert isinstance(fslineno[1], int), obj return fslineno class FuncargLookupErrorRepr(TerminalRepr): def __init__(self, filename, firstlineno, deflines, errorstring): self.deflines = deflines self.errorstring = errorstring self.filename = filename self.firstlineno = firstlineno def toterminal(self, tw): tw.line() for line in self.deflines: tw.line(" " + line.strip()) for line in self.errorstring.split("\n"): tw.line(" " + line.strip(), red=True) tw.line() tw.line("%s:%d" % (self.filename, self.firstlineno+1))