[svn r63845] introduce new _pytest plugin that allows to selectively record
plugin calls and do assertions about them. --HG-- branch : trunk
This commit is contained in:
parent
0300b2109c
commit
b6b060c1d0
20
py/_com.py
20
py/_com.py
|
@ -160,26 +160,32 @@ class PyPlugins:
|
|||
|
||||
|
||||
class PluginAPI:
|
||||
def __init__(self, apiclass, plugins):
|
||||
def __init__(self, apiclass, plugins=None):
|
||||
self._apiclass = apiclass
|
||||
if plugins is None:
|
||||
plugins = pyplugins
|
||||
self._plugins = plugins
|
||||
for name in vars(apiclass):
|
||||
for name, method in vars(apiclass).items():
|
||||
if name[:2] != "__":
|
||||
mm = CallMaker(plugins, name)
|
||||
firstresult = getattr(method, 'firstresult', False)
|
||||
mm = ApiCall(plugins, name, firstresult=firstresult)
|
||||
setattr(self, name, mm)
|
||||
def __repr__(self):
|
||||
return "<PluginAPI %r %r>" %(self._apiclass, self._plugins)
|
||||
|
||||
class CallMaker:
|
||||
def __init__(self, plugins, name):
|
||||
class ApiCall:
|
||||
def __init__(self, plugins, name, firstresult):
|
||||
self.plugins = plugins
|
||||
self.name = name
|
||||
self.firstresult = firstresult
|
||||
|
||||
def __repr__(self):
|
||||
return "<MulticallMaker %r %s>" %(self.name, self.plugins)
|
||||
mode = self.firstresult and "firstresult" or "each"
|
||||
return "<ApiCall %r mode=%s %s>" %(self.name, mode, self.plugins)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
mc = MultiCall(self.plugins.listattr(self.name), *args, **kwargs)
|
||||
return mc.execute()
|
||||
#print "making multicall", self
|
||||
return mc.execute(firstresult=self.firstresult)
|
||||
|
||||
pyplugins = PyPlugins()
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
pytest_plugins = 'pytest_doctest', 'pytest_pytester' # , 'pytest_restdoc'
|
||||
pytest_plugins = '_pytest doctest pytester'.split()
|
||||
|
||||
rsyncdirs = ['../doc']
|
||||
rsyncignore = ['c-extension/greenlet/build']
|
||||
|
||||
|
|
|
@ -52,6 +52,12 @@ class GatewayCleanup:
|
|||
gw.exit()
|
||||
#gw.join() # should work as well
|
||||
|
||||
class ExecnetAPI:
|
||||
def pyexecnet_gateway_init(self, gateway):
|
||||
""" signal initialisation of new gateway. """
|
||||
def pyexecnet_gateway_exit(self, gateway):
|
||||
""" signal exitting of gateway. """
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# Base Gateway (used for both remote and local side)
|
||||
# ----------------------------------------------------------
|
||||
|
@ -70,6 +76,12 @@ class Gateway(object):
|
|||
self._io = io
|
||||
self._channelfactory = ChannelFactory(self, _startcount)
|
||||
self._cleanup.register(self)
|
||||
try:
|
||||
from py._com import PluginAPI
|
||||
except ImportError:
|
||||
self.api = ExecnetAPI()
|
||||
else:
|
||||
self.api = PluginAPI(ExecnetAPI)
|
||||
|
||||
def _initreceive(self, requestqueue=False):
|
||||
if requestqueue:
|
||||
|
@ -331,12 +343,7 @@ class Gateway(object):
|
|||
self._cleanup.unregister(self)
|
||||
self._stopexec()
|
||||
self._stopsend()
|
||||
try:
|
||||
py._com.pyplugins.notify("gateway_exit", self)
|
||||
except NameError:
|
||||
# XXX on the remote side 'py' is not imported
|
||||
# and so we can't notify
|
||||
pass
|
||||
self.api.pyexecnet_gateway_exit(gateway=self)
|
||||
|
||||
def _remote_redirect(self, stdout=None, stderr=None):
|
||||
""" return a handle representing a redirection of a remote
|
||||
|
|
|
@ -41,7 +41,7 @@ class InstallableGateway(gateway.Gateway):
|
|||
super(InstallableGateway, self).__init__(io=io, _startcount=1)
|
||||
# XXX we dissallow execution form the other side
|
||||
self._initreceive(requestqueue=False)
|
||||
py._com.pyplugins.notify("gateway_init", self)
|
||||
self.api.pyexecnet_gateway_init(gateway=self)
|
||||
|
||||
def _remote_bootstrap_gateway(self, io, extra=''):
|
||||
""" return Gateway with a asynchronously remotely
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
import py
|
||||
pytest_plugins = "pytester"
|
||||
from py.__.execnet.gateway import ExecnetAPI
|
||||
|
||||
class TestExecnetEvents:
|
||||
def test_popengateway(self, eventrecorder):
|
||||
def test_popengateway_events(self, _pytest):
|
||||
rec = _pytest.getcallrecorder(ExecnetAPI)
|
||||
gw = py.execnet.PopenGateway()
|
||||
event = eventrecorder.popevent("gateway_init")
|
||||
assert event.args[0] == gw
|
||||
call = rec.popcall("pyexecnet_gateway_init")
|
||||
assert call.gateway == gw
|
||||
gw.exit()
|
||||
event = eventrecorder.popevent("gateway_exit")
|
||||
assert event.args[0] == gw
|
||||
call = rec.popcall("pyexecnet_gateway_exit")
|
||||
assert call.gateway == gw
|
||||
|
|
|
@ -250,7 +250,7 @@ class TestPyPluginsEvents:
|
|||
assert l == [(13, ), {'x':15}]
|
||||
|
||||
|
||||
class TestMulticallMaker:
|
||||
class TestPluginAPI:
|
||||
def test_happypath(self):
|
||||
plugins = PyPlugins()
|
||||
class Api:
|
||||
|
@ -267,3 +267,22 @@ class TestMulticallMaker:
|
|||
l = mcm.hello(3)
|
||||
assert l == [4]
|
||||
assert not hasattr(mcm, 'world')
|
||||
|
||||
def test_firstresult(self):
|
||||
plugins = PyPlugins()
|
||||
class Api:
|
||||
def hello(self, arg): pass
|
||||
hello.firstresult = True
|
||||
|
||||
mcm = PluginAPI(apiclass=Api, plugins=plugins)
|
||||
class Plugin:
|
||||
def hello(self, arg):
|
||||
return arg + 1
|
||||
plugins.register(Plugin())
|
||||
res = mcm.hello(3)
|
||||
assert res == 4
|
||||
|
||||
def test_default_plugins(self):
|
||||
class Api: pass
|
||||
mcm = PluginAPI(apiclass=Api)
|
||||
assert mcm._plugins == py._com.pyplugins
|
||||
|
|
|
@ -48,7 +48,7 @@ class PluginHooks:
|
|||
# runtest related hooks
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
def pytest_pyfunc_call(self, pyfuncitem, args, kwargs):
|
||||
def pytest_pyfunc_call(self, call, pyfuncitem, args, kwargs):
|
||||
""" return True if we consumed/did the call to the python function item. """
|
||||
|
||||
def pytest_item_makereport(self, item, excinfo, when, outerr):
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
import py
|
||||
|
||||
class _pytestPlugin:
|
||||
def pytest_funcarg___pytest(self, pyfuncitem):
|
||||
return PytestArg(pyfuncitem)
|
||||
|
||||
class PytestArg:
|
||||
def __init__(self, pyfuncitem):
|
||||
self.pyfuncitem = pyfuncitem
|
||||
|
||||
def getcallrecorder(self, apiclass, pyplugins=None):
|
||||
if pyplugins is None:
|
||||
pyplugins = self.pyfuncitem.config.pytestplugins.pyplugins
|
||||
callrecorder = CallRecorder(pyplugins)
|
||||
callrecorder.start_recording(apiclass)
|
||||
self.pyfuncitem.addfinalizer(callrecorder.finalize)
|
||||
return callrecorder
|
||||
|
||||
|
||||
class ParsedCall:
|
||||
def __init__(self, name, locals):
|
||||
assert '_name' not in locals
|
||||
self.__dict__.update(locals)
|
||||
self._name = name
|
||||
|
||||
def __repr__(self):
|
||||
return "<ParsedCall %r>" %(self.__dict__,)
|
||||
|
||||
class CallRecorder:
|
||||
def __init__(self, pyplugins):
|
||||
self._pyplugins = pyplugins
|
||||
self.calls = []
|
||||
self._recorders = {}
|
||||
|
||||
def start_recording(self, apiclass):
|
||||
assert apiclass not in self._recorders
|
||||
class RecordCalls:
|
||||
_recorder = self
|
||||
for name, method in vars(apiclass).items():
|
||||
if name[0] != "_":
|
||||
setattr(RecordCalls, name, self._getcallparser(method))
|
||||
recorder = RecordCalls()
|
||||
self._recorders[apiclass] = recorder
|
||||
self._pyplugins.register(recorder)
|
||||
|
||||
def finalize(self):
|
||||
for recorder in self._recorders.values():
|
||||
self._pyplugins.unregister(recorder)
|
||||
|
||||
def _getcallparser(self, method):
|
||||
name = method.__name__
|
||||
args, varargs, varkw, default = py.std.inspect.getargspec(method)
|
||||
assert args[0] == "self"
|
||||
fspec = py.std.inspect.formatargspec(args, varargs, varkw, default)
|
||||
# we use exec because we want to have early type
|
||||
# errors on wrong input arguments, using
|
||||
# *args/**kwargs delays this and gives errors
|
||||
# elsewhere
|
||||
exec py.code.compile("""
|
||||
def %(name)s%(fspec)s:
|
||||
self._recorder.calls.append(
|
||||
ParsedCall(%(name)r, locals()))
|
||||
""" % locals())
|
||||
return locals()[name]
|
||||
|
||||
def popcall(self, name):
|
||||
for i, call in py.builtin.enumerate(self.calls):
|
||||
if call._name == name:
|
||||
del self.calls[i]
|
||||
return call
|
||||
raise ValueError("could not find call %r in %r" %(name, self.calls))
|
||||
|
||||
def test_generic(plugintester):
|
||||
plugintester.apicheck(_pytestPlugin)
|
||||
|
||||
def test_callrecorder_basic():
|
||||
pyplugins = py._com.PyPlugins()
|
||||
rec = CallRecorder(pyplugins)
|
||||
class ApiClass:
|
||||
def xyz(self, arg):
|
||||
pass
|
||||
rec.start_recording(ApiClass)
|
||||
pyplugins.call_each("xyz", 123)
|
||||
call = rec.popcall("xyz")
|
||||
assert call.arg == 123
|
||||
assert call._name == "xyz"
|
||||
py.test.raises(ValueError, "rec.popcall('abc')")
|
||||
|
||||
def test_functional(testdir, linecomp):
|
||||
sorter = testdir.inline_runsource("""
|
||||
import py
|
||||
pytest_plugins="_pytest"
|
||||
def test_func(_pytest):
|
||||
class ApiClass:
|
||||
def xyz(self, arg): pass
|
||||
rec = _pytest.getcallrecorder(ApiClass)
|
||||
class Plugin:
|
||||
def xyz(self, arg):
|
||||
return arg + 1
|
||||
rec._pyplugins.register(Plugin())
|
||||
res = rec._pyplugins.call_firstresult("xyz", 41)
|
||||
assert res == 42
|
||||
""")
|
||||
sorter.assertoutcome(passed=1)
|
Loading…
Reference in New Issue