[svn r37598] removing broken tkinter session support ahead of the

0.9 release (might be re-introduced later)

--HG--
branch : trunk
This commit is contained in:
hpk 2007-01-30 15:02:28 +01:00
parent 1f0835df15
commit e96e4f61c7
19 changed files with 0 additions and 1844 deletions

View File

@ -150,8 +150,6 @@ class Config(object):
name = 'TerminalSession'
if self.option.dist:
name = 'RSession'
elif self.option.tkinter:
name = 'TkinterSession'
else:
optnames = 'startserver runbrowser apigen restreport boxing'.split()
for opt in optnames:
@ -253,8 +251,6 @@ config_per_process = Config()
# default import paths for sessions
TkinterSession = 'py.__.test.tkinter.reportsession'
TerminalSession = 'py.__.test.terminal.terminal'
TerminalSession = 'py.__.test.terminal.terminal'
RemoteTerminalSession = 'py.__.test.terminal.remote'
RSession = 'py.__.test.rsession.rsession'

View File

@ -86,9 +86,6 @@ def adddefaultoptions(config):
action="store_true", dest="runbrowser", default=False,
help="run browser (implies --startserver)."
),
Option('', '--tkinter',
action="store_true", dest="tkinter", default=False,
help="use tkinter test session frontend."),
Option('', '--box',
action="store_true", dest="boxing",
help="use boxing (running each test in external process)"),

View File

@ -224,11 +224,6 @@ class TestSessionAndOptions:
config = py.test.config._reparse([self.tmpdir, '--dist', '--exec=x'])
assert config._getsessionname() == 'RSession'
def test_tkintersession(self):
config = py.test.config._reparse([self.tmpdir, '--tkinter'])
assert config._getsessionname() == 'TkinterSession'
config = py.test.config._reparse([self.tmpdir, '--dist'])
def test_sessionname_lookup_custom(self):
self.tmpdir.join("conftest.py").write(py.code.Source("""
from py.__.test.session import Session

View File

@ -1,241 +0,0 @@
'''Backend for running tests and creating a Repository of testreports.'''
import py
import repository
import util
import threading
Null = util.Null
class TestRepository(repository.Repository):
'''stores only TestReport'''
ReportClass = util.TestReport
failed_id = [repr(ReportClass.Status.Failed())]
skipped_id = [repr(ReportClass.Status.Skipped())]
def __init__(self):
super(TestRepository, self).__init__()
self.add([], self.ReportClass())
failedreport = self.ReportClass()
failedreport.full_id = self.failed_id
failedreport.label = 'Failed Tests'
self.add(self.failed_id, failedreport)
skippedreport = self.ReportClass()
skippedreport.full_id = self.skipped_id
skippedreport.label = 'Skipped Tests'
self.add(self.skipped_id, skippedreport)
def root_status(self):
root_status = self.ReportClass.Status.NotExecuted()
if len(self.keys()) > 2:
root_status = self.ReportClass.Status.Passed()
if len(self.find_children(self.skipped_id)):
root_status = self.ReportClass.Status.Skipped()
if len(self.find_children(self.failed_id)):
root_status = self.ReportClass.Status.Failed()
return root_status
def delete_all(self, key):
super(TestRepository, self).delete_all(key)
new_repos = TestRepository()
for new_key in new_repos.keys():
if not self.haskey(new_key):
self.add(new_key, new_repos.find(new_key))
def delete(self, key):
super(TestRepository, self).delete(key)
new_repos = TestRepository()
if new_repos.haskey(key):
self.add(key, new_repos.find(key))
def add_report(self, report):
if not report.full_id:
self.add([], report)
return
if report.error_report:
if report.status == self.ReportClass.Status.Failed():
self.add(self.failed_id + [report.id], report)
elif report.status == self.ReportClass.Status.Skipped():
self.add(self.skipped_id + [report.id], report)
self.add(report.full_id, report)
def add_report_from_channel(self, report_str):
report = self.ReportClass.fromChannel(report_str)
self.add_report(report)
return report.full_id[:]
class ReportStore:
ReportClass = util.TestReport
def __init__(self):
self._reports = repository.OrderedDict()
self._repository = TestRepository()
self.root = self.ReportClass()
#self.add(self.ReportClass())
def add(self, report):
if not self._check_root(report):
#print '/'.join(report.full_id)
self._reports[report.full_id] = report
self._repository.add_report(report)
def _check_root(self, report):
if report.id == self.ReportClass.root_id:
self.root = report
return True
else:
self.root.status.update(report.status)
return False
def add_report_from_channel(self, report_str):
report = self.ReportClass.fromChannel(report_str)
self.add(report)
return report.full_id[:]
def get(self, **kwargs):
# ensure failed > skipped > passed_item
filter_dict = repository.OrderedDict()
filter_dict['failed'] = self._select_failed
filter_dict['skipped']= self._select_skipped
filter_dict['passed_item']= self._select_passed_item
filter_dict['id']= self._select_by_id
if kwargs.get('id', None) == tuple():
return [self.root]
selected_reports = []
functions = [filter_dict[name] for name in kwargs.keys()
if filter_dict.has_key(name)]
for function in functions:
selected_reports.extend([rep for rep in self._reports.values()
if function(rep, **kwargs)])
return selected_reports
def _select_failed(self, report, **kwargs):
if kwargs['failed']:
return report.status == self.ReportClass.Status.Failed() and report.error_report != ''
return False
def _select_skipped(self, report, **kwargs):
if kwargs['skipped']:
return report.status == self.ReportClass.Status.Skipped()
return False
def _select_passed_item(self, report, **kwargs):
if kwargs['passed_item']:
return report.status == self.ReportClass.Status.Passed() and report.is_item == True
return False
def _select_by_id(self, report, **kwargs):
id = kwargs['id']
return report.full_id == id
class ReportBackend:
def __init__(self, config = Null()):
self.reportstore = ReportStore()
self.channel = Null()
self.waitfinish_thread = Null()
self.queue = py.std.Queue.Queue()
self._message_callback = Null()
self._messages_callback = Null()
self.config = config
def running(self):
'''are there tests running?'''
if self.channel:
return not self.channel.isclosed()
return False
running = property(running)
def shutdown(self):
if self.running:
self.channel.close()
if self.waitfinish_thread.isAlive():
self.waitfinish_thread.join()
def get_store(self):
return self.reportstore
def set_message_callback(self, callback = Null()):
self._message_callback = callback
def set_messages_callback(self, callback = Null()):
self._messages_callback = callback
def update(self):
"""Check if there is something new in the queue."""
changed_report_ids = []
while not self.queue.empty():
try:
report_str = self.queue.get(False)
id = report_str
if report_str is not None:
id = self.reportstore.add_report_from_channel(report_str)
changed_report_ids.append(id)
self._message_callback(id)
except py.std.Queue.Empty:
pass
self._messages_callback(changed_report_ids)
def debug_queue_put(self, item):
report = ReportStore.ReportClass.fromChannel(item)
print '/'.join(report.full_id)
self.queue.put(item)
def start_tests(self, config = None, args = [], tests = []):
py.test.skip("XXX fix this or remove --tkinter")
if self.running:
return
if config is None:
config = self.config
self.testrepository = TestRepository()
self.reportstore = ReportStore()
self.gateway = py.execnet.PopenGateway(config.option.executable)
#self.channel = self.gateway.newchannel(receiver = self.queue.put)
self.channel = self.gateway.remote_exec(source = '''
import py
from py.__.test.tkinter.backend import remote
args, tests = channel.receive()
remote(channel, tests = tests, args = args)
# why?
channel.close()
''')
self.channel.setcallback(self.queue.put)
self.channel.send((args, tests))
self.waitfinish_thread = threading.Thread(target = waitfinish, args = (self.channel,))
self.waitfinish_thread.start()
def waitfinish(channel):
try:
while 1:
try:
channel.waitclose(0.5)
except (IOError, py.error.Error):
continue
break
finally:
try:
channel.gateway.exit()
except EOFError:
# the gateway receiver callback will get woken up
# and see an EOFError exception
pass
def remote(channel, tests = [], args = []):
import py
from py.__.test.tkinter.reportsession import ReportSession
from py.__.test.terminal.remote import getfailureitems
config = py.test.config._reparse(args)
if tests:
cols = getfailureitems(tests)
else:
cols = config.args
session = ReportSession(config = config, channel=channel)
session.shouldclose = channel.isclosed
session.main()

View File

@ -1,19 +0,0 @@
class Event:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.callbacks = []
def __call__(self, *args, **kwargs):
default_kwargs = kwargs.copy()
default_kwargs.update(kwargs)
for callable in self.callbacks:
callable(*args, **default_kwargs)
def subscribe(self, callable):
self.callbacks.append(callable)
def remove(self, callable):
while f in self.callbacks: self.callbacks.remove(f)

View File

@ -1,48 +0,0 @@
'''GuiSession builds TestReport instances and sends them to tkgui.Manager'''
import py
from util import TestReport
from py.__.test.session import Session
class TkinterSession(Session):
def __init__(self, config = None, channel = None):
super(ReportSession, self).__init__(config)
self.channel = channel
self.reportlist = []
def header(self, colitems):
super(ReportSession, self).header(colitems)
report = TestReport()
report.settime()
self.reportlist = [report]
self.sendreport(report)
def footer(self, colitems):
super(ReportSession, self).footer(colitems)
report = self.reportlist.pop()
report.settime()
self.sendreport(report)
self.channel.send(None)
def start(self, colitem):
super(ReportSession, self).start(colitem)
report = TestReport()
report.start(colitem)
self.reportlist.append(report)
self.sendreport(report)
def finish(self, colitem, outcome):
super(ReportSession, self).finish(colitem, outcome)
colitem.finishcapture()
colitem.finishcapture()
report = self.reportlist.pop()
report.finish(colitem, outcome, self.config)
self.reportlist[-1].status.update(report.status)
self.sendreport(report)
#py.std.time.sleep(0.5)
def sendreport(self, report):
self.channel.send(report.to_channel())
ReportSession = TkinterSession

View File

@ -1,154 +0,0 @@
import py
Item = py.test.Item
Collector = py.test.collect.Collector
import copy
import time
import UserDict
class Repository(object):
'''like a trie'''
nothing = object()
def __init__(self):
self.root = self.newnode()
def newnode(self):
return [self.nothing, OrderedDictMemo()]
def copy(self):
newrepos = Repository()
newrepos.root = copy.deepcopy(self.root)
return newrepos
def add(self, key, value):
node = self.root
for k in key:
node = node[1].setdefault(k, self.newnode())
node[0] = value
def find_tuple(self, key=[]):
node = self.root
for k in key:
node = node[1][k]
return node
def find(self, key=[]):
return self.find_tuple(key)[0]
def haskey(self, key):
try:
value = self.find(key)
except KeyError:
return False
return True
def haskeyandvalue(self, key):
if self.haskey(key):
value = self.find(key)
return value is not self.nothing
return False
def find_children(self, key=[]):
if self.haskey(key):
node = self.find_tuple(key)
return [list(key) + [childname] for childname in node[1].keys()]
return []
def keys(self, startkey=[]):
ret = []
for key in self.find_children(startkey):
ret.append(key)
ret.extend(self.keys(key))
return ret
def removestalekeys(self, key):
if self.find_children(key) == [] and not self.haskeyandvalue(key):
if len(key) > 0:
parent = self.find_tuple(key[:-1])
del parent[1][key[-1]]
self.removestalekeys(key[:-1])
def delete(self, key):
if self.haskeyandvalue(key):
node = self.find_tuple(key)
node[0] = self.newnode()[0]
self.removestalekeys(key)
def delete_all(self, key):
if self.haskeyandvalue(key):
node = self.find_tuple(key)
node[0], node[1] = self.newnode()[0], self.newnode()[1]
self.removestalekeys(key)
def values(self, startkey=[]):
return [self.find(key) for key in self.keys(startkey)]
def items(self, startkey=[]):
return [(key, self.find(key)) for key in self.keys(startkey)]
class OrderedDict(UserDict.DictMixin):
'''like a normal dict, but keys are ordered by time of setting'''
def __init__(self, *args, **kwargs):
self._dict = dict(*args, **kwargs)
self._keys = self._dict.keys()
def __getitem__(self, key):
return self._dict.__getitem__(key)
def __setitem__(self, key, value):
self._dict.__setitem__(key, value)
try:
self._keys.remove(key)
except ValueError:
pass
self._keys.append(key)
def __delitem__(self, key):
self._dict.__delitem__(key)
self._keys.remove(key)
def keys(self):
return self._keys[:]
def copy(self):
new = OrderedDict()
for key, value in self.iteritems():
new[key] = value
return new
class OrderedDictMemo(UserDict.DictMixin):
'''memorize all keys and how they were ordered'''
def __init__(self, *args, **kwargs):
self._dict = dict(*args, **kwargs)
self._keys = self._dict.keys()
def __getitem__(self, key):
return self._dict.__getitem__(key)
def __setitem__(self, key, value):
self._dict.__setitem__(key, value)
if key not in self._keys:
self._keys.append(key)
def __delitem__(self, key):
self._dict.__delitem__(key)
def keys(self):
return [key for key in self._keys if key in self._dict]
def copy(self):
new = OrderedDict()
for key, value in self.iteritems():
new[key] = value
return new

View File

@ -1,13 +0,0 @@
import py
def test_one():
assert 42 == 42
class TestClass(object):
def test_method_one(self):
assert 42 == 42
def test_print_and_fail():
print 'STDOUT',
print >>py.std.sys.stderr, 'STDERR',
py.test.fail()

View File

@ -1,210 +0,0 @@
import py
from py.__.test.tkinter import backend
ReportBackend = backend.ReportBackend
TestRepository = backend.TestRepository
ReportStore = backend.ReportStore
datadir = py.magic.autopath().dirpath('data')
from cStringIO import StringIO
class Test_TestRepository:
def check_repository_has_failed_and_skipped_folder(self, repos):
assert repos.find([repr(TestRepository.ReportClass.Status.Failed())])
assert repos.find([repr(TestRepository.ReportClass.Status.Skipped())])
def test_repository_has_failed_and_skipped_folder(self):
repos = TestRepository()
self.check_repository_has_failed_and_skipped_folder(repos)
def test_repository_has_failed_and_skipped_folder_after_delete_all(self):
repos = TestRepository()
self.check_repository_has_failed_and_skipped_folder(repos)
repos.delete_all([])
self.check_repository_has_failed_and_skipped_folder(repos)
def test_repository_has_failed_and_skipped_folder_after_delete(self):
repos = TestRepository()
self.check_repository_has_failed_and_skipped_folder(repos)
repos.delete([str(TestRepository.ReportClass.Status.Failed())])
self.check_repository_has_failed_and_skipped_folder(repos)
repos.delete([str(TestRepository.ReportClass.Status.Failed())])
self.check_repository_has_failed_and_skipped_folder(repos)
def test_add_report_from_channel(self):
full_id = ['start', 'next', 'end']
report = TestRepository.ReportClass()
report.full_id = full_id
repos = TestRepository()
id = repos.add_report_from_channel(report.to_channel())
assert id == full_id
assert repos.haskey(full_id)
class TestReportStore:
def setup_method(self, method):
self.store = ReportStore()
self.report_failed = ReportStore.ReportClass()
self.report_failed.status = ReportStore.ReportClass.Status.Failed()
self.report_failed.full_id = ('report_failed')
self.report_failed.id = '1'
self.report_failed_item = self.report_failed.copy()
self.report_failed_item.error_report = 'Error'
self.report_failed_item.full_id = ('report_failed_item')
self.report_failed_item.id = '1'
self.report_skipped = ReportStore.ReportClass()
self.report_skipped.status = ReportStore.ReportClass.Status.Skipped()
self.report_skipped.full_id = ('report_skipped')
self.report_skipped.id = '1'
self.report_passed = ReportStore.ReportClass()
self.report_passed.status = ReportStore.ReportClass.Status.Passed()
self.report_passed.full_id = ('report_passed')
self.report_passed.id = '1'
self.report_passed_item = self.report_passed.copy()
self.report_passed_item.is_item = True
self.report_passed_item.full_id = ('report_passed_item')
self.report_passed_item.id = '1'
def fill_store(self):
self.store.add(self.report_failed)
self.store.add(self.report_failed_item)
self.store.add(self.report_skipped)
self.store.add(self.report_passed)
self.store.add(self.report_passed_item)
def test_get_failed(self):
self.fill_store()
print self.store._reports.keys()
assert len(self.store.get(failed = True)) == 1
assert self.store.get(failed = True) == [self.report_failed_item]
def test_get_skipped(self):
self.fill_store()
print self.store._reports.keys()
assert len(self.store.get(skipped = True)) == 1
assert self.store.get(skipped = True) == [self.report_skipped]
def test_get_passed_item(self):
self.fill_store()
assert len(self.store.get(passed_item = True)) == 1
assert self.store.get(passed_item = True) == [self.report_passed_item]
def test_select_failed(self):
assert self.store._select_failed(self.report_failed,
failed = True) == False
assert self.store._select_failed(self.report_failed_item,
failed = True) == True
assert self.store._select_failed(self.report_failed_item,
failed=False) == False
assert self.store._select_failed(self.report_skipped,
failed = True,
skipped = True) == False
def test_select_skipped(self):
assert self.store._select_skipped(self.report_failed,
skipped = True) == False
assert self.store._select_skipped(self.report_skipped,
skipped = False) == False
assert self.store._select_skipped(self.report_skipped,
skipped = True) == True
def test_select_passed_item(self):
assert self.store._select_passed_item(self.report_failed,
passed_item = True) == False
assert self.store._select_passed_item(self.report_skipped,
passed_item = True,
skipped = True) == False
assert self.store._select_passed_item(self.report_passed,
passed_item = True) == False
assert self.store._select_passed_item(self.report_passed_item,
passed_item = False) == False
assert self.store._select_passed_item(self.report_passed_item,
passed_item = True) == True
def test_select_by_id(self):
assert self.store._select_by_id(self.report_passed_item,
id = ['id']) == False
id = ['my', 'special', 'report', 'id']
report = self.store.ReportClass()
report.full_id = id[:]
assert self.store._select_by_id(report, id = id) == True
assert self.store._select_by_id(report, id = id[:-1]) == False
def test_add_report_from_channel(self):
full_id = ['start', 'next', 'end']
report = ReportStore.ReportClass()
report.full_id = full_id
id = self.store.add_report_from_channel(report.to_channel())
assert id == full_id
class TestReportBackend:
def setup_method(self, method):
self.backend = ReportBackend()
def test_get_store(self):
assert isinstance(self.backend.get_store(), ReportStore)
def test_running_property(self):
backend = ReportBackend()
assert not self.backend.running
def test_update_callback(self):
l = []
self.backend.set_message_callback(l.append)
self.backend.queue.put(None)
self.backend.update()
assert len(l) == 1
assert l[0] is None
def test_processs_messeges_callback(self):
l = []
self.backend.set_message_callback(l.append)
self.backend.queue.put(None)
self.backend.update()
assert len(l) == 1
assert l[0] is None
def test_start_tests(self):
config = py.test.config._reparse([datadir/'filetest.py'])
self.backend.start_tests(config = config,
args = config.args,
tests = [])
while self.backend.running:
self.backend.update()
self.backend.update()
store = self.backend.get_store()
assert store._repository.find(['py',
'test',
'tkinter',
'testing',
'data',
'filetest.py',
'TestClass'])
def test_remote():
class ChannelMock:
def __init__(self):
self.sendlist = []
def send(self, value):
self.sendlist.append(value)
def isclosed(self):
return False
channel = ChannelMock()
backend.remote(channel, args = [str(datadir / 'filetest.py')], tests = [])
#py.std.pprint.pprint(channel.sendlist)
assert channel.sendlist

View File

@ -1,20 +0,0 @@
import py
from py.__.test.tkinter import backend
ReportBackend = backend.ReportBackend
datadir = py.magic.autopath().dirpath('data')
def test_capture_out_err():
config = py.test.config._reparse([datadir/'filetest.py'])
backend = ReportBackend()
backend.start_tests(config = config,
args = config.args,
tests = [])
while backend.running:
backend.update()
backend.update()
store = backend.get_store()
assert len(store.get(failed = True)) == 1
failed = store.get(failed = True)[0]
assert failed.stdout == 'STDOUT'
assert failed.stderr == 'STDERR'

View File

@ -1,62 +0,0 @@
import py
from py.__.test.tkinter import reportsession
from py.__.test.tkinter.util import Status, TestReport, Null
ReportSession = reportsession.ReportSession
class TestReportSession:
class ChannelMock:
def __init__(self, receinvelist = []):
self.reset(receinvelist)
def reset(self, receivelist = []):
self.receivelist = receivelist
self.sendlist = []
def send(self, obj):
self.sendlist.append(obj)
def receive(self):
return self.receivelist.pop(0)
def setup_method(self, method):
self.channel = self.ChannelMock()
self.session = ReportSession(Null(), self.channel)
self.collitems = [Null(), Null()]
def test_header_sends_report_with_id_root(self):
self.session.header(self.collitems)
assert self.channel.sendlist != []
report = TestReport.fromChannel(self.channel.sendlist[0])
assert report.status == Status.NotExecuted()
assert report.id == TestReport.root_id
assert report.label == 'Root'
def test_footer_sends_report_and_None(self):
self.session.header(self.collitems)
self.session.footer(self.collitems)
assert self.channel.sendlist != []
assert self.channel.sendlist[-1] is None
report = TestReport.fromChannel(self.channel.sendlist[-2])
assert report.status == Status.NotExecuted()
assert report.id == TestReport.root_id
## def test_status_is_passed_to_root(self):
## self.session.header(self.collitems)
## self.session.start(self.collitems[0])
## self.session.finish(self.collitems[0], py.test.collect.Collector.Failed())
## self.session.footer(self.collitems)
## assert self.channel.sendlist[-1] is None
## assert self.channel.sendlist.pop() is None
## report = TestReport.fromChannel(self.channel.sendlist[-1])
## assert report.name == 'Root'
## assert report.status == Status.Failed()

View File

@ -1,152 +0,0 @@
from py.__.test.tkinter.repository import Repository, OrderedDict, OrderedDictMemo
import py
Item = py.test.Item
import itertools
class TestRepository:
def setup_method(self, method):
self.rep = Repository()
def test_add_find_single_value(self):
key = ['key']
value = 'value'
self.rep.add(key, value)
assert self.rep.find(key) == value
def test_add_works_like_update(self):
key = 'k e y'.split()
value = 'value'
value2 = 'value2'
self.rep.add(key, value)
self.rep.add(key, value2)
assert self.rep.find(key) == value2
def test_haskeyandvalue(self):
key = 'first_middle_last'
value = 'value'
self.rep.add(key, value)
assert self.rep.haskeyandvalue(key)
assert not self.rep.haskeyandvalue('first')
for index in range(1, len(key[0])):
assert not self.rep.haskeyandvalue(key[0:index])
def test_add_find_subkey(self):
key = ('key', 'subkey')
value = 'subvalue'
self.rep.add(key, value)
self.rep.add((key[0],), 'value')
assert self.rep.find(key) == value
def test_find_raises_KeyError(self):
py.test.raises(KeyError, self.rep.find, 'nothing')
def test_haskey(self):
self.rep.add('key', 'value')
assert self.rep.haskey('key') == True
assert self.rep.haskey('katja') == False
assert self.rep.haskey('ke') == True
def test_find_children_empyt_repository(self):
assert self.rep.find_children() == []
def test_find_children(self):
self.rep.add(['c'], 'childvalue')
self.rep.add('c a'.split(), 'a')
self.rep.add('c b'.split(), 'b')
assert self.rep.find_children(['c']) == [ ['c','a'], ['c','b']]
assert self.rep.find_children() == [['c']]
def test_find_children_with_tuple_key(self):
key = tuple('k e y'.split())
value = 'value'
self.rep.add(key, value)
assert self.rep.find_children([]) == [['k']]
assert self.rep.find_children(('k', 'e')) == [['k', 'e', 'y']]
def test_keys(self):
keys = [ 'a b c'.split(), 'a b'.split(), ['a']]
for key in keys:
self.rep.add(key, 'value')
assert len(keys) == len(self.rep.keys())
for key in self.rep.keys():
assert key in keys
for key in keys:
assert key in self.rep.keys()
def test_delete_simple(self):
key = 'k'
value = 'value'
self.rep.add(key, value)
self.rep.delete(key)
assert self.rep.haskeyandvalue(key) == False
def test_removestallkeys_remove_all(self):
key = 'k e y'.split()
value = 'value'
self.rep.add(key, value)
node = self.rep.find_tuple(key)
node[0] = self.rep.newnode()[0]
self.rep.removestalekeys(key)
assert self.rep.keys() == []
def test_removestallkeys_dont_remove_parent(self):
key = 'k e y'.split()
key2 = 'k e y 2'.split()
value = 'value'
self.rep.add(key, value)
self.rep.add(key2, self.rep.newnode()[0])
self.rep.removestalekeys(key2)
assert self.rep.haskey(key2) == False
assert self.rep.haskeyandvalue(key)
def test_removestallkeys_works_with_parameter_root(self):
self.rep.removestalekeys([])
def test_copy(self):
key = 'k e y'.split()
key2 = 'k e y 2'.split()
value = 'value'
self.rep.add(key, value)
self.rep.add(key2, value)
newrep = self.rep.copy()
assert newrep.root is not self.rep.root
assert newrep.find(key) == self.rep.find(key)
class TestOrderedDict:
def setup_method(self, method):
self.dict = OrderedDict()
def test_add(self):
self.dict['key'] = 'value'
assert 'key' in self.dict
def test_order(self):
keys = range(3)
for k in keys:
self.dict[k] = str(k)
assert keys == self.dict.keys()
class TestOrderedDictMemo(TestOrderedDict):
def setup_method(self, method):
self.dict = OrderedDictMemo()
def test_insert(self):
self.dict['key1'] = 1
self.dict['key2'] = 2
del self.dict['key1']
self.dict['key1'] = 1
assert self.dict.keys() == ['key1', 'key2']

View File

@ -1,114 +0,0 @@
from __future__ import generators
from py.__.test.tkinter import util
from py.__.test.tkinter.util import Status, TestReport, OutBuffer
import py
Item = py.test.Item
class TestStatus:
def test_init_with_None(self):
status = Status(None)
assert status == status.NotExecuted()
def test_str(self):
status = Status(Item.Passed())
assert status == status.Passed()
status = Status(Item.Failed())
assert status == status.Failed()
status = Status(Item.Skipped())
assert status == status.Skipped()
def test_init_with_bad_name(self):
status = Status('nothing')
assert status == Status.NotExecuted()
def test_init_with_good_name(self):
def check_str(obj, expected):
assert str(obj) == expected
for name in Status.ordered_list:
yield check_str, Status(name), name
def test_update(self):
failed = Status.Failed()
passed = Status.Passed()
failed.update(passed)
assert failed == Status.Failed()
passed.update(failed)
assert passed == Status.Failed()
assert passed == failed
def test_eq_(self):
passed = Status.Passed()
assert passed == passed
assert passed == Status.Passed()
failed = Status.Failed()
assert failed != passed
class TestTestReport:
def setup_method(self, method):
self.path = py.path.local()
self.collector = py.test.collect.Directory(self.path)
self.testresult = TestReport()
def test_start(self):
self.testresult.start(self.collector)
assert self.testresult.full_id == tuple(self.collector.listnames())
assert self.testresult.time != 0
assert self.testresult.status == Status.NotExecuted()
def test_finish(self):
self.testresult.start(self.collector)
py.std.time.sleep(1.1)
self.testresult.finish(self.collector, None)
assert self.testresult.time > 1
assert self.testresult.status == Status.NotExecuted()
def test_toChannel_fromChannel(self):
assert isinstance(self.testresult.to_channel()['status'], str)
result = TestReport.fromChannel(self.testresult.to_channel())
assert isinstance(result.status, Status)
def test_copy(self):
result2 = self.testresult.copy()
assert self.testresult.status == Status.NotExecuted()
for key in TestReport.template.keys():
assert getattr(result2, key) == getattr(self.testresult, key)
self.testresult.status = Status.Failed()
assert result2.status != self.testresult.status
def test_is_item_attribute(self):
self.testresult.start(py.test.Item('test_is_item_attribute item',
parent = self.collector))
assert self.testresult.is_item == True
self.testresult.start(self.collector)
assert self.testresult.is_item == False
class Test_OutBuffer:
def setup_method(self, method):
self.out = OutBuffer()
def test_line(self):
oneline = 'oneline'
self.out.line(oneline)
assert self.out.getoutput() == oneline + '\n'
def test_write(self):
item = 'item'
self.out.write(item)
assert self.out.getoutput() == item

View File

@ -1,494 +0,0 @@
import py
from py.__.test.tkinter import backend
from py.__.test.tkinter import util
from py.__.test.tkinter import event
Null = util.Null
Event = event.Event
import ScrolledText
from Tkinter import PhotoImage
import Tkinter
import re
import os
myfont = ('Helvetica', 12, 'normal')
class StatusBar(Tkinter.Frame):
font = ('Helvetica', 14, 'normal')
def __init__(self, master=None, **kw):
if master is None:
master = Tk()
Tkinter.Frame.__init__(self, master, **kw)
self.labels = {}
self.callback = Null()
self.tkvariable_dict = {'failed': Tkinter.BooleanVar(),
'skipped': Tkinter.BooleanVar(),
'passed_item': Tkinter.BooleanVar()}
self.init_widgets()
def init_widgets(self):
def get_button(text, var):
b = Tkinter.Checkbutton(self, text=text,
variable=var,
onvalue=True, offvalue=False,
anchor=Tkinter.W,
font=self.font, indicatoron = False,
command = self.do_callback)
b.select()
return b
self.add_widget('Failed',
get_button('Failed', self.tkvariable_dict['failed']))
self._get_widget('Failed').configure(bg = 'Red3', fg = 'White',
highlightcolor = 'Red',
selectcolor = 'Red',
activebackground= 'Red',
activeforeground = 'White')
self.add_widget('Skipped',
get_button('Skipped', self.tkvariable_dict['skipped']))
self._get_widget('Skipped').configure(bg = 'Yellow3',
highlightcolor = 'Yellow',
selectcolor = 'Yellow',
activebackground= 'Yellow')
b = get_button('Passed', self.tkvariable_dict['passed_item'])
b.deselect()
self.add_widget('Passed', b)
self._get_widget('Passed').configure(bg = 'Green3', fg = 'Black',
highlightcolor = 'Black',
selectcolor = 'Green',
activeforeground = 'Black',
activebackground= 'Green')
def set_filter_args_callback(self, callback):
self.callback = callback
def do_callback(self,):
kwargs = dict([(key, var.get())
for key, var in self.tkvariable_dict.items()])
self.callback(**kwargs)
def set_label(self, name, text='', side=Tkinter.LEFT):
if not self.labels.has_key(name):
label = Tkinter.Label(self, bd=1, relief=Tkinter.SUNKEN,
anchor=Tkinter.W,
font=self.font)
self.add_widget(name, label, side)
else:
label = self.labels[name]
label.config(text='%s:\t%s' % (name,text))
def add_widget(self, name, widget, side=Tkinter.LEFT):
widget.pack(side=side)
self.labels[name] = widget
def _get_widget(self, name):
if not self.labels.has_key(name):
self.set_label(name, 'NoText')
return self.labels[name]
def update_all(self, reportstore = Null(), ids = []):
self.set_label('Failed', str(len(reportstore.get(failed = True))))
self.set_label('Skipped', str(len(reportstore.get(skipped = True))))
self.set_label('Passed',
str(len(reportstore.get(passed_item = True))))
root_report = reportstore.root
if root_report.time < 7*24*60*60:
self.set_label('Time', '%0.2f seconds' % root_report.time)
else:
self.set_label('Time', '%0.2f seconds' % 0.0)
class ReportListBox(Tkinter.LabelFrame):
font = myfont
def __init__(self, *args, **kwargs):
Tkinter.LabelFrame.__init__(self, *args, **kwargs)
self.callback = Null()
self.data = {}
self.filter_kwargs = {'skipped': True, 'failed': True}
self.label = Tkinter.Label(self)
self.label.configure(font = self.font, width = 80, anchor = Tkinter.W)
self.configure(labelwidget=self.label)
self.createwidgets()
self.label.configure(text = 'Idle')
self.configure(font = myfont)
def createwidgets(self):
self.listbox = Tkinter.Listbox(self, foreground='red',
selectmode=Tkinter.SINGLE, font = self.font)
self.scrollbar = Tkinter.Scrollbar(self, command=self.listbox.yview)
self.scrollbar.pack(side = Tkinter.RIGHT, fill = Tkinter.Y,
anchor = Tkinter.N)
self.listbox.pack(side = Tkinter.LEFT,
fill = Tkinter.BOTH, expand = Tkinter.YES,
anchor = Tkinter.NW)
self.listbox.configure(yscrollcommand = self.scrollbar.set,
bg = 'White',selectbackground= 'Red',
takefocus= Tkinter.YES)
def set_callback(self, callback):
self.callback = callback
self.listbox.bind('<Double-1>', self.do_callback)
def do_callback(self, *args):
report_ids = [self.data[self.listbox.get(int(item))]
for item in self.listbox.curselection()]
for report_id in report_ids:
self.callback(report_id)
def set_filter_kwargs(self, **kwargs):
self.filter_kwargs = kwargs
def update_label(self, report):
label = report.path
if not label:
label = 'Idle'
self.label.configure(text = label)
def update_list(self, reportstore):
reports = reportstore.get(**self.filter_kwargs)
old_selections = [self.listbox.get(int(sel))
for sel in self.listbox.curselection()]
self.listbox.delete(0, Tkinter.END)
self.data = {}
for report in reports:
label = '%s: %s' % (report.status, report.label)
self.data[label] = report.full_id
self.listbox.insert(Tkinter.END, label)
if label in old_selections:
self.listbox.select_set(Tkinter.END)
self.listbox.see(Tkinter.END)
class LabelEntry(Tkinter.Frame):
font = myfont
def __init__(self, *args, **kwargs):
Tkinter.Frame.__init__(self, *args, **kwargs)
self.label = Tkinter.Label(self)
self.label.configure(font = self.font)
self.label.pack(side = Tkinter.LEFT)
self.entry = Tkinter.Entry(self)
self.entry.configure(font = self.font)
self.entry.pack(side = Tkinter.LEFT, expand = Tkinter.YES,
fill = Tkinter.X)
def update(self, reportstore = Null(), ids = []):
bgcolor = 'White'
fgcolor = 'Black'
root_status = reportstore.root.status
if root_status == reportstore.ReportClass.Status.Passed():
bgcolor = 'Green'
elif root_status == reportstore.ReportClass.Status.Failed():
bgcolor = 'Red'
fgcolor = 'White'
elif root_status == reportstore.ReportClass.Status.Skipped() :
bgcolor = 'Yellow'
self.entry.configure(bg = bgcolor, fg = fgcolor)
class ReportFrame(Tkinter.LabelFrame):
font = myfont
def __init__(self, *args, **kwargs):
Tkinter.LabelFrame.__init__(self, *args, **kwargs)
self.report = Null()
self.label = Tkinter.Label(self,foreground="red", justify=Tkinter.LEFT)
self.label.pack(anchor=Tkinter.W)
self.label.configure(font = self.font)
self.configure(labelwidget = self.label)
self.text = ScrolledText.ScrolledText(self)
self.text.configure(bg = 'White', font = ('Helvetica', 11, 'normal'))
self.text.tag_config('sel', relief=Tkinter.FLAT)
self.text.pack(expand=Tkinter.YES, fill=Tkinter.BOTH,
side = Tkinter.TOP)
def set_report(self, report):
self.report = report
self.label.configure(text ='%s: %s' % (self.report.status,
self.report.label),
foreground="red", justify=Tkinter.LEFT)
self.text['state'] = Tkinter.NORMAL
self.text.delete(1.0, Tkinter.END)
self.text.insert(Tkinter.END, self.report.error_report)
self.text.yview_pickplace(Tkinter.END)
self.text['state'] = Tkinter.DISABLED
self.attacheditorhotspots(self.text)
def clear(self):
self.label.configure(text = '')
self.text['state'] = Tkinter.NORMAL
self.text.delete(1.0, Tkinter.END)
self.text['state'] = Tkinter.DISABLED
def launch_editor(self, file, line):
editor = (py.std.os.environ.get('PYTHON_EDITOR', None) or
py.std.os.environ.get('EDITOR_REMOTE', None) or
os.environ.get('EDITOR', None) or "emacsclient --no-wait ")
if editor:
print "%s +%s %s" % (editor, line, file)
#py.process.cmdexec('%s +%s %s' % (editor, line, file))
os.system('%s +%s %s' % (editor, line, file))
def attacheditorhotspots(self, text):
# Attach clickable regions to a Text widget.
filelink = re.compile(r"""\[(?:testcode\s*:)?\s*(.+):(\d+)\]""")
skippedlink = re.compile(r"""in\s+(/.*):(\d+)\s+""")
lines = text.get('1.0', Tkinter.END).splitlines(1)
if not lines:
return
tagname = ''
start, end = 0,0
for index, line in enumerate(lines):
match = filelink.search(line)
if match is None:
match = skippedlink.search(line)
if match is None:
continue
file, line = match.group(1, 2)
start, end = match.span()
tagname = "ref%d" % index
text.tag_add(tagname,
"%d.%d" % (index + 1, start),
"%d.%d" % (index + 1, end))
text.tag_bind(tagname, "<Enter>",
lambda e, n=tagname:
e.widget.tag_config(n, underline=1))
text.tag_bind(tagname, "<Leave>",
lambda e, n=tagname:
e.widget.tag_config(n, underline=0))
text.tag_bind(tagname, "<Button-1>",
lambda e, self=self, f=file, l=line:
self.launch_editor(f, l))
class MinimalButton(Tkinter.Button):
format_string = '%dF / %dS / %dP'
def __init__(self, *args, **kwargs):
Tkinter.Button.__init__(self, *args, **kwargs)
self.configure(text = 'Start/Stop') #self.format_string % (0,0,0))
def update_label(self, reportstore):
failed = len(reportstore.get(failed = True))
skipped = len(reportstore.get(skipped = True))
passed = len(reportstore.get(passed_item = True))
#self.configure(text = self.format_string % (failed, skipped, passed))
bgcolor = 'Green'
fgcolor = 'Black'
highlightcolor = 'Green',
activebackground= 'Green',
activeforeground = 'Black'
if skipped > 0:
bgcolor = 'Yellow'
activebackground = 'Yellow'
if failed > 0:
bgcolor = 'Red'
fgcolor = 'White'
activebackground= 'Red',
activeforeground = 'White'
self.configure(bg = bgcolor, fg = fgcolor,
highlightcolor = highlightcolor,
activeforeground = activeforeground,
activebackground = activebackground)
class LayoutManager:
def __init__(self):
self.current_layout = 0
self.layout_dict = {}
def add(self, widget, layout_ids, **kwargs):
for id in layout_ids:
list = self.layout_dict.setdefault(id, [])
list.append((widget, kwargs))
def set_layout(self, layout = 0):
if self.current_layout != layout:
widget_list = self.layout_dict.get(self.current_layout, [])
for widget, kwargs in widget_list:
widget.pack_forget()
self.current_layout = layout
widget_list = self.layout_dict.get(self.current_layout, [])
for widget, kwargs in widget_list:
widget.pack(**kwargs)
class TkGui:
font = ('Helvetica', 9, 'normal')
def __init__(self, parent, config):
self._parent = parent
self._config = config
self._should_stop = False
#XXX needed?
self._paths = []
self.backend = backend.ReportBackend(config)
self._layoutmanager = LayoutManager()
self._layout_toggle_var = Tkinter.StringVar()
self._layout_toggle_var.set('<')
# events
self.on_report_updated = Event()
self.on_report_store_updated = Event()
self.on_show_report = Event()
self.on_run_tests = Event()
self.createwidgets()
self.timer_update()
self.report_window = Null()
self.report_frame = Null()
def createwidgets(self):
add = self._layoutmanager.add
self._buttonframe = Tkinter.Frame(self._parent)
self._layoutbutton = Tkinter.Button(self._buttonframe, padx=0,
relief=Tkinter.GROOVE,
textvariable=self._layout_toggle_var,
command=self.toggle_layout)
add(self._layoutbutton, [0,1], side= Tkinter.LEFT)
self._entry = LabelEntry(self._buttonframe)
self._entry.label.configure(text = 'Enter test name:')
self._entry.entry.bind('<Return>', self.start_tests)
add(self._entry, [0], side = Tkinter.LEFT, fill = Tkinter.X,
expand = Tkinter.YES)
self._run_stop_button = Tkinter.Button(self._buttonframe, text = 'Run',
command = self.toggle_action, font = self.font)
add(self._run_stop_button, [0], side = Tkinter.RIGHT)
self._minimalbutton = MinimalButton(self._buttonframe, font = self.font,
command = self.toggle_action)
add(self._minimalbutton, [1], side = Tkinter.RIGHT,
fill = Tkinter.X, expand = Tkinter.YES)
add(self._buttonframe, [0,1], side = Tkinter.TOP, fill = Tkinter.X)
self._statusbar = StatusBar(self._parent)
self._statusbar.set_filter_args_callback(self.filter_args_changed)
add(self._statusbar, [0,1], side= Tkinter.BOTTOM, fill=Tkinter.X)
self._reportlist = ReportListBox(self._parent)
add(self._reportlist, [0], side = Tkinter.TOP, fill = Tkinter.BOTH,
expand = Tkinter.YES)
self._reportlist.set_callback(self.show_report)
self._report_frame = ReportFrame(self._parent)
add(self._report_frame, [0], side = Tkinter.TOP, fill = Tkinter.BOTH,
expand = Tkinter.YES)
#self.on_show_report.subscribe(self.show_report)
self.on_report_updated.subscribe(self._reportlist.update_label)
self.on_report_store_updated.subscribe(self._reportlist.update_list)
self.on_report_store_updated.subscribe(self._minimalbutton.update_label)
self.on_report_store_updated.subscribe(self.update_status)
self._layoutmanager.set_layout(0)
self.update_status(self.backend.get_store())
self.messages_callback(None)
def toggle_layout(self):
if self._layout_toggle_var.get() == '>':
self._layout_toggle_var.set('<')
self._layoutmanager.set_layout(0)
else:
self._layout_toggle_var.set('>')
self._layoutmanager.set_layout(1)
self._parent.geometry('')
def filter_args_changed(self, **kwargs):
self._reportlist.set_filter_kwargs(**kwargs)
self._reportlist.update_list(self.backend.get_store())
def show_report(self, report_id):
if report_id is None:
self._report_frame.clear()
else:
report = self.backend.get_store().get(id = report_id)[0]
self._report_frame.set_report(report)
def show_error_window(self, report_id):
report = self.backend.get_store().get(id = report_id)[0]
if not self.report_window:
self.report_window = Tkinter.Toplevel(self._parent)
self.report_window.protocol('WM_DELETE_WINDOW',
self.report_window.withdraw)
b = Tkinter.Button(self.report_window, text="Close",
command=self.report_window.withdraw)
b.pack(side=Tkinter.BOTTOM)
#b.focus_set()
self.report_frame = ReportFrame(self.report_window)
self.report_frame.pack()
elif self.report_window.state() != Tkinter.NORMAL:
self.report_window.deiconify()
self.report_window.title(report.label)
self.report_frame.set_report(report)
def timer_update(self):
self.backend.update()
if self.backend.running:
action = 'Stop'
else:
action = 'Run'
self._run_stop_button.configure(text = action)
self._minimalbutton.configure(text = action)
self._parent.after(200, self.timer_update)
def start_tests(self, dont_care_event=None):
if self.backend.running:
return
paths = [path.strip() for path in self._entry.entry.get().split(',')]
self.backend.set_messages_callback(self.messages_callback)
self.backend.set_message_callback(self.message_callback)
self.backend.start_tests(args = paths)
self.show_report(None)
def update_status(self, reportstore):
self._statusbar.update_all(reportstore = reportstore, ids = [])
self._entry.update(reportstore = reportstore, ids = [])
def messages_callback(self, report_ids):
if not report_ids:
return
self.on_report_store_updated(self.backend.get_store())
def message_callback(self, report_id):
if report_id is None:
report = Null()
else:
report = self.backend.get_store().get(id = report_id)[0]
self.on_report_updated(report)
def set_paths(self, paths):
self._paths = paths
self._entry.entry.insert(Tkinter.END, ', '.join(paths))
def toggle_action(self):
if self.backend.running:
self.stop()
else:
self.start_tests()
def stop(self):
self.backend.shutdown()
self.backend.update()
self.messages_callback([None])
self.message_callback(None)
def shutdown(self):
self.should_stop = True
self.backend.shutdown()
py.std.sys.exit()

View File

@ -1,14 +0,0 @@
import py
class TkSession:
def __init__(self, config):
self.config = config
def main(self, paths):
import Tkinter
root = Tkinter.Tk()
from tkgui import TkGui
tkgui = TkGui(root, self.config)
tkgui.set_paths(paths)
root.protocol('WM_DELETE_WINDOW', tkgui.shutdown)
root.mainloop()

View File

@ -1,291 +0,0 @@
'''some classes to handle text reports'''
import sys
import py
from py.__.test.terminal import out
from py.__.test.terminal.terminal import TerminalSession
class Null:
""" Null objects always and reliably "do nothing." """
def __init__(self, *args, **kwargs): pass
def __call__(self, *args, **kwargs): return self
def __repr__(self): return "Null()"
def __str__(self): return repr(self) + ' with id:' + str(id(self))
def __nonzero__(self): return 0
def __getattr__(self, name): return self
def __setattr__(self, name, value): return self
def __delattr__(self, name): return self
_NotExecuted = 'NotExecuted'
_Passed = 'Passed'
_Failed = 'Failed'
_Skipped = 'Skipped'
_ExceptionFailure = 'ExceptionFailure'
class Status(object):
'''Represents py.test.Collector.Outcome as a string.
Possible values: NotExecuted, Passed, Skipped, Failed, ExceptionFailure'''
def NotExecuted(cls):
return cls('NotExecuted')
NotExecuted = classmethod(NotExecuted)
def Passed(cls):
return cls('Passed')
Passed = classmethod(Passed)
def Failed(cls):
return cls('Failed')
Failed = classmethod(Failed)
def Skipped(cls):
return cls('Skipped')
Skipped = classmethod(Skipped)
def ExceptionFailure(cls):
return cls(_ExceptionFailure)
ExceptionFailure = classmethod(ExceptionFailure)
ordered_list = [_NotExecuted,
_Passed,
_Skipped,
_Failed,
_ExceptionFailure]
namemap = {
py.test.Item.Passed: _Passed,
py.test.Item.Skipped: _Skipped,
py.test.Item.Failed: _Failed,
}
def __init__(self, outcome_or_name = ''):
self.str = _NotExecuted
if isinstance(outcome_or_name, py.test.Item.Outcome):
for restype, name in self.namemap.items():
if isinstance(outcome_or_name, restype):
self.str = name
else:
if str(outcome_or_name) in self.ordered_list:
self.str = str(outcome_or_name)
def __repr__(self):
return 'Status("%s")' % self.str
def __str__(self):
return self.str
def update(self, status):
'''merge self and status, self will be set to the "higher" status
in ordered_list'''
name_int_map = dict(zip(self.ordered_list, range(len(self.ordered_list))))
self.str = self.ordered_list[max([name_int_map[i]
for i in (str(status), self.str)])]
def __eq__(self, other):
return self.str == other.str
def __ne__(self, other):
return not self.__eq__(other)
class OutBuffer(out.Out):
'''Simple MockObject for py.__.test.report.text.out.Out.
Used to get the output of TerminalSession.'''
def __init__(self, fullwidth = 80 -1):
self.output = []
self.fullwidth = fullwidth
def line(self, s= ''):
self.output.append(str(s) + '\n')
def write(self, s):
self.output.append(str(s))
def getoutput(self):
return ''.join(self.output)
def rewrite(self, s=''):
self.write(s)
class TestReport(object):
'''"Channel-save" report of a py.test.Collector.Outcome instance'''
root_id = 'TestReport Root ID'
template = {'time' : 0,
'label': 'Root',
'id': root_id,
'full_id': tuple(),
'status': Status.NotExecuted(),
'report': 'NoReport',
'error_report': '',
'finished': False,
'restart_params': None, # ('',('',))
'path' : '',
'modpath': '',
'is_item': False,
'stdout': '',
'stderr': '',
}
Status = Status
def fromChannel(cls, kwdict):
''' TestReport.fromChannel(report.toChannel()) == report '''
if 'status' in kwdict:
kwdict['status'] = Status(kwdict['status'])
return cls(**kwdict)
fromChannel = classmethod(fromChannel)
def __init__(self, **kwargs):
# copy status -> deepcopy
kwdict = py.std.copy.deepcopy(self.template)
kwdict.update(kwargs)
for key, value in kwdict.iteritems():
setattr(self, key, value)
def start(self, collector):
'''Session.start should call this to init the report'''
self.full_id = tuple(collector.listnames())
self.id = collector.name
if collector.getpathlineno(): # save for Null() in test_util.py
fspath, lineno = collector.getpathlineno()
if lineno != sys.maxint:
str_append = ' [%s:%s]' % (fspath.basename, lineno)
else:
str_append = ' [%s]' % fspath.basename
self.label = collector.name + str_append
self.path = '/'.join(collector.listnames())
#self.modpath = collector.getmodpath()
self.settime()
self.restart_params = (str(collector.listchain()[0].fspath),
collector.listnames())
self.status = Status.NotExecuted()
self.is_item = isinstance(collector, py.test.Item)
def finish(self, collector, res, config = Null()):
'''Session.finish should call this to set the
value of error_report
option is passed to Session at initialization'''
self.settime()
if collector.getpathlineno(): # save for Null() in test_util.py
fspath, lineno = collector.getpathlineno()
if lineno != sys.maxint:
str_append = ' [%s:%s] %0.2fsecs' % (fspath.basename,
lineno, self.time)
else:
str_append = ' [%s] %0.2fsecs' % (fspath.basename, self.time)
self.label = collector.name + str_append
if res:
if Status(res) == Status.Failed():
self.error_report = self.report_failed(config, collector, res)
elif Status(res) == Status.Skipped():
self.error_report = self.report_skipped(config, collector, res)
self.status.update(Status(res))
out, err = collector.getouterr()
self.stdout, self.stderr = str(out), str(err)
self.finished = True
def abbrev_path(self, fspath):
parts = fspath.parts()
basename = parts.pop().basename
while parts and parts[-1].basename in ('testing', 'test'):
parts.pop()
base = parts[-1].basename
if len(base) < 13:
base = base + "_" * (13-len(base))
return base + "_" + basename
def report_failed(self, config, item, res):
#XXX hack abuse of TerminalSession
terminal = TerminalSession(config)
out = OutBuffer()
terminal.out = out
terminal.repr_failure(item, res)
#terminal.repr_out_err(item)
return out.getoutput()
def report_skipped(self, config, item, outcome):
texts = {}
terminal = TerminalSession(config)
raisingtb = terminal.getlastvisible(outcome.excinfo.traceback)
fn = raisingtb.frame.code.path
lineno = raisingtb.lineno
d = texts.setdefault(outcome.excinfo.exconly(), {})
d[(fn, lineno)] = outcome
out = OutBuffer()
out.sep('_', 'reasons for skipped tests')
for text, dict in texts.items():
for (fn, lineno), outcome in dict.items():
out.line('Skipped in %s:%d' %(fn, lineno))
out.line("reason: %s" % text)
return out.getoutput()
def settime(self):
'''update self.time '''
self.time = py.std.time.time() - self.time
def to_channel(self):
'''counterpart of classmethod fromChannel'''
ret = self.template.copy()
for key in ret.keys():
ret[key] = getattr(self, key, self.template[key])
ret['status'] = str(ret['status'])
return ret
def __str__(self):
return str(self.to_channel())
def __repr__(self):
return str(self)
def copy(self, **kwargs):
channel_dict = self.to_channel()
channel_dict.update(kwargs)
return TestReport.fromChannel(channel_dict)
class TestFileWatcher:
'''watches files or paths'''
def __init__(self, *paths):
self.paths = [py.path.local(path) for path in paths]
self.watchdict = dict()
def file_information(self, path):
try:
return path.stat().st_ctime
except:
return None
def check_files(self):
'''returns (changed files, deleted files)'''
def fil(p):
return p.check(fnmatch='[!.]*.py')
def rec(p):
return p.check(dotfile=0)
files = []
for path in self.paths:
if path.check(file=1):
files.append(path)
else:
files.extend(path.visit(fil, rec))
newdict = dict(zip(files, [self.file_information(p) for p in files]))
files_deleted = [f for f in self.watchdict.keys()
if not newdict.has_key(f)]
files_new = [f for f in newdict.keys() if not self.watchdict.has_key(f)]
files_changed = [f for f in newdict.keys() if self.watchdict.has_key(f)
and newdict[f]!= self.watchdict[f]]
files_changed = files_new + files_changed
self.watchdict = newdict
return files_changed, files_deleted
def changed(self):
'''returns False if nothing changed'''
changed, deleted = self.check_files()
return changed != [] or deleted != []