[svn r63908] first step in only allowing keyword arguments to plugin calls
--HG-- branch : trunk
This commit is contained in:
parent
f8fc229917
commit
fb7ff9a8c2
|
@ -139,8 +139,10 @@ class ApiCall:
|
|||
return "<ApiCall %r mode=%s %s>" %(self.name, mode, self.registry)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
mc = MultiCall(self.registry.listattr(self.name), *args, **kwargs)
|
||||
#print "making multicall", self
|
||||
if args:
|
||||
raise TypeError("only keyword arguments allowed "
|
||||
"for api call to %r" % self.name)
|
||||
mc = MultiCall(self.registry.listattr(self.name), **kwargs)
|
||||
return mc.execute(firstresult=self.firstresult)
|
||||
|
||||
comregistry = Registry()
|
||||
|
|
|
@ -177,10 +177,20 @@ class TestPluginAPI:
|
|||
def hello(self, arg):
|
||||
return arg + 1
|
||||
registry.register(Plugin())
|
||||
l = mcm.hello(3)
|
||||
l = mcm.hello(arg=3)
|
||||
assert l == [4]
|
||||
assert not hasattr(mcm, 'world')
|
||||
|
||||
def test_needskeywordargs(self):
|
||||
registry = Registry()
|
||||
class Api:
|
||||
def hello(self, arg):
|
||||
pass
|
||||
mcm = PluginAPI(apiclass=Api, registry=registry)
|
||||
excinfo = py.test.raises(TypeError, "mcm.hello(3)")
|
||||
assert str(excinfo.value).find("only keyword arguments") != -1
|
||||
assert str(excinfo.value).find("hello(self, arg)")
|
||||
|
||||
def test_firstresult(self):
|
||||
registry = Registry()
|
||||
class Api:
|
||||
|
@ -192,7 +202,7 @@ class TestPluginAPI:
|
|||
def hello(self, arg):
|
||||
return arg + 1
|
||||
registry.register(Plugin())
|
||||
res = mcm.hello(3)
|
||||
res = mcm.hello(arg=3)
|
||||
assert res == 4
|
||||
|
||||
def test_default_plugins(self):
|
||||
|
|
|
@ -98,7 +98,8 @@ class DSession(Session):
|
|||
callname, args, kwargs = eventcall
|
||||
if callname is not None:
|
||||
call = getattr(self.config.api, callname)
|
||||
call(*args, **kwargs)
|
||||
assert not args
|
||||
call(**kwargs)
|
||||
|
||||
# termination conditions
|
||||
if ((loopstate.testsfailed and self.config.option.exitfirst) or
|
||||
|
@ -176,15 +177,15 @@ class DSession(Session):
|
|||
senditems.append(next)
|
||||
else:
|
||||
self.config.api.pytest_collectstart(collector=next)
|
||||
self.queueevent("pytest_collectreport", basic_collect_report(next))
|
||||
self.queueevent("pytest_collectreport", rep=basic_collect_report(next))
|
||||
if self.config.option.dist == "each":
|
||||
self.senditems_each(senditems)
|
||||
else:
|
||||
# XXX assert self.config.option.dist == "load"
|
||||
self.senditems_load(senditems)
|
||||
|
||||
def queueevent(self, eventname, *args, **kwargs):
|
||||
self.queue.put((eventname, args, kwargs))
|
||||
def queueevent(self, eventname, **kwargs):
|
||||
self.queue.put((eventname, (), kwargs))
|
||||
|
||||
def senditems_each(self, tosend):
|
||||
if not tosend:
|
||||
|
|
|
@ -81,7 +81,7 @@ class TestDSession:
|
|||
session.triggertesting([modcol])
|
||||
name, args, kwargs = session.queue.get(block=False)
|
||||
assert name == 'pytest_collectreport'
|
||||
rep, = args
|
||||
rep = kwargs['rep']
|
||||
assert len(rep.result) == 1
|
||||
|
||||
def test_triggertesting_item(self, testdir):
|
||||
|
@ -134,7 +134,7 @@ class TestDSession:
|
|||
session.queueevent(None)
|
||||
session.loop_once(loopstate)
|
||||
assert node.sent == [[item]]
|
||||
session.queueevent("pytest_itemtestreport", run(item, node))
|
||||
session.queueevent("pytest_itemtestreport", rep=run(item, node))
|
||||
session.loop_once(loopstate)
|
||||
assert loopstate.shuttingdown
|
||||
assert not loopstate.testsfailed
|
||||
|
@ -147,7 +147,7 @@ class TestDSession:
|
|||
session.addnode(node)
|
||||
|
||||
# setup a HostDown event
|
||||
session.queueevent("pytest_testnodedown", node, None)
|
||||
session.queueevent("pytest_testnodedown", node=node, error=None)
|
||||
|
||||
loopstate = session._initloopstate([item])
|
||||
loopstate.dowork = False
|
||||
|
@ -173,7 +173,7 @@ class TestDSession:
|
|||
# have one test pending for a node that goes down
|
||||
session.senditems_load([item1, item2])
|
||||
node = session.item2nodes[item1] [0]
|
||||
session.queueevent("pytest_testnodedown", node, None)
|
||||
session.queueevent("pytest_testnodedown", node=node, error=None)
|
||||
evrec = testdir.geteventrecorder(session.pluginmanager)
|
||||
print session.item2nodes
|
||||
loopstate = session._initloopstate([])
|
||||
|
@ -191,7 +191,7 @@ class TestDSession:
|
|||
# setup a session with two nodes
|
||||
session = DSession(item.config)
|
||||
node1 = MockNode()
|
||||
session.queueevent("pytest_testnodeready", node1)
|
||||
session.queueevent("pytest_testnodeready", node=node1)
|
||||
loopstate = session._initloopstate([item])
|
||||
loopstate.dowork = False
|
||||
assert len(session.node2pending) == 0
|
||||
|
|
|
@ -22,6 +22,7 @@ class TXNode(object):
|
|||
self._down = False
|
||||
|
||||
def notify(self, eventname, *args, **kwargs):
|
||||
assert not args
|
||||
self.putevent((eventname, args, kwargs))
|
||||
|
||||
def callback(self, eventcall):
|
||||
|
|
|
@ -61,7 +61,7 @@ class TerminalReporter:
|
|||
self._tw.sep(sep, title, **markup)
|
||||
|
||||
def getcategoryletterword(self, rep):
|
||||
res = self.config.api.pytest_report_teststatus(rep)
|
||||
res = self.config.api.pytest_report_teststatus(rep=rep)
|
||||
if res:
|
||||
return res
|
||||
for cat in 'skipped failed passed ???'.split():
|
||||
|
|
Loading…
Reference in New Issue