diff --git a/py/test/plugin/pytest_resultdb.py b/py/test/plugin/pytest_resultdb.py new file mode 100644 index 000000000..910303d3c --- /dev/null +++ b/py/test/plugin/pytest_resultdb.py @@ -0,0 +1,392 @@ +import uuid +import py +from pytest_resultlog import generic_path, getoutcomecodes + +class ResultdbPlugin: + """resultdb plugin for database logging of test results. + + Saves test results to a datastore. + + Also mixes in some early ideas about an archive abstraction for test + results. + """ + def pytest_addoption(self, parser): + group = parser.addgroup("resultdb", "resultdb plugin options") + group.addoption('--resultdb', action="store", dest="resultdb", + metavar="path", + help="path to the file to store test results.") + group.addoption('--resultdb_format', action="store", + dest="resultdbformat", default='json', + help="data format (json, sqlite)") + + def pytest_configure(self, config): + if config.getvalue('resultdb'): + if config.option.resultdb: + # local import so missing module won't crash py.test + try: + import sqlite3 + except ImportError: + raise config.Error('Could not import sqlite3 module') + try: + import simplejson + except ImportError: + raise config.Error('Could not import simplejson module') + if config.option.resultdbformat.lower() == 'json': + self.resultdb = ResultDB(JSONResultArchive, + config.option.resultdb) + elif config.option.resultdbformat.lower() == 'sqlite': + self.resultdb = ResultDB(SQLiteResultArchive, + config.option.resultdb) + else: + raise config.Error('Unknown --resultdb_format: %s' % + config.option.resultdbformat) + + config.bus.register(self.resultdb) + + def pytest_unconfigure(self, config): + if hasattr(self, 'resultdb'): + del self.resultdb + #config.bus.unregister(self.resultdb) + + +class JSONResultArchive(object): + def __init__(self, archive_path): + self.archive_path = archive_path + import simplejson + self.simplejson = simplejson + + def init_db(self): + if os.path.exists(self.archive_path): + data_file = open(self.archive_path) + archive = self.simplejson.load(data_file) + self.archive = archive + else: + self.archive = [] + self._flush() + + def append_data(self, data): + runid = uuid.uuid4() + for item in data: + item = item.copy() + item['runid'] = str(runid) + self.archive.append(item) + self._flush() + + def get_all_data(self): + return self.archive + + def _flush(self): + data_file = open(self.archive_path, 'w') + self.simplejson.dump(self.archive, data_file) + data_file.close() + + +class SQLiteResultArchive(object): + def __init__(self, archive_path): + self.archive_path = archive_path + import sqlite3 + self.sqlite3 = sqlite3 + + def init_db(self): + if not os.path.exists(self.archive_path): + conn = self.sqlite3.connect(self.archive_path) + cursor = conn.cursor() + try: + cursor.execute(SQL_CREATE_TABLES) + conn.commit() + finally: + cursor.close() + conn.close() + + def append_data(self, data): + flat_data = [] + runid = uuid.uuid4() + for item in data: + item = item.copy() + item['runid'] = str(runid) + flat_data.append(self.flatten(item)) + conn = self.sqlite3.connect(self.archive_path) + cursor = conn.cursor() + cursor.executemany(SQL_INSERT_DATA, flat_data) + conn.commit() + cursor.close() + conn.close() + + def get_all_data(self): + conn = self.sqlite3.connect(self.archive_path) + conn.row_factory = self.sqlite3.Row + cursor = conn.cursor() + cursor.execute(SQL_SELECT_DATA) + data = cursor.fetchall() + cursor.close() + conn.close() + data = [self.unflatten(item) for item in data] + return data + + def flatten(self, item): + return (item.get('runid', None), + item.get('name', None), + item.get('passed', None), + item.get('skipped', None), + item.get('failed', None), + item.get('shortrepr', None), + item.get('longrepr', None), + item.get('fspath', None), + item.get('itemname', None), + ) + + def unflatten(self, item): + names = ("runid name passed skipped failed shortrepr " + "longrepr fspath itemname").split() + d = {} + for i, name in enumerate(names): + d[name] = item[i] + return d + + +class ResultDB(object): + def __init__(self, cls, db_path): + self.archive = cls(db_path) + self.archive.init_db() + + def write_log_entry(self, event, shortrepr, name, longrepr): + data = {} + event_excludes = ['colitem', 'longrepr'] + for item in vars(event).keys(): + if item not in event_excludes: + data[item] = getattr(event, item) + # use the locally calculated longrepr & shortrepr + data['longrepr'] = longrepr + data['shortrepr'] = shortrepr + + data['fspath'] = unicode(event.colitem.fspath) + data['itemname'] = event.colitem.name + + data['name'] = name + self.archive.append_data([data]) + + def log_outcome(self, event): + if (not event.passed or isinstance(event, event.ItemTestReport)): + gpath = generic_path(event.colitem) + shortrepr, longrepr = getoutcomecodes(event) + self.write_log_entry(event, shortrepr, gpath, longrepr) + + def pyevent_itemtestreport(self, event): + self.log_outcome(event) + + def pyevent_collectionreport(self, event): + if not event.passed: + self.log_outcome(event) + + def pyevent_internalerror(self, event): + path = event.repr.reprcrash.path # fishing :( + self.write_log_entry(event, '!', path, str(event.repr)) + + +SQL_CREATE_TABLES = """ +create table pytest_results ( + runid varchar(36), + name varchar, + passed int, + skipped int, + failed int, + shortrepr varchar, + longrepr varchar, + fspath varchar, + itemname varchar + ); +""" +SQL_INSERT_DATA = """ +insert into pytest_results ( + runid, + name, + passed, + skipped, + failed, + shortrepr, + longrepr, + fspath, + itemname) +values (?, ?, ?, ?, ?, ?, ?, ?, ?); +""" +SQL_SELECT_DATA = """ +select + runid, + name, + passed, + skipped, + failed, + shortrepr, + longrepr, + fspath, + itemname +from pytest_results; +""" + + +# =============================================================================== +# +# plugin tests +# +# =============================================================================== + +import os, StringIO + +class BaseResultArchiveTests(object): + cls = None + + def setup_class(cls): + # XXX refactor setup into a funcarg? + cls.tempdb = "test_tempdb" + + def test_init_db(self, testdir): + tempdb_path = unicode(testdir.tmpdir.join(self.tempdb)) + archive = self.cls(tempdb_path) + archive.init_db() + assert os.path.exists(tempdb_path) + + def test_db_insert(self, testdir): + tempdb_path = unicode(testdir.tmpdir.join(self.tempdb)) + archive = self.cls(tempdb_path) + archive.init_db() + assert len(archive.get_all_data()) == 0 + + data = [{'name': 'tmppackage/test_whatever.py:test_hello', + 'fspath': '/Users/brian/work/tmppackage/test_whatever.py', + 'name': 'test_hello', + 'longrepr': '', + 'passed': True, + 'shortrepr': '.' + }] + archive.append_data(data) + result = archive.get_all_data() + print result + assert len(result) == 1 + for key, value in data[0].items(): + assert value == result[0][key] + assert 'runid' in result[0] + + # make sure the data is persisted + tempdb_path = unicode(testdir.tmpdir.join(self.tempdb)) + archive = self.cls(tempdb_path) + archive.init_db() + assert len(archive.get_all_data()) == 1 + + +class TestJSONResultArchive(BaseResultArchiveTests): + cls = JSONResultArchive + + +class TestSQLiteResultArchive(BaseResultArchiveTests): + cls = SQLiteResultArchive + + def test_init_db_sql(self, testdir): + tempdb_path = unicode(testdir.tmpdir.join(self.tempdb)) + archive = self.cls(tempdb_path) + archive.init_db() + assert os.path.exists(tempdb_path) + + # is table in the database? + import sqlite3 + conn = sqlite3.connect(tempdb_path) + cursor = conn.cursor() + cursor.execute("""SELECT name FROM sqlite_master + ORDER BY name;""") + tables = cursor.fetchall() + cursor.close() + conn.close() + assert len(tables) == 1 + +def verify_archive_item_shape(item): + names = ("runid name passed skipped failed shortrepr " + "longrepr fspath itemname").split() + for name in names: + assert name in item + +class TestWithFunctionIntegration: + def getarchive(self, testdir, arg): + resultdb = testdir.tmpdir.join("resultdb") + args = ["--resultdb=%s" % resultdb, "--resultdb_format=sqlite"] + [arg] + testdir.runpytest(*args) + assert resultdb.check(file=1) + archive = SQLiteResultArchive(unicode(resultdb)) + archive.init_db() + return archive + + def test_collection_report(self, plugintester): + py.test.skip("Needs a rewrite for db version.") + testdir = plugintester.testdir() + ok = testdir.makepyfile(test_collection_ok="") + skip = testdir.makepyfile(test_collection_skip="import py ; py.test.skip('hello')") + fail = testdir.makepyfile(test_collection_fail="XXX") + + lines = self.getresultdb(testdir, ok) + assert not lines + + lines = self.getresultdb(testdir, skip) + assert len(lines) == 2 + assert lines[0].startswith("S ") + assert lines[0].endswith("test_collection_skip.py") + assert lines[1].startswith(" ") + assert lines[1].endswith("test_collection_skip.py:1: Skipped: 'hello'") + + lines = self.getresultdb(testdir, fail) + assert lines + assert lines[0].startswith("F ") + assert lines[0].endswith("test_collection_fail.py"), lines[0] + for x in lines[1:]: + assert x.startswith(" ") + assert "XXX" in "".join(lines[1:]) + + def test_log_test_outcomes(self, plugintester): + testdir = plugintester.testdir() + mod = testdir.makepyfile(test_mod=""" + import py + def test_pass(): pass + def test_skip(): py.test.skip("hello") + def test_fail(): raise ValueError("val") + """) + + archive = self.getarchive(testdir, mod) + data = archive.get_all_data() + for item in data: + verify_archive_item_shape(item) + assert len(data) == 3 + assert len([item for item in data if item['passed'] == True]) == 1 + assert len([item for item in data if item['skipped'] == True]) == 1 + assert len([item for item in data if item['failed'] == True]) == 1 + + def test_internal_exception(self): + py.test.skip("Needs a rewrite for db version.") + # they are produced for example by a teardown failing + # at the end of the run + from py.__.test import event + try: + raise ValueError + except ValueError: + excinfo = event.InternalException() + reslog = ResultDB(StringIO.StringIO()) + reslog.pyevent("internalerror", excinfo) + entry = reslog.logfile.getvalue() + entry_lines = entry.splitlines() + + assert entry_lines[0].startswith('! ') + assert os.path.basename(__file__)[:-1] in entry_lines[0] #.py/.pyc + assert entry_lines[-1][0] == ' ' + assert 'ValueError' in entry + +def test_generic(plugintester, LineMatcher): + plugintester.apicheck(ResultdbPlugin) + testdir = plugintester.testdir() + testdir.makepyfile(""" + import py + def test_pass(): + pass + def test_fail(): + assert 0 + def test_skip(): + py.test.skip("") + """) + testdir.runpytest("--resultdb=result.sqlite") + #testdir.tmpdir.join("result.sqlite") + diff --git a/py/test/plugin/pytest_resultlog.py b/py/test/plugin/pytest_resultlog.py index 2f8d3358d..ab4a8887c 100644 --- a/py/test/plugin/pytest_resultlog.py +++ b/py/test/plugin/pytest_resultlog.py @@ -45,6 +45,27 @@ def generic_path(item): fspath = newfspath return ''.join(gpath) +def getoutcomecodes(ev): + if isinstance(ev, ev.CollectionReport): + # encode pass/fail/skip indepedent of terminal reporting semantics + # XXX handle collection and item reports more uniformly + assert not ev.passed + if ev.failed: + code = "F" + elif ev.skipped: + code = "S" + longrepr = str(ev.longrepr.reprcrash) + else: + assert isinstance(ev, ev.ItemTestReport) + code = ev.shortrepr + if ev.passed: + longrepr = "" + elif ev.failed: + longrepr = str(ev.longrepr) + elif ev.skipped: + longrepr = str(ev.longrepr.reprcrash.message) + return code, longrepr + class ResultLog(object): def __init__(self, logfile): self.logfile = logfile # preferably line buffered @@ -54,31 +75,10 @@ class ResultLog(object): for line in longrepr.splitlines(): print >>self.logfile, " %s" % line - def getoutcomecodes(self, ev): - if isinstance(ev, ev.CollectionReport): - # encode pass/fail/skip indepedent of terminal reporting semantics - # XXX handle collection and item reports more uniformly - assert not ev.passed - if ev.failed: - code = "F" - elif ev.skipped: - code = "S" - longrepr = str(ev.longrepr.reprcrash) - else: - assert isinstance(ev, ev.ItemTestReport) - code = ev.shortrepr - if ev.passed: - longrepr = "" - elif ev.failed: - longrepr = str(ev.longrepr) - elif ev.skipped: - longrepr = str(ev.longrepr.reprcrash.message) - return code, longrepr - def log_outcome(self, event): if (not event.passed or isinstance(event, event.ItemTestReport)): gpath = generic_path(event.colitem) - shortrepr, longrepr = self.getoutcomecodes(event) + shortrepr, longrepr = getoutcomecodes(event) self.write_log_entry(shortrepr, gpath, longrepr) def pyevent(self, eventname, event, *args, **kwargs):