[svn r62612] moved the attic somewhere darker.

--HG--
branch : trunk
This commit is contained in:
hpk 2009-03-05 22:51:52 +01:00
parent 65b75cead6
commit 27a501d171
12 changed files with 0 additions and 6002 deletions

View File

@ -1 +0,0 @@
#

View File

@ -1,287 +0,0 @@
""" Rest reporting stuff
"""
import py
import sys
from StringIO import StringIO
from py.__.test.reporter import AbstractReporter
from py.__.test import event
from py.__.rest.rst import *
class RestReporter(AbstractReporter):
linkwriter = None
def __init__(self, config, hosts):
super(RestReporter, self).__init__(config, hosts)
self.rest = Rest()
self.traceback_num = 0
self.failed = dict([(host, 0) for host in hosts])
self.skipped = dict([(host, 0) for host in hosts])
self.passed = dict([(host, 0) for host in hosts])
def get_linkwriter(self):
if self.linkwriter is None:
try:
self.linkwriter = self.config.getvalue('linkwriter')
except KeyError:
print >>sys.stderr, ('no linkwriter configured, using default '
'one')
self.linkwriter = RelLinkWriter()
return self.linkwriter
def report_unknown(self, what):
if self.config.option.verbose:
self.add_rest(Paragraph("Unknown report: %s" % what))
def gethost(self, item):
if item.channel:
return item.channel.gateway.host
return self.hosts[0]
def report_SendItem(self, item):
address = self.gethost(item).hostname
if self.config.option.verbose:
self.add_rest(Paragraph('sending item %s to %s' % (item.item,
address)))
def report_HostRSyncing(self, item):
self.add_rest(LiteralBlock('%10s: RSYNC ==> %s' % (item.host.hostname[:10],
item.host.relpath)))
def _host_ready(self, item):
self.add_rest(LiteralBlock('%10s: READY' % (item.host.hostname[:10],)))
def report_TestStarted(self, event):
txt = "Running tests on hosts: %s" % ", ".join([i.hostname for i in
event.hosts])
self.add_rest(Title(txt, abovechar='=', belowchar='='))
self.timestart = event.timestart
def report_TestTestrunFinish(self, item):
self.timeend = item.timeend
self.summary()
return len(self.failed_tests_outcome) > 0
def report_ImmediateFailure(self, item):
pass
def report_HostGatewayReady(self, item):
self.to_rsync[item.host] = len(item.roots)
def report_ItemStart(self, event):
item = event.item
if isinstance(item, py.test.collect.Module):
lgt = len(list(item._tryiter()))
lns = item.listnames()[1:]
name = "/".join(lns)
link = self.get_linkwriter().get_link(self.get_rootpath(item),
item.fspath)
if link:
name = Link(name, link)
txt = 'Testing module %s (%d items)' % (name, lgt)
self.add_rest(Title('Testing module', name, '(%d items)' % (lgt,),
belowchar='-'))
def get_rootpath(self, item):
root = item.parent
while root.parent is not None:
root = root.parent
return root.fspath
def print_summary(self, total, skipped_str, failed_str):
self.skips()
self.failures()
txt = "%d tests run%s%s in %.2fs (rsync: %.2f)" % \
(total, skipped_str, failed_str, self.timeend - self.timestart,
self.timersync - self.timestart)
self.add_rest(Title(txt, belowchar='-'))
# since we're rendering each item, the links haven't been rendered
# yet
self.out.write(self.rest.render_links())
def report_ItemFinish(self, event):
host = self.gethost(event)
if event.outcome.passed:
status = [Strong("PASSED")]
self.passed[host] += 1
elif event.outcome.skipped:
status = [Strong("SKIPPED")]
self.skipped_tests_outcome.append(event)
self.skipped[host] += 1
else:
status = [Strong("FAILED"),
InternalLink("traceback%d" % self.traceback_num)]
self.traceback_num += 1
self.failed[host] += 1
self.failed_tests_outcome.append(event)
# we'll take care of them later
itempath = self.get_path_from_item(event.item)
status.append(Text(itempath))
hostname = host.hostname
self.add_rest(ListItem(Text("%10s:" % (hostname[:10],)), *status))
def skips(self):
# XXX hrmph, copied code
texts = {}
for event in self.skipped_tests_outcome:
colitem = event.item
if isinstance(event, event.ItemFinish):
outcome = event.outcome
text = outcome.skipped
itemname = self.get_item_name(event, colitem)
elif isinstance(event, event.DeselectedItem):
text = str(event.excinfo.value)
itemname = "/".join(colitem.listnames())
if text not in texts:
texts[text] = [itemname]
else:
texts[text].append(itemname)
if texts:
self.add_rest(Title('Reasons for skipped tests:', belowchar='+'))
for text, items in texts.items():
for item in items:
self.add_rest(ListItem('%s: %s' % (item, text)))
def get_host(self, event):
try:
return event.channel.gateway.host
except AttributeError:
return None
def failures(self):
self.traceback_num = 0
tbstyle = self.config.option.tbstyle
if self.failed_tests_outcome:
self.add_rest(Title('Exceptions:', belowchar='+'))
for i, event in enumerate(self.failed_tests_outcome):
if i > 0:
self.add_rest(Transition())
if isinstance(event, event.ItemFinish):
host = self.get_host(event)
itempath = self.get_path_from_item(event.item)
root = self.get_rootpath(event.item)
link = self.get_linkwriter().get_link(root, event.item.fspath)
t = Title(belowchar='+')
if link:
t.add(Link(itempath, link))
else:
t.add(Text(itempath))
if host:
t.add(Text('on %s' % (host.hostname,)))
self.add_rest(t)
if event.outcome.signal:
self.repr_signal(event.item, event.outcome)
else:
self.repr_failure(event.item, event.outcome, tbstyle)
else:
itempath = self.get_path_from_item(event.item)
root = self.get_rootpath(event.item)
link = self.get_linkwriter().get_link(root, event.item.fspath)
t = Title(abovechar='+', belowchar='+')
if link:
t.add(Link(itempath, link))
else:
t.add(Text(itempath))
out = outcome.Outcome(excinfo=event.excinfo)
self.repr_failure(event.item,
outcome.ReprOutcome(out.make_repr()),
tbstyle)
def repr_signal(self, item, outcome):
signal = outcome.signal
self.add_rest(Title('Received signal: %d' % (outcome.signal,),
abovechar='+', belowchar='+'))
if outcome.stdout.strip():
self.add_rest(Paragraph('Captured process stdout:'))
self.add_rest(LiteralBlock(outcome.stdout))
if outcome.stderr.strip():
self.add_rest(Paragraph('Captured process stderr:'))
self.add_rest(LiteralBlock(outcome.stderr))
def repr_failure(self, item, outcome, style):
excinfo = outcome.excinfo
traceback = excinfo.traceback
if not traceback:
self.add_rest(Paragraph('empty traceback from item %r' % (item,)))
return
self.repr_traceback(item, excinfo, traceback, style)
if outcome.stdout:
self.add_rest(Title('Captured process stdout:', abovechar='+',
belowchar='+'))
self.add_rest(LiteralBlock(outcome.stdout))
if outcome.stderr:
self.add_rest(Title('Captured process stderr:', abovechar='+',
belowchar='+'))
self.add_rest(LiteralBlock(outcome.stderr))
def repr_traceback(self, item, excinfo, traceback, style):
root = self.get_rootpath(item)
self.add_rest(LinkTarget('traceback%d' % self.traceback_num, ""))
self.traceback_num += 1
if style == 'long':
for entry in traceback:
link = self.get_linkwriter().get_link(root,
py.path.local(entry.path))
if link:
self.add_rest(Title(Link(entry.path, link),
'line %d' % (entry.lineno,),
belowchar='+', abovechar='+'))
else:
self.add_rest(Title('%s line %d' % (entry.path,
entry.lineno,),
belowchar='+', abovechar='+'))
self.add_rest(LiteralBlock(self.prepare_source(entry.relline,
entry.source)))
elif style == 'short':
text = []
for entry in traceback:
text.append('%s line %d' % (entry.path, entry.lineno))
text.append(' %s' % (entry.source.strip(),))
self.add_rest(LiteralBlock('\n'.join(text)))
self.add_rest(Title(excinfo.typename, belowchar='+'))
self.add_rest(LiteralBlock(excinfo.value))
def prepare_source(self, relline, source):
text = []
for num, line in enumerate(source.split('\n')):
if num == relline:
text.append('>>> %s' % (line,))
else:
text.append(' %s' % (line,))
return '\n'.join(text)
def add_rest(self, item):
self.rest.add(item)
self.out.write('%s\n\n' % (item.text(),))
def get_path_from_item(self, item):
lns = item.listnames()[1:]
for i, ln in enumerate(lns):
if i > 0 and ln != '()':
lns[i] = '/%s' % (ln,)
itempath = ''.join(lns)
return itempath
class AbstractLinkWriter(object):
def get_link(self, base, path):
pass
class NoLinkWriter(AbstractLinkWriter):
def get_link(self, base, path):
return ''
class LinkWriter(AbstractLinkWriter):
def __init__(self, baseurl):
self.baseurl = baseurl
def get_link(self, base, path):
relpath = path.relto(base)
return self.baseurl + relpath
class RelLinkWriter(AbstractLinkWriter):
def get_link(self, base, path):
return path.relto(base)

View File

@ -1 +0,0 @@
#

View File

@ -1,356 +0,0 @@
""" tests of rest reporter backend
"""
import py
py.test.skip("refactor ReST reporter tests")
from py.__.test.testing.test_reporter import AbstractTestReporter,\
DummyChannel
from py.__.test import event
from py.__.test.dsession.rest import RestReporter, NoLinkWriter
from py.__.rest.rst import *
from py.__.test.dsession.hostmanage import Host
from py.__.test.outcome import SerializableOutcome
class Container(object):
def __init__(self, **args):
for arg, val in args.items():
setattr(self, arg, val)
class RestTestReporter(RestReporter):
def __init__(self, *args, **kwargs):
if args:
super(RestReporter, self).__init__(*args, **kwargs)
class TestRestUnits(object):
def setup_method(self, method):
config = py.test.config._reparse(["some_sub"])
config.option.verbose = False
self.config = config
hosts = [Host('localhost')]
method.im_func.func_globals['ch'] = DummyChannel(hosts[0])
method.im_func.func_globals['reporter'] = r = RestReporter(config,
hosts)
method.im_func.func_globals['stdout'] = s = py.std.StringIO.StringIO()
r.out = s # XXX will need to become a real reporter some time perhaps?
r.linkwriter = NoLinkWriter()
def test_report_unknown(self):
self.config.option.verbose = True
reporter.report_unknown('foo')
assert stdout.getvalue() == 'Unknown report\\: foo\n\n'
self.config.option.verbose = False
def test_report_SendItem(self):
event = event.SendItem(item='foo/bar.py', channel=ch)
reporter.report(event)
assert stdout.getvalue() == ''
stdout.seek(0)
stdout.truncate()
reporter.config.option.verbose = True
reporter.report(event)
assert stdout.getvalue() == ('sending item foo/bar.py to '
'localhost\n\n')
def test_report_HostRSyncing(self):
event = event.HostRSyncing(Host('localhost:/foo/bar'), "a",
"b", False)
reporter.report(event)
assert stdout.getvalue() == ('::\n\n localhost: RSYNC ==> '
'/foo/bar\n\n')
def test_report_HostRSyncRootReady(self):
h = Host('localhost')
reporter.hosts_to_rsync = 1
reporter.report(event.HostGatewayReady(h, ["a"]))
event = event.HostRSyncRootReady(h, "a")
reporter.report(event)
assert stdout.getvalue() == '::\n\n localhost: READY\n\n'
def test_report_TestStarted(self):
event = event.TestStarted([Host('localhost'),
Host('foo.com')],
"aa", ["a", "b"])
reporter.report(event)
assert stdout.getvalue() == """\
===========================================
Running tests on hosts\: localhost, foo.com
===========================================
"""
def test_report_ItemStart(self):
class FakeModule(py.test.collect.Module):
def __init__(self, parent):
self.parent = parent
self.fspath = py.path.local('.')
def _tryiter(self):
return ['test_foo', 'test_bar']
def listnames(self):
return ['package', 'foo', 'bar.py']
parent = Container(parent=None, fspath=py.path.local('.'))
event = event.ItemStart(item=FakeModule(parent))
reporter.report(event)
assert stdout.getvalue() == """\
Testing module foo/bar.py (2 items)
-----------------------------------
"""
def test_print_summary(self):
reporter.timestart = 10
reporter.timeend = 20
reporter.timersync = 15
reporter.print_summary(10, '', '')
assert stdout.getvalue() == """\
10 tests run in 10.00s (rsync\: 5.00)
-------------------------------------
"""
def test_ItemFinish_PASSED(self):
outcome = SerializableOutcome()
item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'])
event = event.ItemFinish(channel=ch, outcome=outcome, item=item)
reporter.report(event)
assert stdout.getvalue() == ('* localhost\\: **PASSED** '
'foo.py/bar()/baz\n\n')
def test_ItemFinish_SKIPPED(self):
outcome = SerializableOutcome(skipped="reason")
item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'])
event = event.ItemFinish(channel=ch, outcome=outcome, item=item)
reporter.report(event)
assert stdout.getvalue() == ('* localhost\\: **SKIPPED** '
'foo.py/bar()/baz\n\n')
def test_ItemFinish_FAILED(self):
outcome = SerializableOutcome(excinfo="xxx")
item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'])
event = event.ItemFinish(channel=ch, outcome=outcome, item=item)
reporter.report(event)
assert stdout.getvalue() == """\
* localhost\: **FAILED** `traceback0`_ foo.py/bar()/baz
"""
def test_ItemFinish_FAILED_stdout(self):
excinfo = Container(
typename='FooError',
value='A foo has occurred',
traceback=[
Container(
path='foo/bar.py',
lineno=1,
relline=1,
source='foo()',
),
Container(
path='foo/baz.py',
lineno=4,
relline=1,
source='raise FooError("A foo has occurred")',
),
]
)
outcome = SerializableOutcome(excinfo=excinfo)
outcome.stdout = '<printed>'
outcome.stderr = ''
parent = Container(parent=None, fspath=py.path.local('.'))
item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'],
parent=parent, fspath=py.path.local('foo'))
event = event.ItemFinish(channel=ch, outcome=outcome,
item=item)
reporter.report(event)
reporter.timestart = 10
reporter.timeend = 20
reporter.timersync = 15
reporter.print_summary(10, '', '')
reporter.print_summary(1, 'skipped', 'failed')
out = stdout.getvalue()
assert out.find('<printed>') > -1
def test_skips(self):
class FakeOutcome(Container, event.ItemFinish):
pass
class FakeTryiter(Container, event.DeselectedItem):
pass
reporter.skips()
assert stdout.getvalue() == ''
reporter.skipped_tests_outcome = [
FakeOutcome(outcome=Container(skipped='problem X'),
item=Container(listnames=lambda: ['foo', 'bar.py'])),
FakeTryiter(excinfo=Container(value='problem Y'),
item=Container(listnames=lambda: ['foo', 'baz.py']))]
reporter.skips()
assert stdout.getvalue() == """\
Reasons for skipped tests\:
+++++++++++++++++++++++++++
* foo/bar.py\: problem X
* foo/baz.py\: problem Y
"""
def test_failures(self):
class FakeOutcome(Container, event.ItemFinish):
pass
parent = Container(parent=None, fspath=py.path.local('.'))
reporter.failed_tests_outcome = [
FakeOutcome(
outcome=Container(
signal=False,
excinfo=Container(
typename='FooError',
value='A foo has occurred',
traceback=[
Container(
path='foo/bar.py',
lineno=1,
relline=1,
source='foo()',
),
Container(
path='foo/baz.py',
lineno=4,
relline=1,
source='raise FooError("A foo has occurred")',
),
]
),
stdout='',
stderr='',
),
item=Container(
listnames=lambda: ['package', 'foo', 'bar.py',
'baz', '()'],
parent=parent,
fspath=py.path.local('.'),
),
channel=ch,
),
]
reporter.config.option.tbstyle = 'no'
reporter.failures()
expected = """\
Exceptions\:
++++++++++++
foo/bar.py/baz() on localhost
+++++++++++++++++++++++++++++
.. _`traceback0`:
FooError
++++++++
::
A foo has occurred
"""
assert stdout.getvalue() == expected
reporter.config.option.tbstyle = 'short'
stdout.seek(0)
stdout.truncate()
reporter.failures()
expected = """\
Exceptions\:
++++++++++++
foo/bar.py/baz() on localhost
+++++++++++++++++++++++++++++
.. _`traceback0`:
::
foo/bar.py line 1
foo()
foo/baz.py line 4
raise FooError("A foo has occurred")
FooError
++++++++
::
A foo has occurred
"""
assert stdout.getvalue() == expected
reporter.config.option.tbstyle = 'long'
stdout.seek(0)
stdout.truncate()
reporter.failures()
expected = """\
Exceptions\:
++++++++++++
foo/bar.py/baz() on localhost
+++++++++++++++++++++++++++++
.. _`traceback0`:
+++++++++++++++++
foo/bar.py line 1
+++++++++++++++++
::
foo()
+++++++++++++++++
foo/baz.py line 4
+++++++++++++++++
::
raise FooError("A foo has occurred")
FooError
++++++++
::
A foo has occurred
"""
assert stdout.getvalue() == expected
class TestRestReporter(AbstractTestReporter):
reporter = RestReporter
def get_hosts(self):
return [Host('localhost')]
def test_failed_to_load(self):
py.test.skip("Not implemented")
def test_report_received_item_outcome(self):
val = self.report_received_item_outcome()
expected_list = [
"**FAILED**",
"**SKIPPED**",
"**PASSED**",
"* localhost\:",
"`traceback0`_ test\_one.py/funcpass",
"test\_one.py/funcpass"]
for expected in expected_list:
assert val.find(expected) != -1

View File

@ -1,90 +0,0 @@
""" webtest
"""
import py
def setup_module(mod):
try:
from pypy.translator.js.main import rpython2javascript
from pypy.translator.js import commproxy
mod.commproxy = commproxy
mod.rpython2javascript = rpython2javascript
except ImportError:
py.test.skip("PyPy not found")
mod.commproxy.USE_MOCHIKIT = False
mod.rpython2javascript = rpython2javascript
mod.commproxy = mod.commproxy
from py.__.test.report.web import TestHandler as _TestHandler
from py.__.test.report.web import MultiQueue
mod._TestHandler = _TestHandler
mod.MultiQueue = MultiQueue
def test_js_generate():
from py.__.test.report import webjs
from py.__.test.report.web import FUNCTION_LIST, IMPORTED_PYPY
source = rpython2javascript(webjs, FUNCTION_LIST, use_pdb=False)
assert source
def test_parse_args():
class TestTestHandler(_TestHandler):
def __init__(self):
pass
h = TestTestHandler()
assert h.parse_args('foo=bar') == {'foo': 'bar'}
assert h.parse_args('foo=bar%20baz') == {'foo': 'bar baz'}
assert h.parse_args('foo%20bar=baz') == {'foo bar': 'baz'}
assert h.parse_args('foo=bar%baz') == {'foo': 'bar\xbaz'}
py.test.raises(ValueError, 'h.parse_args("foo")')
class TestMultiQueue(object):
def test_get_one_sessid(self):
mq = MultiQueue()
mq.put(1)
result = mq.get(1234)
assert result == 1
def test_get_two_sessid(self):
mq = MultiQueue()
mq.put(1)
result = mq.get(1234)
assert result == 1
mq.put(2)
result = mq.get(1234)
assert result == 2
result = mq.get(5678)
assert result == 1
result = mq.get(5678)
assert result == 2
def test_get_blocking(self):
import thread
result = []
def getitem(mq, sessid):
result.append(mq.get(sessid))
mq = MultiQueue()
thread.start_new_thread(getitem, (mq, 1234))
assert not result
mq.put(1)
py.std.time.sleep(0.1)
assert result == [1]
def test_empty(self):
mq = MultiQueue()
assert mq.empty()
mq.put(1)
assert not mq.empty()
result = mq.get(1234)
result == 1
assert mq.empty()
mq.put(2)
result = mq.get(4567)
assert result == 1
result = mq.get(1234)
assert result == 2
assert not mq.empty()
result = mq.get(4567)
assert result == 2
assert mq.empty()

View File

@ -1,143 +0,0 @@
import py
def check(mod):
try:
import pypy
from pypy.translator.js.modules import dom
from pypy.translator.js.tester import schedule_callbacks
dom.Window # check whether dom was properly imported or is just a
# leftover in sys.modules
except (ImportError, AttributeError):
py.test.skip('PyPy not found')
mod.dom = dom
mod.schedule_callbacks = schedule_callbacks
from py.__.test.report import webjs
from py.__.test.report.web import exported_methods
mod.webjs = webjs
mod.exported_methods = exported_methods
mod.here = py.magic.autopath().dirpath()
def setup_module(mod):
check(mod)
# load HTML into window object
html = here.join('../webdata/index.html').read()
mod.html = html
from pypy.translator.js.modules import dom
mod.dom = dom
dom.window = dom.Window(html)
dom.document = dom.window.document
from py.__.test.report import webjs
from py.__.test.report.web import exported_methods
mod.webjs = webjs
mod.exported_methods = exported_methods
def setup_function(f):
dom.window = dom.Window(html)
dom.document = dom.window.document
def test_html_loaded():
body = dom.window.document.getElementsByTagName('body')[0]
assert len(body.childNodes) > 0
assert str(body.childNodes[1].nodeName) == 'A'
def test_set_msgbox():
py.test.skip("not implemented in genjs")
msgbox = dom.window.document.getElementById('messagebox')
assert len(msgbox.childNodes) == 0
webjs.set_msgbox('foo', 'bar')
assert len(msgbox.childNodes) == 1
assert msgbox.childNodes[0].nodeName == 'PRE'
assert msgbox.childNodes[0].childNodes[0].nodeValue == 'foo\nbar'
def test_show_info():
info = dom.window.document.getElementById('info')
info.style.visibility = 'hidden'
info.innerHTML = ''
webjs.show_info('foobar')
content = info.innerHTML
assert content == 'foobar'
bgcolor = info.style.backgroundColor
assert bgcolor == 'beige'
def test_hide_info():
info = dom.window.document.getElementById('info')
info.style.visibility = 'visible'
webjs.hide_info()
assert info.style.visibility == 'hidden'
def test_process():
main_t = dom.window.document.getElementById('main_table')
assert len(main_t.getElementsByTagName('tr')) == 0
assert not webjs.process({})
msg = {'type': 'ItemStart',
'itemtype': 'Module',
'itemname': 'foo.py',
'fullitemname': 'modules/foo.py',
'length': 10,
}
assert webjs.process(msg)
trs = main_t.getElementsByTagName('tr')
assert len(trs) == 1
tr = trs[0]
assert len(tr.childNodes) == 2
assert tr.childNodes[0].nodeName == 'TD'
assert tr.childNodes[0].innerHTML == 'foo.py[0/10]'
assert tr.childNodes[1].nodeName == 'TD'
assert tr.childNodes[1].childNodes[0].nodeName == 'TABLE'
assert len(tr.childNodes[1].getElementsByTagName('tr')) == 0
def test_process_two():
main_t = dom.window.document.getElementById('main_table')
msg = {'type': 'ItemStart',
'itemtype': 'Module',
'itemname': 'foo.py',
'fullitemname': 'modules/foo.py',
'length': 10,
}
webjs.process(msg)
msg = {'type': 'ItemFinish',
'fullmodulename': 'modules/foo.py',
'passed' : 'True',
'fullitemname' : 'modules/foo.py/test_item',
'hostkey': None,
}
webjs.process(msg)
trs = main_t.getElementsByTagName('tr')
tds = trs[0].getElementsByTagName('td')
# two cells in the row, one in the table inside one of the cells
assert len(tds) == 3
html = tds[0].innerHTML
assert html == 'foo.py[1/10]'
assert tds[2].innerHTML == '.'
def test_signal():
main_t = dom.window.document.getElementById('main_table')
msg = {'type': 'ItemStart',
'itemtype': 'Module',
'itemname': 'foo.py',
'fullitemname': 'modules/foo.py',
'length': 10,
}
webjs.process(msg)
msg = {'type': 'ItemFinish',
'fullmodulename': 'modules/foo.py',
'passed' : 'False',
'fullitemname' : 'modules/foo.py/test_item',
'hostkey': None,
'signal': '10',
'skipped': 'False',
}
exported_methods.fail_reasons['modules/foo.py/test_item'] = 'Received signal 10'
exported_methods.stdout['modules/foo.py/test_item'] = ''
exported_methods.stderr['modules/foo.py/test_item'] = ''
webjs.process(msg)
schedule_callbacks(exported_methods)
# ouch
assert dom.document.getElementById('modules/foo.py').childNodes[0].\
childNodes[0].childNodes[0].childNodes[0].nodeValue == 'F'
# XXX: Write down test for full run

View File

@ -1,471 +0,0 @@
""" web server for py.test
"""
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
import thread, threading
import re
import time
import random
import Queue
import os
import sys
import socket
import py
from py.test import collect
from py.__.test.report.webdata import json
DATADIR = py.path.local(__file__).dirpath("webdata")
FUNCTION_LIST = ["main", "show_skip", "show_traceback", "show_info", "hide_info",
"show_host", "hide_host", "hide_messagebox", "opt_scroll"]
try:
from pypy.rpython.ootypesystem.bltregistry import MethodDesc, BasicExternal
from pypy.translator.js.main import rpython2javascript
from pypy.translator.js import commproxy
from pypy.translator.js.lib.support import callback
commproxy.USE_MOCHIKIT = False
IMPORTED_PYPY = True
except (ImportError, NameError):
class BasicExternal(object):
pass
def callback(*args, **kwargs):
def decorator(func):
return func
return decorator
IMPORTED_PYPY = False
def add_item(event):
""" A little helper
"""
item = event.item
itemtype = item.__class__.__name__
itemname = item.name
fullitemname = "/".join(item.listnames())
d = {'fullitemname': fullitemname, 'itemtype': itemtype,
'itemname': itemname}
#if itemtype == 'Module':
try:
d['length'] = str(len(list(event.item._tryiter())))
except:
d['length'] = "?"
return d
class MultiQueue(object):
""" a tailor-made queue (internally using Queue) for py.test.dsession.web
API-wise the main difference is that the get() method gets a sessid
argument, which is used to determine what data to feed to the client
when a data queue for a sessid doesn't yet exist, it is created, and
filled with data that has already been fed to the other clients
"""
def __init__(self):
self._cache = []
self._session_queues = {}
self._lock = py.std.thread.allocate_lock()
def put(self, item):
self._lock.acquire()
try:
self._cache.append(item)
for key, q in self._session_queues.items():
q.put(item)
finally:
self._lock.release()
def _del(self, sessid):
self._lock.acquire()
try:
del self._session_queues[sessid]
finally:
self._lock.release()
def get(self, sessid):
self._lock.acquire()
try:
if not sessid in self._session_queues:
self._create_session_queue(sessid)
finally:
self._lock.release()
return self._session_queues[sessid].get(sessid)
def empty(self):
self._lock.acquire()
try:
if not self._session_queues:
return not len(self._cache)
for q in self._session_queues.values():
if not q.empty():
return False
finally:
self._lock.release()
return True
def empty_queue(self, sessid):
return self._session_queues[sessid].empty()
def _create_session_queue(self, sessid):
self._session_queues[sessid] = q = Queue.Queue()
for item in self._cache:
q.put(item)
class ExportedMethods(BasicExternal):
_render_xmlhttp = True
def __init__(self):
self.pending_events = MultiQueue()
self.start_event = threading.Event()
self.end_event = threading.Event()
self.skip_reasons = {}
self.fail_reasons = {}
self.stdout = {}
self.stderr = {}
self.all = 0
self.to_rsync = {}
def findmodule(self, item):
# find the most outwards parent which is module
current = item
while current:
if isinstance(current, collect.Module):
break
current = current.parent
if current is not None:
return str(current.name), str("/".join(current.listnames()))
else:
return str(item.parent.name), str("/".join(item.parent.listnames()))
def show_hosts(self):
self.start_event.wait()
to_send = {}
for host in self.hosts:
to_send[host.hostid] = host.hostname
return to_send
show_hosts = callback(retval={str:str})(show_hosts)
def show_skip(self, item_name="aa"):
return {'item_name': item_name,
'reason': self.skip_reasons[item_name]}
show_skip = callback(retval={str:str})(show_skip)
def show_fail(self, item_name="aa"):
return {'item_name':item_name,
'traceback':str(self.fail_reasons[item_name]),
'stdout':self.stdout[item_name],
'stderr':self.stderr[item_name]}
show_fail = callback(retval={str:str})(show_fail)
_sessids = None
_sesslock = py.std.thread.allocate_lock()
def show_sessid(self):
if not self._sessids:
self._sessids = []
self._sesslock.acquire()
try:
while 1:
sessid = ''.join(py.std.random.sample(
py.std.string.lowercase, 8))
if sessid not in self._sessids:
self._sessids.append(sessid)
break
finally:
self._sesslock.release()
return sessid
show_sessid = callback(retval=str)(show_sessid)
def failed(self, **kwargs):
if not 'sessid' in kwargs:
return
sessid = kwargs['sessid']
to_del = -1
for num, i in enumerate(self._sessids):
if i == sessid:
to_del = num
if to_del != -1:
del self._sessids[to_del]
self.pending_events._del(kwargs['sessid'])
def show_all_statuses(self, sessid='xx'):
retlist = [self.show_status_change(sessid)]
while not self.pending_events.empty_queue(sessid):
retlist.append(self.show_status_change(sessid))
retval = retlist
return retval
show_all_statuses = callback(retval=[{str:str}])(show_all_statuses)
def show_status_change(self, sessid):
event = self.pending_events.get(sessid)
if event is None:
self.end_event.set()
return {}
# some dispatcher here
if isinstance(event, event.ItemFinish):
args = {}
outcome = event.outcome
for key, val in outcome.__dict__.iteritems():
args[key] = str(val)
args.update(add_item(event))
mod_name, mod_fullname = self.findmodule(event.item)
args['modulename'] = str(mod_name)
args['fullmodulename'] = str(mod_fullname)
fullitemname = args['fullitemname']
if outcome.skipped:
self.skip_reasons[fullitemname] = self.repr_failure_tblong(
event.item,
outcome.skipped,
outcome.skipped.traceback)
elif outcome.excinfo:
self.fail_reasons[fullitemname] = self.repr_failure_tblong(
event.item, outcome.excinfo, outcome.excinfo.traceback)
self.stdout[fullitemname] = outcome.stdout
self.stderr[fullitemname] = outcome.stderr
elif outcome.signal:
self.fail_reasons[fullitemname] = "Received signal %d" % outcome.signal
self.stdout[fullitemname] = outcome.stdout
self.stderr[fullitemname] = outcome.stderr
if event.channel:
args['hostkey'] = event.channel.gateway.host.hostid
else:
args['hostkey'] = ''
elif isinstance(event, event.ItemStart):
args = add_item(event)
elif isinstance(event, event.TestTestrunFinish):
args = {}
args['run'] = str(self.all)
args['fails'] = str(len(self.fail_reasons))
args['skips'] = str(len(self.skip_reasons))
elif isinstance(event, event.SendItem):
args = add_item(event)
args['hostkey'] = event.channel.gateway.host.hostid
elif isinstance(event, event.HostRSyncRootReady):
self.ready_hosts[event.host] = True
args = {'hostname' : event.host.hostname, 'hostkey' : event.host.hostid}
elif isinstance(event, event.FailedTryiter):
args = add_item(event)
elif isinstance(event, event.DeselectedItem):
args = add_item(event)
args['reason'] = str(event.excinfo.value)
else:
args = {}
args['event'] = str(event)
args['type'] = event.__class__.__name__
return args
def repr_failure_tblong(self, item, excinfo, traceback):
lines = []
for index, entry in py.builtin.enumerate(traceback):
lines.append('----------')
lines.append("%s: %s" % (entry.path, entry.lineno))
lines += self.repr_source(entry.relline, entry.source)
lines.append("%s: %s" % (excinfo.typename, excinfo.value))
return "\n".join(lines)
def repr_source(self, relline, source):
lines = []
for num, line in enumerate(str(source).split("\n")):
if num == relline:
lines.append(">>>>" + line)
else:
lines.append(" " + line)
return lines
def report_ItemFinish(self, event):
self.all += 1
self.pending_events.put(event)
def report_FailedTryiter(self, event):
fullitemname = "/".join(event.item.listnames())
self.fail_reasons[fullitemname] = self.repr_failure_tblong(
event.item, event.excinfo, event.excinfo.traceback)
self.stdout[fullitemname] = ''
self.stderr[fullitemname] = ''
self.pending_events.put(event)
def report_ItemStart(self, event):
if isinstance(event.item, py.test.collect.Module):
self.pending_events.put(event)
def report_unknown(self, event):
# XXX: right now, we just pass it for showing
self.pending_events.put(event)
def _host_ready(self, event):
self.pending_events.put(event)
def report_HostGatewayReady(self, item):
self.to_rsync[item.host] = len(item.roots)
def report_HostRSyncRootReady(self, item):
self.to_rsync[item.host] -= 1
if not self.to_rsync[item.host]:
self._host_ready(item)
def report_TestStarted(self, event):
# XXX: It overrides our self.hosts
self.hosts = {}
self.ready_hosts = {}
if not event.hosts:
self.hosts = []
else:
for host in event.hosts:
self.hosts[host] = host
self.ready_hosts[host] = False
self.start_event.set()
self.pending_events.put(event)
def report_TestTestrunFinish(self, event):
self.pending_events.put(event)
kill_server()
report_InterruptedExecution = report_TestTestrunFinish
report_CrashedExecution = report_TestTestrunFinish
def report(self, what):
repfun = getattr(self, "report_" + what.__class__.__name__,
self.report_unknown)
try:
repfun(what)
except (KeyboardInterrupt, SystemExit):
raise
except:
print "Internal reporting problem"
excinfo = py.code.ExceptionInfo()
for i in excinfo.traceback:
print str(i)[2:-1]
print excinfo
exported_methods = ExportedMethods()
class TestHandler(BaseHTTPRequestHandler):
exported_methods = exported_methods
def do_GET(self):
path = self.path
if path.endswith("/"):
path = path[:-1]
if path.startswith("/"):
path = path[1:]
m = re.match('^(.*)\?(.*)$', path)
if m:
path = m.group(1)
getargs = m.group(2)
else:
getargs = ""
name_path = path.replace(".", "_")
method_to_call = getattr(self, "run_" + name_path, None)
if method_to_call is None:
exec_meth = getattr(self.exported_methods, name_path, None)
if exec_meth is None:
self.send_error(404, "File %s not found" % path)
else:
try:
self.serve_data('text/json',
json.write(exec_meth(**self.parse_args(getargs))))
except socket.error:
# client happily disconnected
exported_methods.failed(**self.parse_args(getargs))
else:
method_to_call()
def parse_args(self, getargs):
# parse get argument list
if getargs == "":
return {}
unquote = py.std.urllib.unquote
args = {}
arg_pairs = getargs.split("&")
for arg in arg_pairs:
key, value = arg.split("=")
args[unquote(key)] = unquote(value)
return args
def log_message(self, format, *args):
# XXX just discard it
pass
do_POST = do_GET
def run_(self):
self.run_index()
def run_index(self):
data = py.path.local(DATADIR).join("index.html").open().read()
self.serve_data("text/html", data)
def run_jssource(self):
js_name = py.path.local(__file__).dirpath("webdata").join("source.js")
web_name = py.path.local(__file__).dirpath().join("webjs.py")
if IMPORTED_PYPY and web_name.mtime() > js_name.mtime() or \
(not js_name.check()):
from py.__.test.dsession import webjs
javascript_source = rpython2javascript(webjs,
FUNCTION_LIST, use_pdb=False)
open(str(js_name), "w").write(javascript_source)
self.serve_data("text/javascript", javascript_source)
else:
js_source = open(str(js_name), "r").read()
self.serve_data("text/javascript", js_source)
def serve_data(self, content_type, data):
self.send_response(200)
self.send_header("Content-type", content_type)
self.send_header("Content-length", len(data))
self.end_headers()
self.wfile.write(data)
class WebReporter(object):
""" A simple wrapper, this file needs ton of refactoring
anyway, so this is just to satisfy things below
(and start to create saner interface as well)
"""
def __init__(self, config, hosts):
start_server_from_config(config)
def was_failure(self):
return sum(exported_methods.fail_reasons.values()) > 0
# rebind
report = exported_methods.report
__call__ = report
def start_server_from_config(config):
if config.option.runbrowser:
port = socket.INADDR_ANY
else:
port = 8000
httpd = start_server(server_address = ('', port))
port = httpd.server_port
if config.option.runbrowser:
import webbrowser, thread
# webbrowser.open() may block until the browser finishes or not
url = "http://localhost:%d" % (port,)
thread.start_new_thread(webbrowser.open, (url,))
return exported_methods.report
def start_server(server_address = ('', 8000), handler=TestHandler, start_new=True):
httpd = HTTPServer(server_address, handler)
if start_new:
thread.start_new_thread(httpd.serve_forever, ())
print "Server started, listening on port %d" % (httpd.server_port,)
return httpd
else:
print "Server started, listening on port %d" % (httpd.server_port,)
httpd.serve_forever()
def kill_server():
exported_methods.pending_events.put(None)
while not exported_methods.pending_events.empty():
time.sleep(.1)
exported_methods.end_event.wait()

View File

@ -1,126 +0,0 @@
<html>
<head>
<title>Py.test</title>
<script type="text/javascript" src="/jssource"></script>
<style type="text/css">
body, td, div {
font-family: Andale mono;
background-color: #ffd;
color: #003;
}
#info {
visibility: hidden;
position: fixed;
left: 0px;
top: 0px;
text-align: center;
width: 100%;
z-index: 2;
}
div.main {
margin-right: 170px;
}
.error {
color: #F00;
font-weight: bold;
}
#navbar {
position: fixed;
right: 1em;
top: 1em;
background-color: #ffd;
border: 1px solid #003;
z-index: 1;
width: 160px;
}
#navbar tr, #navbar td {
border: 0px;
}
#navbar a {
color: #003;
text-decoration: none;
}
#navbar .title {
font-weight: bold;
}
#jobs {
position: absolute;
visibility: hidden;
right: 2px;
}
#jobs, #jobs td {
background-color: #ffd;
border: 1px solid #003;
}
#jobs tr, #jobs td {
border: 0px;
}
</style>
</head>
<body onload="main()">
<a name="beginning">
<h3 id="Tests">Tests</h3>
</a>
<div id="info">
&#xa0;
</div>
<table id="navbar">
<tbody>
<tr>
<td class="title">Hosts status</td>
</tr>
<tr>
<td>
<table id="hosts">
<tbody id="hostsbody">
</tbody>
</table>
<table id="jobs">
</table>
</td>
</tr>
<tr>
<td class="title">Navigation</td>
</tr>
<tr>
<td><a href="#beginning">Top</a></td>
</tr>
<tr>
<td><a href="#message">Traceback info</a></td>
</tr>
<tr>
<td><a href="#aftermessage">End of Traceback</a></td>
</tr>
<tr>
<td class="title">Options</td>
</tr>
<tr>
<td><input type="checkbox" id="opt_scroll" onchange="javascript:opt_scroll()"/><u>S</u>croll with tests arriving</td>
</tr>
</tbody>
</table>
<div class="main">
<table id="main_table">
</table>
<fieldset id="messagebox_fieldset">
<legend><b>Data [<a href="javascript:hide_messagebox()">hide</a>]:</b></legend>
<a name="message"> </a>
<div id="messagebox"></div>
</fieldset>
</div>
<a name="aftermessage"> </a>
<div id="testmain"></div>
</body>
</html>

View File

@ -1,310 +0,0 @@
import string
import types
## json.py implements a JSON (http://json.org) reader and writer.
## Copyright (C) 2005 Patrick D. Logan
## Contact mailto:patrickdlogan@stardecisions.com
##
## This library is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public
## License as published by the Free Software Foundation; either
## version 2.1 of the License, or (at your option) any later version.
##
## This library is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public
## License along with this library; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
class _StringGenerator(object):
def __init__(self, string):
self.string = string
self.index = -1
def peek(self):
i = self.index + 1
if i < len(self.string):
return self.string[i]
else:
return None
def next(self):
self.index += 1
if self.index < len(self.string):
return self.string[self.index]
else:
raise StopIteration
def all(self):
return self.string
class WriteException(Exception):
pass
class ReadException(Exception):
pass
class JsonReader(object):
hex_digits = {'A': 10,'B': 11,'C': 12,'D': 13,'E': 14,'F':15}
escapes = {'t':'\t','n':'\n','f':'\f','r':'\r','b':'\b'}
def read(self, s):
self._generator = _StringGenerator(s)
result = self._read()
return result
def _read(self):
self._eatWhitespace()
peek = self._peek()
if peek is None:
raise ReadException, "Nothing to read: '%s'" % self._generator.all()
if peek == '{':
return self._readObject()
elif peek == '[':
return self._readArray()
elif peek == '"':
return self._readString()
elif peek == '-' or peek.isdigit():
return self._readNumber()
elif peek == 't':
return self._readTrue()
elif peek == 'f':
return self._readFalse()
elif peek == 'n':
return self._readNull()
elif peek == '/':
self._readComment()
return self._read()
else:
raise ReadException, "Input is not valid JSON: '%s'" % self._generator.all()
def _readTrue(self):
self._assertNext('t', "true")
self._assertNext('r', "true")
self._assertNext('u', "true")
self._assertNext('e', "true")
return True
def _readFalse(self):
self._assertNext('f', "false")
self._assertNext('a', "false")
self._assertNext('l', "false")
self._assertNext('s', "false")
self._assertNext('e', "false")
return False
def _readNull(self):
self._assertNext('n', "null")
self._assertNext('u', "null")
self._assertNext('l', "null")
self._assertNext('l', "null")
return None
def _assertNext(self, ch, target):
if self._next() != ch:
raise ReadException, "Trying to read %s: '%s'" % (target, self._generator.all())
def _readNumber(self):
isfloat = False
result = self._next()
peek = self._peek()
while peek is not None and (peek.isdigit() or peek == "."):
isfloat = isfloat or peek == "."
result = result + self._next()
peek = self._peek()
try:
if isfloat:
return float(result)
else:
return int(result)
except ValueError:
raise ReadException, "Not a valid JSON number: '%s'" % result
def _readString(self):
result = ""
assert self._next() == '"'
try:
while self._peek() != '"':
ch = self._next()
if ch == "\\":
ch = self._next()
if ch in 'brnft':
ch = self.escapes[ch]
elif ch == "u":
ch4096 = self._next()
ch256 = self._next()
ch16 = self._next()
ch1 = self._next()
n = 4096 * self._hexDigitToInt(ch4096)
n += 256 * self._hexDigitToInt(ch256)
n += 16 * self._hexDigitToInt(ch16)
n += self._hexDigitToInt(ch1)
ch = unichr(n)
elif ch not in '"/\\':
raise ReadException, "Not a valid escaped JSON character: '%s' in %s" % (ch, self._generator.all())
result = result + ch
except StopIteration:
raise ReadException, "Not a valid JSON string: '%s'" % self._generator.all()
assert self._next() == '"'
return result
def _hexDigitToInt(self, ch):
try:
result = self.hex_digits[ch.upper()]
except KeyError:
try:
result = int(ch)
except ValueError:
raise ReadException, "The character %s is not a hex digit." % ch
return result
def _readComment(self):
assert self._next() == "/"
second = self._next()
if second == "/":
self._readDoubleSolidusComment()
elif second == '*':
self._readCStyleComment()
else:
raise ReadException, "Not a valid JSON comment: %s" % self._generator.all()
def _readCStyleComment(self):
try:
done = False
while not done:
ch = self._next()
done = (ch == "*" and self._peek() == "/")
if not done and ch == "/" and self._peek() == "*":
raise ReadException, "Not a valid JSON comment: %s, '/*' cannot be embedded in the comment." % self._generator.all()
self._next()
except StopIteration:
raise ReadException, "Not a valid JSON comment: %s, expected */" % self._generator.all()
def _readDoubleSolidusComment(self):
try:
ch = self._next()
while ch != "\r" and ch != "\n":
ch = self._next()
except StopIteration:
pass
def _readArray(self):
result = []
assert self._next() == '['
done = self._peek() == ']'
while not done:
item = self._read()
result.append(item)
self._eatWhitespace()
done = self._peek() == ']'
if not done:
ch = self._next()
if ch != ",":
raise ReadException, "Not a valid JSON array: '%s' due to: '%s'" % (self._generator.all(), ch)
assert ']' == self._next()
return result
def _readObject(self):
result = {}
assert self._next() == '{'
done = self._peek() == '}'
while not done:
key = self._read()
if type(key) is not types.StringType:
raise ReadException, "Not a valid JSON object key (should be a string): %s" % key
self._eatWhitespace()
ch = self._next()
if ch != ":":
raise ReadException, "Not a valid JSON object: '%s' due to: '%s'" % (self._generator.all(), ch)
self._eatWhitespace()
val = self._read()
result[key] = val
self._eatWhitespace()
done = self._peek() == '}'
if not done:
ch = self._next()
if ch != ",":
raise ReadException, "Not a valid JSON array: '%s' due to: '%s'" % (self._generator.all(), ch)
assert self._next() == "}"
return result
def _eatWhitespace(self):
p = self._peek()
while p is not None and p in string.whitespace or p == '/':
if p == '/':
self._readComment()
else:
self._next()
p = self._peek()
def _peek(self):
return self._generator.peek()
def _next(self):
return self._generator.next()
class JsonWriter(object):
def _append(self, s):
self._results.append(s)
def write(self, obj, escaped_forward_slash=False):
self._escaped_forward_slash = escaped_forward_slash
self._results = []
self._write(obj)
return "".join(self._results)
def _write(self, obj):
ty = type(obj)
if ty is types.DictType:
n = len(obj)
self._append("{")
for k, v in obj.items():
self._write(k)
self._append(":")
self._write(v)
n = n - 1
if n > 0:
self._append(",")
self._append("}")
elif ty is types.ListType or ty is types.TupleType:
n = len(obj)
self._append("[")
for item in obj:
self._write(item)
n = n - 1
if n > 0:
self._append(",")
self._append("]")
elif ty is types.StringType or ty is types.UnicodeType:
self._append('"')
obj = obj.replace('\\', r'\\')
if self._escaped_forward_slash:
obj = obj.replace('/', r'\/')
obj = obj.replace('"', r'\"')
obj = obj.replace('\b', r'\b')
obj = obj.replace('\f', r'\f')
obj = obj.replace('\n', r'\n')
obj = obj.replace('\r', r'\r')
obj = obj.replace('\t', r'\t')
self._append(obj)
self._append('"')
elif ty is types.IntType or ty is types.LongType:
self._append(str(obj))
elif ty is types.FloatType:
self._append("%f" % obj)
elif obj is True:
self._append("true")
elif obj is False:
self._append("false")
elif obj is None:
self._append("null")
else:
raise WriteException, "Cannot write in JSON: %s" % repr(obj)
def write(obj, escaped_forward_slash=False):
return JsonWriter().write(obj, escaped_forward_slash)
def read(s):
return JsonReader().read(s)

File diff suppressed because it is too large Load Diff

View File

@ -1,355 +0,0 @@
""" javascript source for py.test distributed
"""
import py
from py.__.test.report.web import exported_methods
try:
from pypy.translator.js.modules import dom
from pypy.translator.js.helper import __show_traceback
except ImportError:
py.test.skip("PyPy not found")
def create_elem(s):
return dom.document.createElement(s)
def get_elem(el):
return dom.document.getElementById(el)
def create_text_elem(txt):
return dom.document.createTextNode(txt)
tracebacks = {}
skips = {}
counters = {}
max_items = {}
short_item_names = {}
MAX_COUNTER = 30 # Maximal size of one-line table
class Globals(object):
def __init__(self):
self.pending = []
self.host = ""
self.data_empty = True
glob = Globals()
class Options(object):
""" Store global options
"""
def __init__(self):
self.scroll = True
opts = Options()
def comeback(msglist):
if len(msglist) == 0:
return
for item in glob.pending[:]:
if not process(item):
return
glob.pending = []
for msg in msglist:
if not process(msg):
return
exported_methods.show_all_statuses(glob.sessid, comeback)
def show_info(data="aa"):
info = dom.document.getElementById("info")
info.style.visibility = "visible"
while len(info.childNodes):
info.removeChild(info.childNodes[0])
txt = create_text_elem(data)
info.appendChild(txt)
info.style.backgroundColor = "beige"
# XXX: Need guido
def hide_info():
info = dom.document.getElementById("info")
info.style.visibility = "hidden"
def show_interrupt():
glob.finished = True
dom.document.title = "Py.test [interrupted]"
dom.document.getElementById("Tests").childNodes[0].nodeValue = "Tests [interrupted]"
def show_crash():
glob.finished = True
dom.document.title = "Py.test [crashed]"
dom.document.getElementById("Tests").childNodes[0].nodeValue = "Tests [crashed]"
SCROLL_LINES = 50
def opt_scroll():
if opts.scroll:
opts.scroll = False
else:
opts.scroll = True
def scroll_down_if_needed(mbox):
if not opts.scroll:
return
#if dom.window.scrollMaxY - dom.window.scrollY < SCROLL_LINES:
mbox.parentNode.scrollIntoView()
def hide_messagebox():
mbox = dom.document.getElementById("messagebox")
while mbox.childNodes:
mbox.removeChild(mbox.childNodes[0])
def make_module_box(msg):
tr = create_elem("tr")
td = create_elem("td")
tr.appendChild(td)
td.appendChild(create_text_elem("%s[0/%s]" % (msg['itemname'],
msg['length'])))
max_items[msg['fullitemname']] = int(msg['length'])
short_item_names[msg['fullitemname']] = msg['itemname']
td.id = '_txt_' + msg['fullitemname']
#tr.setAttribute("id", msg['fullitemname'])
td.setAttribute("onmouseover",
"show_info('%s')" % (msg['fullitemname'],))
td.setAttribute("onmouseout", "hide_info()")
td2 = create_elem('td')
tr.appendChild(td2)
table = create_elem("table")
td2.appendChild(table)
tbody = create_elem('tbody')
tbody.id = msg['fullitemname']
table.appendChild(tbody)
counters[msg['fullitemname']] = 0
return tr
def add_received_item_outcome(msg, module_part):
if msg['hostkey']:
host_elem = dom.document.getElementById(msg['hostkey'])
glob.host_pending[msg['hostkey']].pop()
count = len(glob.host_pending[msg['hostkey']])
host_elem.childNodes[0].nodeValue = '%s[%s]' % (
glob.host_dict[msg['hostkey']], count)
td = create_elem("td")
td.setAttribute("onmouseover", "show_info('%s')" % (
msg['fullitemname'],))
td.setAttribute("onmouseout", "hide_info()")
item_name = msg['fullitemname']
# TODO: dispatch output
if msg["passed"] == 'True':
txt = create_text_elem(".")
td.appendChild(txt)
elif msg["skipped"] != 'None' and msg["skipped"] != "False":
exported_methods.show_skip(item_name, skip_come_back)
link = create_elem("a")
link.setAttribute("href", "javascript:show_skip('%s')" % (
msg['fullitemname'],))
txt = create_text_elem('s')
link.appendChild(txt)
td.appendChild(link)
else:
link = create_elem("a")
link.setAttribute("href", "javascript:show_traceback('%s')" % (
msg['fullitemname'],))
txt = create_text_elem('F')
link.setAttribute('class', 'error')
link.appendChild(txt)
td.appendChild(link)
exported_methods.show_fail(item_name, fail_come_back)
if counters[msg['fullmodulename']] == 0:
tr = create_elem("tr")
module_part.appendChild(tr)
name = msg['fullmodulename']
counters[name] += 1
counter_part = get_elem('_txt_' + name)
newcontent = "%s[%d/%d]" % (short_item_names[name], counters[name],
max_items[name])
counter_part.childNodes[0].nodeValue = newcontent
module_part.childNodes[-1].appendChild(td)
def process(msg):
if len(msg) == 0:
return False
elem = dom.document.getElementById("testmain")
#elem.innerHTML += '%s<br/>' % msg['event']
main_t = dom.document.getElementById("main_table")
if msg['type'] == 'ItemStart':
# we start a new directory or what
#if msg['itemtype'] == 'Module':
tr = make_module_box(msg)
main_t.appendChild(tr)
elif msg['type'] == 'SendItem':
host_elem = dom.document.getElementById(msg['hostkey'])
glob.host_pending[msg['hostkey']].insert(0, msg['fullitemname'])
count = len(glob.host_pending[msg['hostkey']])
host_elem.childNodes[0].nodeValue = '%s[%s]' % (
glob.host_dict[msg['hostkey']], count)
elif msg['type'] == 'HostRSyncRootReady':
host_elem = dom.document.getElementById(msg['hostkey'])
host_elem.style.background = \
"#00ff00"
host_elem.childNodes[0].nodeValue = '%s[0]' % (
glob.host_dict[msg['hostkey']],)
elif msg['type'] == 'ItemFinish':
module_part = get_elem(msg['fullmodulename'])
if not module_part:
glob.pending.append(msg)
return True
add_received_item_outcome(msg, module_part)
elif msg['type'] == 'TestTestrunFinish':
text = "FINISHED %s run, %s failures, %s skipped" % (msg['run'], msg['fails'], msg['skips'])
glob.finished = True
dom.document.title = "Py.test %s" % text
dom.document.getElementById("Tests").childNodes[0].nodeValue = \
"Tests [%s]" % text
elif msg['type'] == 'FailedTryiter':
module_part = get_elem(msg['fullitemname'])
if not module_part:
glob.pending.append(msg)
return True
tr = create_elem("tr")
td = create_elem("td")
a = create_elem("a")
a.setAttribute("href", "javascript:show_traceback('%s')" % (
msg['fullitemname'],))
txt = create_text_elem("- FAILED TO LOAD MODULE")
a.appendChild(txt)
td.appendChild(a)
tr.appendChild(td)
module_part.appendChild(tr)
item_name = msg['fullitemname']
exported_methods.show_fail(item_name, fail_come_back)
elif msg['type'] == 'DeselectedItem':
module_part = get_elem(msg['fullitemname'])
if not module_part:
glob.pending.append(msg)
return True
tr = create_elem("tr")
td = create_elem("td")
txt = create_text_elem("- skipped (%s)" % (msg['reason'],))
td.appendChild(txt)
tr.appendChild(td)
module_part.appendChild(tr)
elif msg['type'] == 'RsyncFinished':
glob.rsync_done = True
elif msg['type'] == 'InterruptedExecution':
show_interrupt()
elif msg['type'] == 'CrashedExecution':
show_crash()
if glob.data_empty:
mbox = dom.document.getElementById('messagebox')
scroll_down_if_needed(mbox)
return True
def show_skip(item_name="aa"):
set_msgbox(item_name, skips[item_name])
def set_msgbox(item_name, data):
msgbox = get_elem("messagebox")
while len(msgbox.childNodes):
msgbox.removeChild(msgbox.childNodes[0])
pre = create_elem("pre")
txt = create_text_elem(item_name + "\n" + data)
pre.appendChild(txt)
msgbox.appendChild(pre)
dom.window.location.assign("#message")
glob.data_empty = False
def show_traceback(item_name="aa"):
data = ("====== Traceback: =========\n%s\n======== Stdout: ========\n%s\n"
"========== Stderr: ==========\n%s\n" % tracebacks[item_name])
set_msgbox(item_name, data)
def fail_come_back(msg):
tracebacks[msg['item_name']] = (msg['traceback'], msg['stdout'],
msg['stderr'])
def skip_come_back(msg):
skips[msg['item_name']] = msg['reason']
def reshow_host():
if glob.host == "":
return
show_host(glob.host)
def show_host(host_name="aa"):
elem = dom.document.getElementById("jobs")
if elem.childNodes:
elem.removeChild(elem.childNodes[0])
tbody = create_elem("tbody")
for item in glob.host_pending[host_name]:
tr = create_elem("tr")
td = create_elem("td")
td.appendChild(create_text_elem(item))
tr.appendChild(td)
tbody.appendChild(tr)
elem.appendChild(tbody)
elem.style.visibility = "visible"
glob.host = host_name
dom.setTimeout(reshow_host, 100)
def hide_host():
elem = dom.document.getElementById("jobs")
while len(elem.childNodes):
elem.removeChild(elem.childNodes[0])
elem.style.visibility = "hidden"
glob.host = ""
def update_rsync():
if glob.finished:
return
elem = dom.document.getElementById("Tests")
if glob.rsync_done is True:
elem.childNodes[0].nodeValue = "Tests"
return
text = "Rsyncing" + '.' * glob.rsync_dots
glob.rsync_dots += 1
if glob.rsync_dots > 5:
glob.rsync_dots = 0
elem.childNodes[0].nodeValue = "Tests [%s]" % text
dom.setTimeout(update_rsync, 1000)
def host_init(host_dict):
tbody = dom.document.getElementById("hostsbody")
for host in host_dict.keys():
tr = create_elem('tr')
tbody.appendChild(tr)
td = create_elem("td")
td.style.background = "#ff0000"
txt = create_text_elem(host_dict[host])
td.appendChild(txt)
td.id = host
tr.appendChild(td)
td.setAttribute("onmouseover", "show_host('%s')" % host)
td.setAttribute("onmouseout", "hide_host()")
glob.rsync_dots = 0
glob.rsync_done = False
dom.setTimeout(update_rsync, 1000)
glob.host_dict = host_dict
glob.host_pending = {}
for key in host_dict.keys():
glob.host_pending[key] = []
def key_pressed(key):
if key.charCode == ord('s'):
scroll_box = dom.document.getElementById("opt_scroll")
if opts.scroll:
scroll_box.removeAttribute("checked")
opts.scroll = False
else:
scroll_box.setAttribute("checked", "true")
opts.scroll = True
def sessid_comeback(id):
glob.sessid = id
exported_methods.show_all_statuses(id, comeback)
def main():
glob.finished = False
exported_methods.show_hosts(host_init)
exported_methods.show_sessid(sessid_comeback)
dom.document.onkeypress = key_pressed
dom.document.getElementById("opt_scroll").setAttribute("checked", "True")