test_ok1/_pytest/cacheprovider.py

244 lines
8.5 KiB
Python
Executable File

"""
merged implementation of the cache provider
the name cache was not choosen to ensure pluggy automatically
ignores the external pytest-cache
"""
import py
import pytest
import json
from os.path import sep as _sep, altsep as _altsep
class Cache(object):
def __init__(self, config):
self.config = config
self._cachedir = config.rootdir.join(".cache")
self.trace = config.trace.root.get("cache")
if config.getvalue("cacheclear"):
self.trace("clearing cachedir")
if self._cachedir.check():
self._cachedir.remove()
self._cachedir.mkdir()
def makedir(self, name):
""" return a directory path object with the given name. If the
directory does not yet exist, it will be created. You can use it
to manage files likes e. g. store/retrieve database
dumps across test sessions.
:param name: must be a string not containing a ``/`` separator.
Make sure the name contains your plugin or application
identifiers to prevent clashes with other cache users.
"""
if _sep in name or _altsep is not None and _altsep in name:
raise ValueError("name is not allowed to contain path separators")
return self._cachedir.ensure_dir("d", name)
def _getvaluepath(self, key):
return self._cachedir.join('v', *key.split('/'))
def get(self, key, default):
""" return cached value for the given key. If no value
was yet cached or the value cannot be read, the specified
default is returned.
:param key: must be a ``/`` separated value. Usually the first
name is the name of your plugin or your application.
:param default: must be provided in case of a cache-miss or
invalid cache values.
"""
path = self._getvaluepath(key)
if path.check():
try:
with path.open("r") as f:
return json.load(f)
except ValueError:
self.trace("cache-invalid at %s" % (path,))
return default
def set(self, key, value):
""" save value for the given key.
:param key: must be a ``/`` separated value. Usually the first
name is the name of your plugin or your application.
:param value: must be of any combination of basic
python types, including nested types
like e. g. lists of dictionaries.
"""
path = self._getvaluepath(key)
try:
path.dirpath().ensure_dir()
except (py.error.EEXIST, py.error.EACCES):
self.config.warn(
code='I9', message='could not create cache path %s' % (path,)
)
return
try:
f = path.open('w')
except py.error.ENOTDIR:
self.config.warn(
code='I9', message='cache could not write path %s' % (path,))
else:
with f:
self.trace("cache-write %s: %r" % (key, value,))
json.dump(value, f, indent=2, sort_keys=True)
class LFPlugin:
""" Plugin which implements the --lf (run last-failing) option """
def __init__(self, config):
self.config = config
active_keys = 'lf', 'failedfirst'
self.active = any(config.getvalue(key) for key in active_keys)
if self.active:
self.lastfailed = config.cache.get("cache/lastfailed", {})
else:
self.lastfailed = {}
def pytest_report_header(self):
if self.active:
if not self.lastfailed:
mode = "run all (no recorded failures)"
else:
mode = "rerun last %d failures%s" % (
len(self.lastfailed),
" first" if self.config.getvalue("failedfirst") else "")
return "run-last-failure: %s" % mode
def pytest_runtest_logreport(self, report):
if report.failed and "xfail" not in report.keywords:
self.lastfailed[report.nodeid] = True
elif not report.failed:
if report.when == "call":
self.lastfailed.pop(report.nodeid, None)
def pytest_collectreport(self, report):
passed = report.outcome in ('passed', 'skipped')
if passed:
if report.nodeid in self.lastfailed:
self.lastfailed.pop(report.nodeid)
self.lastfailed.update(
(item.nodeid, True)
for item in report.result)
else:
self.lastfailed[report.nodeid] = True
def pytest_collection_modifyitems(self, session, config, items):
if self.active and self.lastfailed:
previously_failed = []
previously_passed = []
for item in items:
if item.nodeid in self.lastfailed:
previously_failed.append(item)
else:
previously_passed.append(item)
if not previously_failed and previously_passed:
# running a subset of all tests with recorded failures outside
# of the set of tests currently executing
pass
elif self.config.getvalue("failedfirst"):
items[:] = previously_failed + previously_passed
else:
items[:] = previously_failed
config.hook.pytest_deselected(items=previously_passed)
def pytest_sessionfinish(self, session):
config = self.config
if config.getvalue("cacheshow") or hasattr(config, "slaveinput"):
return
config.cache.set("cache/lastfailed", self.lastfailed)
def pytest_addoption(parser):
group = parser.getgroup("general")
group.addoption(
'--lf', action='store_true', dest="lf",
help="rerun only the tests that failed "
"at the last run (or all if none failed)")
group.addoption(
'--ff', action='store_true', dest="failedfirst",
help="run all tests but run the last failures first. "
"This may re-order tests and thus lead to "
"repeated fixture setup/teardown")
group.addoption(
'--cache-show', action='store_true', dest="cacheshow",
help="show cache contents, don't perform collection or tests")
group.addoption(
'--cache-clear', action='store_true', dest="cacheclear",
help="remove all cache contents at start of test run.")
def pytest_cmdline_main(config):
if config.option.cacheshow:
from _pytest.main import wrap_session
return wrap_session(config, cacheshow)
@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
config.cache = Cache(config)
config.pluginmanager.register(LFPlugin(config), "lfplugin")
@pytest.fixture
def cache(request):
"""
Return a cache object that can persist state between testing sessions.
cache.get(key, default)
cache.set(key, value)
Keys must be strings not containing a "/" separator. Add a unique identifier
(such as plugin/app name) to avoid clashes with other cache users.
Values can be any object handled by the json stdlib module.
"""
return request.config.cache
def pytest_report_header(config):
if config.option.verbose:
relpath = py.path.local().bestrelpath(config.cache._cachedir)
return "cachedir: %s" % relpath
def cacheshow(config, session):
from pprint import pprint
tw = py.io.TerminalWriter()
tw.line("cachedir: " + str(config.cache._cachedir))
if not config.cache._cachedir.check():
tw.line("cache is empty")
return 0
dummy = object()
basedir = config.cache._cachedir
vdir = basedir.join("v")
tw.sep("-", "cache values")
for valpath in vdir.visit(lambda x: x.isfile()):
key = valpath.relto(vdir).replace(valpath.sep, "/")
val = config.cache.get(key, dummy)
if val is dummy:
tw.line("%s contains unreadable content, "
"will be ignored" % key)
else:
tw.line("%s contains:" % key)
stream = py.io.TextIO()
pprint(val, stream=stream)
for line in stream.getvalue().splitlines():
tw.line(" " + line)
ddir = basedir.join("d")
if ddir.isdir() and ddir.listdir():
tw.sep("-", "cache directories")
for p in basedir.join("d").visit():
#if p.check(dir=1):
# print("%s/" % p.relto(basedir))
if p.isfile():
key = p.relto(basedir)
tw.line("%s is a file of length %d" % (
key, p.size()))
return 0