""" web server for py.test """ from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler import thread, threading import re import time import random import Queue import os import sys import socket import py from py.__.test.rsession.rsession import RSession from py.__.test.rsession import repevent from py.__.test import collect from py.__.test.rsession.webdata import json DATADIR = py.path.local(__file__).dirpath("webdata") FUNCTION_LIST = ["main", "show_skip", "show_traceback", "show_info", "hide_info", "show_host", "hide_host", "hide_messagebox", "opt_scroll"] try: try: if not py.test.config.getvalue('_dist_import_pypy'): raise ImportError except AttributeError: pass from pypy.rpython.ootypesystem.bltregistry import MethodDesc, BasicExternal,\ described from pypy.translator.js.main import rpython2javascript from pypy.translator.js import commproxy from pypy.rpython.extfunc import _callable commproxy.USE_MOCHIKIT = False IMPORTED_PYPY = True except (ImportError, NameError): class BasicExternal(object): pass def described(*args, **kwargs): def decorator(func): return func return decorator def _callable(*args, **kwargs): pass IMPORTED_PYPY = False def add_item(event): """ A little helper """ item = event.item itemtype = item.__class__.__name__ itemname = item.name fullitemname = "/".join(item.listnames()) d = {'fullitemname': fullitemname, 'itemtype': itemtype, 'itemname': itemname} #if itemtype == 'Module': try: d['length'] = str(len(list(event.item._tryiter()))) except: d['length'] = "?" return d class MultiQueue(object): """ a tailor-made queue (internally using Queue) for py.test.rsession.web API-wise the main difference is that the get() method gets a sessid argument, which is used to determine what data to feed to the client when a data queue for a sessid doesn't yet exist, it is created, and filled with data that has already been fed to the other clients """ def __init__(self): self._cache = [] self._session_queues = {} self._lock = py.std.thread.allocate_lock() def put(self, item): self._lock.acquire() try: self._cache.append(item) for key, q in self._session_queues.items(): q.put(item) finally: self._lock.release() def _del(self, sessid): self._lock.acquire() try: del self._session_queues[sessid] finally: self._lock.release() def get(self, sessid): self._lock.acquire() try: if not sessid in self._session_queues: self._create_session_queue(sessid) finally: self._lock.release() return self._session_queues[sessid].get(sessid) def empty(self): self._lock.acquire() try: if not self._session_queues: return not len(self._cache) for q in self._session_queues.values(): if not q.empty(): return False finally: self._lock.release() return True def empty_queue(self, sessid): return self._session_queues[sessid].empty() def _create_session_queue(self, sessid): self._session_queues[sessid] = q = Queue.Queue() for item in self._cache: q.put(item) class ExportedMethods(BasicExternal): _render_xmlhttp = True def __init__(self): self.pending_events = MultiQueue() self.start_event = threading.Event() self.end_event = threading.Event() self.skip_reasons = {} self.fail_reasons = {} self.stdout = {} self.stderr = {} self.all = 0 self.to_rsync = {} def findmodule(self, item): # find the most outwards parent which is module current = item while current: if isinstance(current, collect.Module): break current = current.parent if current is not None: return str(current.name), str("/".join(current.listnames())) else: return str(item.parent.name), str("/".join(item.parent.listnames())) def show_hosts(self): self.start_event.wait() to_send = {} for host in self.hosts: to_send[host.hostid] = host.hostname return to_send show_hosts = described(retval={str:str}, args=[('callback', _callable([{str:str}]))])(show_hosts) def show_skip(self, item_name="aa"): return {'item_name': item_name, 'reason': self.skip_reasons[item_name]} show_skip = described(retval={str:str}, args=[('item_name',str),('callback', _callable([{str:str}]))])(show_skip) def show_fail(self, item_name="aa"): return {'item_name':item_name, 'traceback':str(self.fail_reasons[item_name]), 'stdout':self.stdout[item_name], 'stderr':self.stderr[item_name]} show_fail = described(retval={str:str}, args=[('item_name',str),('callback', _callable([{str:str}]))])(show_fail) _sessids = None _sesslock = py.std.thread.allocate_lock() def show_sessid(self): if not self._sessids: self._sessids = [] self._sesslock.acquire() try: while 1: chars = list(py.std.string.lowercase) py.std.random.shuffle(chars) sessid = ''.join(chars[:8]) if sessid not in self._sessids: self._sessids.append(sessid) break finally: self._sesslock.release() return sessid show_sessid = described(retval=str, args=[('callback', _callable([str]))])(show_sessid) def failed(self, **kwargs): if not 'sessid' in kwargs: return sessid = kwargs['sessid'] to_del = -1 for num, i in enumerate(self._sessids): if i == sessid: to_del = num if to_del != -1: del self._sessids[to_del] self.pending_events._del(kwargs['sessid']) def show_all_statuses(self, sessid=-1): retlist = [self.show_status_change(sessid)] while not self.pending_events.empty_queue(sessid): retlist.append(self.show_status_change(sessid)) retval = retlist return retval show_all_statuses = described(retval=[{str:str}],args= [('sessid',str), ('callback',_callable([[{str:str}]]))])(show_all_statuses) def show_status_change(self, sessid): event = self.pending_events.get(sessid) if event is None: self.end_event.set() return {} # some dispatcher here if isinstance(event, repevent.ReceivedItemOutcome): args = {} outcome = event.outcome for key, val in outcome.__dict__.iteritems(): args[key] = str(val) args.update(add_item(event)) mod_name, mod_fullname = self.findmodule(event.item) args['modulename'] = str(mod_name) args['fullmodulename'] = str(mod_fullname) fullitemname = args['fullitemname'] if outcome.skipped: self.skip_reasons[fullitemname] = outcome.skipped elif outcome.excinfo: self.fail_reasons[fullitemname] = self.repr_failure_tblong( event.item, outcome.excinfo, outcome.excinfo.traceback) self.stdout[fullitemname] = outcome.stdout self.stderr[fullitemname] = outcome.stderr elif outcome.signal: self.fail_reasons[fullitemname] = "Received signal %d" % outcome.signal self.stdout[fullitemname] = outcome.stdout self.stderr[fullitemname] = outcome.stderr if event.channel: args['hostkey'] = event.channel.gateway.host.hostid else: args['hostkey'] = '' elif isinstance(event, repevent.ItemStart): args = add_item(event) elif isinstance(event, repevent.TestFinished): args = {} args['run'] = str(self.all) args['fails'] = str(len(self.fail_reasons)) args['skips'] = str(len(self.skip_reasons)) elif isinstance(event, repevent.SendItem): args = add_item(event) args['hostkey'] = event.channel.gateway.host.hostid elif isinstance(event, repevent.HostRSyncRootReady): self.ready_hosts[event.host] = True args = {'hostname' : event.host.hostname, 'hostkey' : event.host.hostid} elif isinstance(event, repevent.FailedTryiter): args = add_item(event) elif isinstance(event, repevent.SkippedTryiter): args = add_item(event) args['reason'] = str(event.excinfo.value) else: args = {} args['event'] = str(event) args['type'] = event.__class__.__name__ return args def repr_failure_tblong(self, item, excinfo, traceback): lines = [] for index, entry in py.builtin.enumerate(traceback): lines.append('----------') lines.append("%s: %s" % (entry.path, entry.lineno)) lines += self.repr_source(entry.relline, entry.source) lines.append("%s: %s" % (excinfo.typename, excinfo.value)) return "\n".join(lines) def repr_source(self, relline, source): lines = [] for num, line in enumerate(source.split("\n")): if num == relline: lines.append(">>>>" + line) else: lines.append(" " + line) return lines def report_ReceivedItemOutcome(self, event): self.all += 1 self.pending_events.put(event) def report_ItemStart(self, event): if isinstance(event.item, py.test.collect.Module): self.pending_events.put(event) def report_unknown(self, event): # XXX: right now, we just pass it for showing self.pending_events.put(event) def _host_ready(self, event): self.pending_events.put(event) def report_HostGatewayReady(self, item): self.to_rsync[item.host] = len(item.roots) def report_HostRSyncRootReady(self, item): self.to_rsync[item.host] -= 1 if not self.to_rsync[item.host]: self._host_ready(item) def report_TestStarted(self, event): # XXX: It overrides out self.hosts self.hosts = {} self.ready_hosts = {} for host in event.hosts: self.hosts[host] = host self.ready_hosts[host] = False self.start_event.set() self.pending_events.put(event) def report(self, what): repfun = getattr(self, "report_" + what.__class__.__name__, self.report_unknown) try: repfun(what) except (KeyboardInterrupt, SystemExit): raise except: print "Internal reporting problem" excinfo = py.code.ExceptionInfo() for i in excinfo.traceback: print str(i)[2:-1] print excinfo ## try: ## self.wait_flag.acquire() ## self.pending_events.insert(0, event) ## self.wait_flag.notify() ## finally: ## self.wait_flag.release() exported_methods = ExportedMethods() class TestHandler(BaseHTTPRequestHandler): exported_methods = exported_methods def do_GET(self): path = self.path if path.endswith("/"): path = path[:-1] if path.startswith("/"): path = path[1:] m = re.match('^(.*)\?(.*)$', path) if m: path = m.group(1) getargs = m.group(2) else: getargs = "" name_path = path.replace(".", "_") method_to_call = getattr(self, "run_" + name_path, None) if method_to_call is None: exec_meth = getattr(self.exported_methods, name_path, None) if exec_meth is None: self.send_error(404, "File %s not found" % path) else: try: self.serve_data('text/json', json.write(exec_meth(**self.parse_args(getargs)))) except socket.error: # client happily disconnected exported_methods.failed(**self.parse_args(getargs)) else: method_to_call() def parse_args(self, getargs): # parse get argument list if getargs == "": return {} unquote = py.std.urllib.unquote args = {} arg_pairs = getargs.split("&") for arg in arg_pairs: key, value = arg.split("=") args[unquote(key)] = unquote(value) return args def log_message(self, format, *args): # XXX just discard it pass do_POST = do_GET def run_(self): self.run_index() def run_index(self): data = py.path.local(DATADIR).join("index.html").open().read() self.serve_data("text/html", data) def run_jssource(self): js_name = py.path.local(__file__).dirpath("webdata").join("source.js") web_name = py.path.local(__file__).dirpath().join("webjs.py") if IMPORTED_PYPY and web_name.mtime() > js_name.mtime(): from py.__.test.rsession import webjs javascript_source = rpython2javascript(webjs, FUNCTION_LIST, use_pdb=False) open(str(js_name), "w").write(javascript_source) self.serve_data("text/javascript", javascript_source) else: js_source = open(str(js_name), "r").read() self.serve_data("text/javascript", js_source) def serve_data(self, content_type, data): self.send_response(200) self.send_header("Content-type", content_type) self.send_header("Content-length", len(data)) self.end_headers() self.wfile.write(data) def start_server(server_address = ('', 8000), handler=TestHandler, start_new=True): httpd = HTTPServer(server_address, handler) if start_new: thread.start_new_thread(httpd.serve_forever, ()) print "Server started, listening on port %d" % (httpd.server_port,) return httpd else: print "Server started, listening on port %d" % (httpd.server_port,) httpd.serve_forever() def kill_server(): exported_methods.pending_events.put(None) while not exported_methods.pending_events.empty(): time.sleep(.1) exported_methods.end_event.wait()