simplify internal plugin dispatching code, rename parts of the py._com plugin helpers

--HG--
branch : 1.0.x
This commit is contained in:
holger krekel 2009-08-09 23:51:25 +02:00
parent 5c8df1d4ca
commit a01e4769cc
16 changed files with 137 additions and 173 deletions

View File

@ -1,18 +1,21 @@
Changes between 1.0.0 and 1.0.1
=====================================
* various unicode fixes: capturing and prints of unicode strings now
work within tests, they are encoded as "utf8" by default, terminalwriting
* unicode fixes: capturing and unicode writes to sys.stdout
(through e.g a print statement) now work within tests,
they are encoded as "utf8" by default, also terminalwriting
was adapted and somewhat unified between windows and linux
* fix issue #27: better reporting on non-collectable items given on commandline
(e.g. pyc files)
* "Test" prefixed classes with an __init__ method are *not* collected by default anymore
* "Test" prefixed classes are *not* collected by default anymore if they
have an __init__ method
* terser reporting of collection error tracebacks
* renaming of arguments to some special rather internal hooks
* streamlined internal plugin arch code, renamed of internal methods
and argnames (related to py/_com.py multicall/plugin)
Changes between 1.0.0b9 and 1.0.0
=====================================

View File

@ -52,7 +52,7 @@ initpkg(__name__,
'_com.Registry' : ('./_com.py', 'Registry'),
'_com.MultiCall' : ('./_com.py', 'MultiCall'),
'_com.comregistry' : ('./_com.py', 'comregistry'),
'_com.Hooks' : ('./_com.py', 'Hooks'),
'_com.HookRelay' : ('./_com.py', 'HookRelay'),
# py lib cmdline tools
'cmdline.pytest' : ('./cmdline/pytest.py', 'main',),

View File

@ -5,77 +5,50 @@ py lib plugins and plugin call management
import py
class MultiCall:
""" Manage a specific call into many python functions/methods.
""" execute a call into multiple python functions/methods. """
Simple example:
MultiCall([list1.append, list2.append], 42).execute()
"""
def __init__(self, methods, *args, **kwargs):
def __init__(self, methods, kwargs, firstresult=False):
self.methods = methods[:]
self.args = args
self.kwargs = kwargs
self.results = []
self.firstresult = firstresult
def __repr__(self):
args = []
if self.args:
args.append("posargs=%r" %(self.args,))
kw = self.kwargs
args.append(", ".join(["%s=%r" % x for x in self.kwargs.items()]))
args = " ".join(args)
status = "results: %r, rmethods: %r" % (self.results, self.methods)
return "<MultiCall %s %s>" %(args, status)
status = "%d results, %d meths" % (len(self.results), len(self.methods))
return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
def execute(self, firstresult=False):
def execute(self):
while self.methods:
currentmethod = self.methods.pop()
res = self.execute_method(currentmethod)
if hasattr(self, '_ex1'):
self.results = [res]
break
method = self.methods.pop()
res = self._call1(method)
if res is not None:
self.results.append(res)
if firstresult:
break
if not firstresult:
if self.firstresult:
break
if not self.firstresult:
return self.results
if self.results:
return self.results[-1]
return self.results[-1]
def execute_method(self, currentmethod):
self.currentmethod = currentmethod
# provide call introspection if "__call__" is the first positional argument
if hasattr(currentmethod, 'im_self'):
varnames = currentmethod.im_func.func_code.co_varnames
needscall = varnames[1:2] == ('__call__',)
else:
try:
varnames = currentmethod.func_code.co_varnames
except AttributeError:
# builtin function
varnames = ()
needscall = varnames[:1] == ('__call__',)
if needscall:
return currentmethod(self, *self.args, **self.kwargs)
else:
#try:
return currentmethod(*self.args, **self.kwargs)
#except TypeError:
# print currentmethod.__module__, currentmethod.__name__, self.args, self.kwargs
# raise
def exclude_other_results(self):
self._ex1 = True
def _call1(self, method):
kwargs = self.kwargs
if '__call__' in varnames(method):
kwargs = kwargs.copy()
kwargs['__call__'] = self
return method(**kwargs)
def varnames(rawcode):
rawcode = getattr(rawcode, 'im_func', rawcode)
rawcode = getattr(rawcode, 'func_code', rawcode)
try:
return rawcode.co_varnames
except AttributeError:
return ()
class Registry:
"""
Manage Plugins: Load plugins and manage calls to plugins.
Manage Plugins: register/unregister call calls to plugins.
"""
logfile = None
MultiCall = MultiCall
def __init__(self, plugins=None):
if plugins is None:
plugins = []
@ -83,6 +56,7 @@ class Registry:
def register(self, plugin):
assert not isinstance(plugin, str)
assert not plugin in self._plugins
self._plugins.append(plugin)
def unregister(self, plugin):
@ -107,45 +81,44 @@ class Registry:
l.reverse()
return l
class Hooks:
def __init__(self, hookspecs, registry=None):
class HookRelay:
def __init__(self, hookspecs, registry):
self._hookspecs = hookspecs
if registry is None:
registry = py._com.comregistry
self.registry = registry
self._registry = registry
for name, method in vars(hookspecs).items():
if name[:1] != "_":
firstresult = getattr(method, 'firstresult', False)
mm = HookCall(registry, name, firstresult=firstresult)
setattr(self, name, mm)
def __repr__(self):
return "<Hooks %r %r>" %(self._hookspecs, self.registry)
setattr(self, name, self._makecall(name))
class HookCall:
def __init__(self, registry, name, firstresult, extralookup=None):
self.registry = registry
def _makecall(self, name, extralookup=None):
hookspecmethod = getattr(self._hookspecs, name)
firstresult = getattr(hookspecmethod, 'firstresult', False)
return HookCaller(self, name, firstresult=firstresult,
extralookup=extralookup)
def _getmethods(self, name, extralookup=()):
return self._registry.listattr(name, extra=extralookup)
def _performcall(self, name, multicall):
return multicall.execute()
def __repr__(self):
return "<HookRelay %r %r>" %(self._hookspecs, self._registry)
class HookCaller:
def __init__(self, hookrelay, name, firstresult, extralookup=()):
self.hookrelay = hookrelay
self.name = name
self.firstresult = firstresult
self.extralookup = extralookup and [extralookup] or ()
def clone(self, extralookup):
return HookCall(self.registry, self.name, self.firstresult, extralookup)
def __repr__(self):
mode = self.firstresult and "firstresult" or "each"
return "<HookCall %r mode=%s %s>" %(self.name, mode, self.registry)
return "<HookCaller %r firstresult=%s %s>" %(
self.name, self.firstresult, self.hookrelay)
def __call__(self, *args, **kwargs):
if args:
raise TypeError("only keyword arguments allowed "
"for api call to %r" % self.name)
attr = self.registry.listattr(self.name, extra=self.extralookup)
mc = MultiCall(attr, **kwargs)
# XXX this should be doable from a hook impl:
if self.registry.logfile:
self.registry.logfile.write("%s(**%s) # firstresult=%s\n" %
(self.name, kwargs, self.firstresult))
self.registry.logfile.flush()
return mc.execute(firstresult=self.firstresult)
comregistry = Registry()
def __call__(self, **kwargs):
methods = self.hookrelay._getmethods(self.name,
extralookup=self.extralookup)
mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
return self.hookrelay._performcall(self.name, mc)
comregistry = Registry([])

View File

@ -88,8 +88,8 @@ class Gateway(object):
self._channelfactory = ChannelFactory(self, _startcount)
self._cleanup.register(self)
if _startcount == 1: # only import 'py' on the "client" side
from py._com import Hooks
self.hook = Hooks(ExecnetAPI)
import py
self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry)
else:
self.hook = ExecnetAPI()

View File

@ -21,7 +21,8 @@ class GatewayManager:
if not spec.chdir and not spec.popen:
spec.chdir = defaultchdir
self.specs.append(spec)
self.hook = py._com.Hooks(py.execnet._HookSpecs)
self.hook = py._com.HookRelay(
py.execnet._HookSpecs, py._com.comregistry)
def makegateways(self):
assert not self.gateways

View File

@ -1,15 +1,12 @@
import py
import os
from py._com import Registry, MultiCall
from py._com import Hooks
pytest_plugins = "xfail"
from py._com import Registry, MultiCall, HookRelay
class TestMultiCall:
def test_uses_copy_of_methods(self):
l = [lambda: 42]
mc = MultiCall(l)
mc = MultiCall(l, {})
repr(mc)
l[:] = []
res = mc.execute()
@ -18,22 +15,19 @@ class TestMultiCall:
def test_call_passing(self):
class P1:
def m(self, __call__, x):
assert __call__.currentmethod == self.m
assert len(__call__.results) == 1
assert not __call__.methods
return 17
class P2:
def m(self, __call__, x):
assert __call__.currentmethod == self.m
assert __call__.args
assert __call__.results == []
assert __call__.methods
return 23
p1 = P1()
p2 = P2()
multicall = MultiCall([p1.m, p2.m], 23)
multicall = MultiCall([p1.m, p2.m], {'x': 23})
assert "23" in repr(multicall)
reslist = multicall.execute()
assert len(reslist) == 2
@ -43,62 +37,44 @@ class TestMultiCall:
def test_keyword_args(self):
def f(x):
return x + 1
multicall = MultiCall([f], x=23)
assert "x=23" in repr(multicall)
multicall = MultiCall([f], dict(x=23))
assert "'x': 23" in repr(multicall)
reslist = multicall.execute()
assert reslist == [24]
assert "24" in repr(multicall)
assert "1 results" in repr(multicall)
def test_optionalcallarg(self):
class P1:
def m(self, x):
return x
call = MultiCall([P1().m], 23)
call = MultiCall([P1().m], dict(x=23))
assert "23" in repr(call)
assert call.execute() == [23]
assert call.execute(firstresult=True) == 23
call = MultiCall([P1().m], dict(x=23), firstresult=True)
def test_call_subexecute(self):
def m(__call__):
subresult = __call__.execute(firstresult=True)
subresult = __call__.execute()
return subresult + 1
def n():
return 1
call = MultiCall([n, m])
res = call.execute(firstresult=True)
assert res == 2
def test_call_exclude_other_results(self):
def m(__call__):
__call__.exclude_other_results()
return 10
def n():
return 1
call = MultiCall([n, n, m, n])
call = MultiCall([n, m], {}, firstresult=True)
res = call.execute()
assert res == [10]
# doesn't really make sense for firstresult-mode - because
# we might not have had a chance to run at all.
#res = call.execute(firstresult=True)
#assert res == 10
assert res == 2
def test_call_none_is_no_result(self):
def m1():
return 1
def m2():
return None
mc = MultiCall([m1, m2])
res = mc.execute(firstresult=True)
res = MultiCall([m1, m2], {}, firstresult=True).execute()
assert res == 1
res = MultiCall([m1, m2], {}).execute()
assert res == [1]
class TestRegistry:
def test_MultiCall(self):
plugins = Registry()
assert hasattr(plugins, "MultiCall")
def test_register(self):
registry = Registry()
@ -142,14 +118,14 @@ class TestRegistry:
def test_api_and_defaults():
assert isinstance(py._com.comregistry, Registry)
class TestHooks:
class TestHookRelay:
def test_happypath(self):
registry = Registry()
class Api:
def hello(self, arg):
pass
mcm = Hooks(hookspecs=Api, registry=registry)
mcm = HookRelay(hookspecs=Api, registry=registry)
assert hasattr(mcm, 'hello')
assert repr(mcm.hello).find("hello") != -1
class Plugin:
@ -160,23 +136,21 @@ class TestHooks:
assert l == [4]
assert not hasattr(mcm, 'world')
def test_needskeywordargs(self):
def test_only_kwargs(self):
registry = Registry()
class Api:
def hello(self, arg):
pass
mcm = Hooks(hookspecs=Api, registry=registry)
excinfo = py.test.raises(TypeError, "mcm.hello(3)")
assert str(excinfo.value).find("only keyword arguments") != -1
assert str(excinfo.value).find("hello(self, arg)")
mcm = HookRelay(hookspecs=Api, registry=registry)
py.test.raises(TypeError, "mcm.hello(3)")
def test_firstresult(self):
def test_firstresult_definition(self):
registry = Registry()
class Api:
def hello(self, arg): pass
hello.firstresult = True
mcm = Hooks(hookspecs=Api, registry=registry)
mcm = HookRelay(hookspecs=Api, registry=registry)
class Plugin:
def hello(self, arg):
return arg + 1
@ -186,15 +160,16 @@ class TestHooks:
def test_default_plugins(self):
class Api: pass
mcm = Hooks(hookspecs=Api)
assert mcm.registry == py._com.comregistry
mcm = HookRelay(hookspecs=Api, registry=py._com.comregistry)
assert mcm._registry == py._com.comregistry
def test_hooks_extra_plugins(self):
registry = Registry()
class Api:
def hello(self, arg):
pass
hook_hello = Hooks(hookspecs=Api, registry=registry).hello
hookrelay = HookRelay(hookspecs=Api, registry=registry)
hook_hello = hookrelay.hello
class Plugin:
def hello(self, arg):
return arg + 1
@ -202,7 +177,7 @@ class TestHooks:
class Plugin2:
def hello(self, arg):
return arg + 2
newhook = hook_hello.clone(extralookup=Plugin2())
newhook = hookrelay._makecall("hello", extralookup=Plugin2())
l = newhook(arg=3)
assert l == [5, 4]
l2 = hook_hello(arg=3)

View File

@ -47,7 +47,7 @@ class HookRecorder:
recorder = RecordCalls()
self._recorders[hookspecs] = recorder
self._comregistry.register(recorder)
self.hook = py._com.Hooks(hookspecs, registry=self._comregistry)
self.hook = py._com.HookRelay(hookspecs, registry=self._comregistry)
def finish_recording(self):
for recorder in self._recorders.values():

View File

@ -185,7 +185,7 @@ class CaptureManager:
method = self._getmethod(collector.config, collector.fspath)
self.resumecapture(method)
try:
rep = __call__.execute(firstresult=True)
rep = __call__.execute()
finally:
outerr = self.suspendcapture()
addouterr(rep, outerr)
@ -208,7 +208,7 @@ class CaptureManager:
method = self._getmethod(session.config, None)
self.resumecapture(method)
try:
rep = __call__.execute(firstresult=True)
rep = __call__.execute()
finally:
outerr = self.suspendcapture()
if rep:
@ -221,7 +221,7 @@ class CaptureManager:
def pytest_runtest_makereport(self, __call__, item, call):
self.deactivate_funcargs()
rep = __call__.execute(firstresult=True)
rep = __call__.execute()
outerr = self.suspendcapture()
outerr = (item.outerr[0] + outerr[0], item.outerr[1] + outerr[1])
if not rep.passed:

View File

@ -3,7 +3,7 @@
import py
def pytest_pyfunc_call(__call__, pyfuncitem):
if not __call__.execute(firstresult=True):
if not __call__.execute():
testfunction = pyfuncitem.obj
if pyfuncitem._isyieldedfunction():
testfunction(*pyfuncitem._args)

View File

@ -35,7 +35,7 @@ class Execnetcleanup:
def pytest_pyfunc_call(self, __call__, pyfuncitem):
if self._gateways is not None:
gateways = self._gateways[:]
res = __call__.execute(firstresult=True)
res = __call__.execute()
while len(self._gateways) > len(gateways):
self._gateways[-1].exit()
return res

View File

@ -8,14 +8,27 @@ def pytest_addoption(parser):
def pytest_configure(config):
hooklog = config.getvalue("hooklog")
if hooklog:
assert not config.pluginmanager.comregistry.logfile
config.pluginmanager.comregistry.logfile = open(hooklog, 'w')
config._hooklogfile = open(hooklog, 'w', 0)
config._hooklog_oldperformcall = config.hook._performcall
config.hook._performcall = (lambda name, multicall:
logged_call(name=name, multicall=multicall, config=config))
def logged_call(name, multicall, config):
f = config._hooklogfile
f.write("%s(**%s)\n" % (name, multicall.kwargs))
try:
res = config._hooklog_oldperformcall(name=name, multicall=multicall)
except:
f.write("-> exception")
raise
f.write("-> %r" % (res,))
return res
def pytest_unconfigure(config):
f = config.pluginmanager.comregistry.logfile
if f:
f.close()
config.pluginmanager.comregistry.logfile = None
try:
del config.hook.__dict__['_performcall']
except KeyError:
pass
# ===============================================================================
# plugin tests

View File

@ -24,7 +24,7 @@ def pytest_runtest_makereport(__call__, item, call):
return
if hasattr(item, 'obj') and hasattr(item.obj, 'func_dict'):
if 'xfail' in item.obj.func_dict:
res = __call__.execute(firstresult=True)
res = __call__.execute()
if call.excinfo:
res.skipped = True
res.failed = res.passed = False

View File

@ -16,10 +16,9 @@ class PluginManager(object):
if comregistry is None:
comregistry = py._com.Registry()
self.comregistry = comregistry
self.MultiCall = self.comregistry.MultiCall
self.impname2plugin = {}
self.hook = py._com.Hooks(
self.hook = py._com.HookRelay(
hookspecs=hookspec,
registry=self.comregistry)
@ -166,20 +165,24 @@ class PluginManager(object):
return self.hook.pytest_internalerror(excrepr=excrepr)
def do_addoption(self, parser):
methods = self.comregistry.listattr("pytest_addoption", reverse=True)
mc = py._com.MultiCall(methods, parser=parser)
mname = "pytest_addoption"
methods = self.comregistry.listattr(mname, reverse=True)
mc = py._com.MultiCall(methods, {'parser': parser})
mc.execute()
def pytest_plugin_registered(self, plugin):
if hasattr(self, '_config'):
self.call_plugin(plugin, "pytest_addoption", parser=self._config._parser)
self.call_plugin(plugin, "pytest_configure", config=self._config)
self.call_plugin(plugin, "pytest_addoption",
{'parser': self._config._parser})
self.call_plugin(plugin, "pytest_configure",
{'config': self._config})
#dic = self.call_plugin(plugin, "pytest_namespace")
#self._updateext(dic)
def call_plugin(self, plugin, methname, **kwargs):
return self.MultiCall(self.listattr(methname, plugins=[plugin]),
**kwargs).execute(firstresult=True)
def call_plugin(self, plugin, methname, kwargs):
return py._com.MultiCall(
methods=self.listattr(methname, plugins=[plugin]),
kwargs=kwargs, firstresult=True).execute()
def _updateext(self, dic):
if dic:

View File

@ -155,8 +155,8 @@ class PyCollectorMixin(PyobjMixin, py.test.collect.Collector):
cls = clscol and clscol.obj or None
metafunc = funcargs.Metafunc(funcobj, config=self.config,
cls=cls, module=module)
gentesthook = self.config.hook.pytest_generate_tests.clone(
extralookup=module)
gentesthook = self.config.hook._makecall(
"pytest_generate_tests", extralookup=module)
gentesthook(metafunc=metafunc)
if not metafunc._calls:
return self.Function(name, parent=self)

View File

@ -145,7 +145,7 @@ class TestCollectFS:
names = [x.name for x in col.collect()]
assert names == ["dir1", "dir2", "test_one.py", "test_two.py", "x"]
class TestCollectPluginHooks:
class TestCollectPluginHookRelay:
def test_pytest_collect_file(self, testdir):
tmpdir = testdir.tmpdir
wascalled = []

View File

@ -222,10 +222,6 @@ class TestPytestPluginInteractions:
config.pluginmanager.register(A())
assert len(l) == 2
def test_MultiCall(self):
pp = PluginManager()
assert hasattr(pp, 'MultiCall')
# lower level API
def test_listattr(self):