test_ok2/py/_com.py

157 lines
4.9 KiB
Python

"""
py lib plugins and events.

You can write plugins that extend the py lib API.
Currently this is mostly used by py.test.

registering a plugin
++++++++++++++++++++++++++++++++++

::

    >>> class MyPlugin:
    ...     def pyevent_plugin_registered(self, plugin):
    ...         print "registering", plugin.__class__.__name__
    ...
    >>> import py
    >>> py._com.pyplugins.register(MyPlugin())
    registering MyPlugin
"""
import py
class MultiCall:
    """ Manage a specific call into many python functions/methods.

    Simple example:

        MultiCall([list1.append, list2.append], 42).execute()

    Notes on behaviour:

    * ``methods`` is consumed in place with ``list.pop()``, so the LAST
      entry of the list is called first and the list is (partially)
      emptied by ``execute()``.
    * A function whose first positional argument (after ``self`` for
      methods) is named ``__call__`` receives this MultiCall instance as
      that argument, giving it access to ``methods``, ``args``,
      ``kwargs``, ``results`` and ``exclude_other_results()``.
    * ``None`` return values are not recorded.  A function that wants an
      explicit ``None`` in the results returns ``MultiCall.NONEASRESULT``.
    """
    # Sentinel a function can return to have ``None`` recorded as its result.
    NONEASRESULT = object()

    def __init__(self, methods, *args, **kwargs):
        """ Store the callables and the arguments each of them is called with. """
        self.methods = methods
        self.args = args
        self.kwargs = kwargs
        self.results = []

    def _wants_call_object(self, method):
        """ Return True if ``method`` takes ``__call__`` as first positional argument.

        Works with both the Python 2 attribute names (``im_func``,
        ``func_code``) and the ``__func__`` / ``__code__`` names, which are
        the only ones available on Python 3.  Callables without a code
        object (builtins, callable instances) never want the call object.
        """
        func = method
        skip = 0
        for name in ('__func__', 'im_func'):
            if hasattr(method, name):
                # a python-level method: its first argument is ``self``/``cls``
                func = getattr(method, name)
                skip = 1
                break
        code = None
        for name in ('__code__', 'func_code'):
            code = getattr(func, name, None)
            if code is not None:
                break
        if code is None:
            # builtin function or other callable without introspectable code
            return False
        varnames = getattr(code, 'co_varnames', ())
        return varnames[skip:skip + 1] == ('__call__',)

    def execute(self, firstresult=False):
        """ Call the remaining methods and collect their results.

        With ``firstresult=False`` (default) the list of all recorded
        results is returned.  With ``firstresult=True`` execution stops
        at the first recorded result, which is returned directly
        (``None`` if no method produced one).
        """
        while self.methods:
            self.currentmethod = self.methods.pop()
            # provide call introspection if "__call__" is the first positional argument
            if self._wants_call_object(self.currentmethod):
                res = self.currentmethod(self, *self.args, **self.kwargs)
            else:
                res = self.currentmethod(*self.args, **self.kwargs)
            if hasattr(self, '_ex1'):
                # exclude_other_results() was called: keep only this result
                # (even if it is None) and skip all remaining methods.
                self.results = [res]
                break
            if res is not None:
                if res is self.NONEASRESULT:
                    res = None
                self.results.append(res)
                if firstresult:
                    break
        if not firstresult:
            return self.results
        if self.results:
            return self.results[-1]
        return None

    def exclude_other_results(self):
        """ Make the result of the currently executing method the only result. """
        self._ex1 = True
class PyPlugins:
    """
    Manage Plugins: Load plugins and manage calls to plugins.

    Plugins are plain objects kept in a list.  Hooks are looked up by
    attribute name on every registered plugin and invoked through
    MultiCall; events are delivered to ``pyevent_<eventname>`` methods and
    to generic ``pyevent`` methods.
    """
    MultiCall = MultiCall

    def __init__(self, plugins=None):
        """ Create a manager, optionally on top of an existing plugin list.

        A list passed as ``plugins`` is used directly (not copied).
        """
        if plugins is None:
            plugins = []
        self._plugins = plugins
        self._callbacks = []  # not used by any method of this class

    def import_module(self, modspec):
        """ Send an "importingmodule" event for ``modspec`` and import it. """
        # XXX allow modspec to specify version / lookup
        modpath = modspec
        self.notify("importingmodule", modpath)
        __import__(modpath)

    def consider_env(self):
        """ consider ENV variable for loading modules. """
        for spec in self._envlist("PYLIB"):
            self.import_module(spec)

    def _envlist(self, varname):
        """ Return the comma-separated entries of environment variable
        ``varname``, or an empty tuple if it is not set. """
        val = py.std.os.environ.get(varname, None)
        if val is not None:
            return val.split(',')
        return ()

    def consider_module(self, mod, varname="pylib"):
        """ Import the module(s) named by attribute ``varname`` of ``mod``.

        The attribute may hold a single spec or a list/tuple of specs;
        a missing attribute means there is nothing to import.
        """
        speclist = getattr(mod, varname, ())
        if not isinstance(speclist, (list, tuple)):
            speclist = (speclist,)
        for spec in speclist:
            self.import_module(spec)

    def register(self, plugin):
        """ Add ``plugin`` and send a "plugin_registered" event.

        The plugin is appended before the event is sent, so it receives
        the event about its own registration.
        """
        assert not isinstance(plugin, str)
        self._plugins.append(plugin)
        self.notify("plugin_registered", plugin)

    def unregister(self, plugin):
        """ Send a "plugin_unregistered" event, then remove ``plugin``.

        The event is sent while the plugin is still registered.
        """
        self.notify("plugin_unregistered", plugin)
        self._plugins.remove(plugin)

    def getplugins(self):
        """ Return a copy of the list of registered plugins. """
        return list(self._plugins)

    def isregistered(self, plugin):
        """ Return True if ``plugin`` is currently registered. """
        return plugin in self._plugins

    def listattr(self, attrname, plugins=None, extra=()):
        """ Return attribute ``attrname`` of every plugin that has it.

        ``plugins`` defaults to the registered plugins; objects in
        ``extra`` are consulted after them.  Neither the registered
        plugin list nor a caller-supplied ``plugins`` list is modified.
        """
        if plugins is None:
            plugins = self._plugins
        # Work on a private copy: extending ``plugins`` in place would
        # permanently add the ``extra`` objects to self._plugins (or to the
        # caller's list).
        candidates = list(plugins)
        if extra:
            candidates.extend(extra)
        l = []
        for plugin in candidates:
            try:
                l.append(getattr(plugin, attrname))
            except AttributeError:
                continue
        return l

    def call_each(self, methname, *args, **kwargs):
        """ call ``methname`` on each plugin that has it and return the
        list of non-None results. """
        return MultiCall(self.listattr(methname), *args, **kwargs).execute()

    def call_firstresult(self, methname, *args, **kwargs):
        """ return first non-None result of a plugin method. """
        return MultiCall(self.listattr(methname), *args, **kwargs).execute(firstresult=True)

    def call_plugin(self, plugin, methname, *args, **kwargs):
        """ call ``methname`` on the given ``plugin`` only and return its
        result (None if the plugin lacks the method). """
        return MultiCall(self.listattr(methname, plugins=[plugin]),
                         *args, **kwargs).execute(firstresult=True)

    def notify(self, eventname, *args, **kwargs):
        """ Deliver an event to all registered plugins.

        First every ``pyevent_<eventname>`` method is called with the
        given arguments, then every generic ``pyevent`` method is called
        with ``eventname`` as additional first argument.
        """
        #print "notifying", eventname, args, kwargs
        MultiCall(self.listattr("pyevent_" + eventname),
                  *args, **kwargs).execute()
        #print "calling anonymous hooks", args, kwargs
        MultiCall(self.listattr("pyevent"),
                  eventname, *args, **kwargs).execute()
# Module-level PyPlugins instance, created once when this module is imported.
pyplugins = PyPlugins()