fix restoring Python state after in-process pytest runs
Now each in-process pytest run saves a snapshot of important global Python state and restores it after the test completes, including the list of loaded modules & the Python path settings. Previously only the loaded package data was getting restored, but that was also reverting any loaded package changes done in the test triggering the pytest runs, and not only those done by the pytest runs themselves. Updated acceptance tests broken by this change, which were only passing before by accident as they were making multiple pytest runs with later ones depending on sys.path changes left behind by the initial one.
This commit is contained in:
parent
67bd60d5c6
commit
f3c9c6e8a8
|
@ -477,9 +477,6 @@ class Testdir:
|
|||
return name.startswith("zope")
|
||||
return SysModulesSnapshot(preserve=preserve_module)
|
||||
|
||||
def delete_loaded_modules(self):
|
||||
self._sys_modules_snapshot.restore()
|
||||
|
||||
def make_hook_recorder(self, pluginmanager):
|
||||
"""Create a new :py:class:`HookRecorder` for a PluginManager."""
|
||||
assert not hasattr(pluginmanager, "reprec")
|
||||
|
@ -708,42 +705,58 @@ class Testdir:
|
|||
:return: a :py:class:`HookRecorder` instance
|
||||
|
||||
"""
|
||||
# When running py.test inline any plugins active in the main test
|
||||
# process are already imported. So this disables the warning which
|
||||
# will trigger to say they can no longer be rewritten, which is fine as
|
||||
# they have already been rewritten.
|
||||
orig_warn = AssertionRewritingHook._warn_already_imported
|
||||
finalizers = []
|
||||
try:
|
||||
# When running py.test inline any plugins active in the main test
|
||||
# process are already imported. So this disables the warning which
|
||||
# will trigger to say they can no longer be rewritten, which is
|
||||
# fine as they have already been rewritten.
|
||||
orig_warn = AssertionRewritingHook._warn_already_imported
|
||||
|
||||
def revert():
|
||||
AssertionRewritingHook._warn_already_imported = orig_warn
|
||||
def revert_warn_already_imported():
|
||||
AssertionRewritingHook._warn_already_imported = orig_warn
|
||||
finalizers.append(revert_warn_already_imported)
|
||||
AssertionRewritingHook._warn_already_imported = lambda *a: None
|
||||
|
||||
self.request.addfinalizer(revert)
|
||||
AssertionRewritingHook._warn_already_imported = lambda *a: None
|
||||
# Any sys.module or sys.path changes done while running py.test
|
||||
# inline should be reverted after the test run completes to avoid
|
||||
# clashing with later inline tests run within the same pytest test,
|
||||
# e.g. just because they use matching test module names.
|
||||
finalizers.append(self.__take_sys_modules_snapshot().restore)
|
||||
finalizers.append(SysPathsSnapshot().restore)
|
||||
|
||||
rec = []
|
||||
# Important note:
|
||||
# - our tests should not leave any other references/registrations
|
||||
# laying around other than possibly loaded test modules
|
||||
# referenced from sys.modules, as nothing will clean those up
|
||||
# automatically
|
||||
|
||||
class Collect:
|
||||
def pytest_configure(x, config):
|
||||
rec.append(self.make_hook_recorder(config.pluginmanager))
|
||||
rec = []
|
||||
|
||||
plugins = kwargs.get("plugins") or []
|
||||
plugins.append(Collect())
|
||||
ret = pytest.main(list(args), plugins=plugins)
|
||||
self.delete_loaded_modules()
|
||||
if len(rec) == 1:
|
||||
reprec = rec.pop()
|
||||
else:
|
||||
class reprec:
|
||||
pass
|
||||
reprec.ret = ret
|
||||
class Collect:
|
||||
def pytest_configure(x, config):
|
||||
rec.append(self.make_hook_recorder(config.pluginmanager))
|
||||
|
||||
# typically we reraise keyboard interrupts from the child run because
|
||||
# it's our user requesting interruption of the testing
|
||||
if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
|
||||
calls = reprec.getcalls("pytest_keyboard_interrupt")
|
||||
if calls and calls[-1].excinfo.type == KeyboardInterrupt:
|
||||
raise KeyboardInterrupt()
|
||||
return reprec
|
||||
plugins = kwargs.get("plugins") or []
|
||||
plugins.append(Collect())
|
||||
ret = pytest.main(list(args), plugins=plugins)
|
||||
if len(rec) == 1:
|
||||
reprec = rec.pop()
|
||||
else:
|
||||
class reprec:
|
||||
pass
|
||||
reprec.ret = ret
|
||||
|
||||
# typically we reraise keyboard interrupts from the child run
|
||||
# because it's our user requesting interruption of the testing
|
||||
if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
|
||||
calls = reprec.getcalls("pytest_keyboard_interrupt")
|
||||
if calls and calls[-1].excinfo.type == KeyboardInterrupt:
|
||||
raise KeyboardInterrupt()
|
||||
return reprec
|
||||
finally:
|
||||
for finalizer in finalizers:
|
||||
finalizer()
|
||||
|
||||
def runpytest_inprocess(self, *args, **kwargs):
|
||||
"""Return result of running pytest in-process, providing a similar
|
||||
|
|
|
@ -535,7 +535,7 @@ class TestInvocationVariants(object):
|
|||
path = testdir.mkpydir("tpkg")
|
||||
path.join("test_hello.py").write('raise ImportError')
|
||||
|
||||
result = testdir.runpytest_subprocess("--pyargs", "tpkg.test_hello")
|
||||
result = testdir.runpytest("--pyargs", "tpkg.test_hello", syspathinsert=True)
|
||||
assert result.ret != 0
|
||||
|
||||
result.stdout.fnmatch_lines([
|
||||
|
@ -553,7 +553,7 @@ class TestInvocationVariants(object):
|
|||
result.stdout.fnmatch_lines([
|
||||
"*2 passed*"
|
||||
])
|
||||
result = testdir.runpytest("--pyargs", "tpkg.test_hello")
|
||||
result = testdir.runpytest("--pyargs", "tpkg.test_hello", syspathinsert=True)
|
||||
assert result.ret == 0
|
||||
result.stdout.fnmatch_lines([
|
||||
"*1 passed*"
|
||||
|
@ -577,7 +577,7 @@ class TestInvocationVariants(object):
|
|||
])
|
||||
|
||||
monkeypatch.setenv('PYTHONPATH', join_pythonpath(testdir))
|
||||
result = testdir.runpytest("--pyargs", "tpkg.test_missing")
|
||||
result = testdir.runpytest("--pyargs", "tpkg.test_missing", syspathinsert=True)
|
||||
assert result.ret != 0
|
||||
result.stderr.fnmatch_lines([
|
||||
"*not*found*test_missing*",
|
||||
|
|
|
@ -58,7 +58,6 @@ def test_str_args_deprecated(tmpdir, testdir):
|
|||
warnings.append(message)
|
||||
|
||||
ret = pytest.main("%s -x" % tmpdir, plugins=[Collect()])
|
||||
testdir.delete_loaded_modules()
|
||||
msg = ('passing a string to pytest.main() is deprecated, '
|
||||
'pass a list of arguments instead.')
|
||||
assert msg in warnings
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, division, print_function
|
||||
import pytest
|
||||
import os
|
||||
import py.path
|
||||
import pytest
|
||||
import sys
|
||||
import _pytest.pytester as pytester
|
||||
from _pytest.pytester import HookRecorder
|
||||
from _pytest.pytester import CwdSnapshot, SysModulesSnapshot, SysPathsSnapshot
|
||||
from _pytest.config import PytestPluginManager
|
||||
from _pytest.main import EXIT_OK, EXIT_TESTSFAILED
|
||||
|
||||
|
@ -131,14 +135,116 @@ def test_makepyfile_utf8(testdir):
|
|||
assert u"mixed_encoding = u'São Paulo'".encode('utf-8') in p.read('rb')
|
||||
|
||||
|
||||
def test_inline_run_clean_modules(testdir):
|
||||
test_mod = testdir.makepyfile("def test_foo(): assert True")
|
||||
result = testdir.inline_run(str(test_mod))
|
||||
assert result.ret == EXIT_OK
|
||||
# rewrite module, now test should fail if module was re-imported
|
||||
test_mod.write("def test_foo(): assert False")
|
||||
result2 = testdir.inline_run(str(test_mod))
|
||||
assert result2.ret == EXIT_TESTSFAILED
|
||||
class TestInlineRunModulesCleanup:
|
||||
def test_inline_run_test_module_not_cleaned_up(self, testdir):
|
||||
test_mod = testdir.makepyfile("def test_foo(): assert True")
|
||||
result = testdir.inline_run(str(test_mod))
|
||||
assert result.ret == EXIT_OK
|
||||
# rewrite module, now test should fail if module was re-imported
|
||||
test_mod.write("def test_foo(): assert False")
|
||||
result2 = testdir.inline_run(str(test_mod))
|
||||
assert result2.ret == EXIT_TESTSFAILED
|
||||
|
||||
def spy_factory(self):
|
||||
class SysModulesSnapshotSpy:
|
||||
instances = []
|
||||
|
||||
def __init__(self, preserve=None):
|
||||
SysModulesSnapshotSpy.instances.append(self)
|
||||
self._spy_restore_count = 0
|
||||
self._spy_preserve = preserve
|
||||
self.__snapshot = SysModulesSnapshot(preserve=preserve)
|
||||
|
||||
def restore(self):
|
||||
self._spy_restore_count += 1
|
||||
return self.__snapshot.restore()
|
||||
return SysModulesSnapshotSpy
|
||||
|
||||
def test_inline_run_taking_and_restoring_a_sys_modules_snapshot(
|
||||
self, testdir, monkeypatch):
|
||||
spy_factory = self.spy_factory()
|
||||
monkeypatch.setattr(pytester, "SysModulesSnapshot", spy_factory)
|
||||
original = dict(sys.modules)
|
||||
testdir.syspathinsert()
|
||||
testdir.makepyfile(import1="# you son of a silly person")
|
||||
testdir.makepyfile(import2="# my hovercraft is full of eels")
|
||||
test_mod = testdir.makepyfile("""
|
||||
import import1
|
||||
def test_foo(): import import2""")
|
||||
testdir.inline_run(str(test_mod))
|
||||
assert len(spy_factory.instances) == 1
|
||||
spy = spy_factory.instances[0]
|
||||
assert spy._spy_restore_count == 1
|
||||
assert sys.modules == original
|
||||
assert all(sys.modules[x] is original[x] for x in sys.modules)
|
||||
|
||||
def test_inline_run_sys_modules_snapshot_restore_preserving_modules(
|
||||
self, testdir, monkeypatch):
|
||||
spy_factory = self.spy_factory()
|
||||
monkeypatch.setattr(pytester, "SysModulesSnapshot", spy_factory)
|
||||
test_mod = testdir.makepyfile("def test_foo(): pass")
|
||||
testdir.inline_run(str(test_mod))
|
||||
spy = spy_factory.instances[0]
|
||||
assert not spy._spy_preserve("black_knight")
|
||||
assert spy._spy_preserve("zope")
|
||||
assert spy._spy_preserve("zope.interface")
|
||||
assert spy._spy_preserve("zopelicious")
|
||||
|
||||
def test_external_test_module_imports_not_cleaned_up(self, testdir):
|
||||
testdir.syspathinsert()
|
||||
testdir.makepyfile(imported="data = 'you son of a silly person'")
|
||||
import imported
|
||||
test_mod = testdir.makepyfile("""
|
||||
def test_foo():
|
||||
import imported
|
||||
imported.data = 42""")
|
||||
testdir.inline_run(str(test_mod))
|
||||
assert imported.data == 42
|
||||
|
||||
|
||||
def test_inline_run_clean_sys_paths(testdir):
|
||||
def test_sys_path_change_cleanup(self, testdir):
|
||||
test_path1 = testdir.tmpdir.join("boink1").strpath
|
||||
test_path2 = testdir.tmpdir.join("boink2").strpath
|
||||
test_path3 = testdir.tmpdir.join("boink3").strpath
|
||||
sys.path.append(test_path1)
|
||||
sys.meta_path.append(test_path1)
|
||||
original_path = list(sys.path)
|
||||
original_meta_path = list(sys.meta_path)
|
||||
test_mod = testdir.makepyfile("""
|
||||
import sys
|
||||
sys.path.append({:test_path2})
|
||||
sys.meta_path.append({:test_path2})
|
||||
def test_foo():
|
||||
sys.path.append({:test_path3})
|
||||
sys.meta_path.append({:test_path3})""".format(locals()))
|
||||
testdir.inline_run(str(test_mod))
|
||||
assert sys.path == original_path
|
||||
assert sys.meta_path == original_meta_path
|
||||
|
||||
def spy_factory(self):
|
||||
class SysPathsSnapshotSpy:
|
||||
instances = []
|
||||
|
||||
def __init__(self):
|
||||
SysPathsSnapshotSpy.instances.append(self)
|
||||
self._spy_restore_count = 0
|
||||
self.__snapshot = SysPathsSnapshot()
|
||||
|
||||
def restore(self):
|
||||
self._spy_restore_count += 1
|
||||
return self.__snapshot.restore()
|
||||
return SysPathsSnapshotSpy
|
||||
|
||||
def test_inline_run_taking_and_restoring_a_sys_paths_snapshot(
|
||||
self, testdir, monkeypatch):
|
||||
spy_factory = self.spy_factory()
|
||||
monkeypatch.setattr(pytester, "SysPathsSnapshot", spy_factory)
|
||||
test_mod = testdir.makepyfile("def test_foo(): pass")
|
||||
testdir.inline_run(str(test_mod))
|
||||
assert len(spy_factory.instances) == 1
|
||||
spy = spy_factory.instances[0]
|
||||
assert spy._spy_restore_count == 1
|
||||
|
||||
|
||||
def test_assert_outcomes_after_pytest_error(testdir):
|
||||
|
@ -147,3 +253,126 @@ def test_assert_outcomes_after_pytest_error(testdir):
|
|||
result = testdir.runpytest('--unexpected-argument')
|
||||
with pytest.raises(ValueError, message="Pytest terminal report not found"):
|
||||
result.assert_outcomes(passed=0)
|
||||
|
||||
|
||||
def test_cwd_snapshot(tmpdir):
|
||||
foo = tmpdir.ensure('foo', dir=1)
|
||||
bar = tmpdir.ensure('bar', dir=1)
|
||||
foo.chdir()
|
||||
snapshot = CwdSnapshot()
|
||||
bar.chdir()
|
||||
assert py.path.local() == bar
|
||||
snapshot.restore()
|
||||
assert py.path.local() == foo
|
||||
|
||||
|
||||
class TestSysModulesSnapshot:
|
||||
key = 'my-test-module'
|
||||
|
||||
def test_remove_added(self):
|
||||
original = dict(sys.modules)
|
||||
assert self.key not in sys.modules
|
||||
snapshot = SysModulesSnapshot()
|
||||
sys.modules[self.key] = 'something'
|
||||
assert self.key in sys.modules
|
||||
snapshot.restore()
|
||||
assert sys.modules == original
|
||||
|
||||
def test_add_removed(self, monkeypatch):
|
||||
assert self.key not in sys.modules
|
||||
monkeypatch.setitem(sys.modules, self.key, 'something')
|
||||
assert self.key in sys.modules
|
||||
original = dict(sys.modules)
|
||||
snapshot = SysModulesSnapshot()
|
||||
del sys.modules[self.key]
|
||||
assert self.key not in sys.modules
|
||||
snapshot.restore()
|
||||
assert sys.modules == original
|
||||
|
||||
def test_restore_reloaded(self, monkeypatch):
|
||||
assert self.key not in sys.modules
|
||||
monkeypatch.setitem(sys.modules, self.key, 'something')
|
||||
assert self.key in sys.modules
|
||||
original = dict(sys.modules)
|
||||
snapshot = SysModulesSnapshot()
|
||||
sys.modules[self.key] = 'something else'
|
||||
snapshot.restore()
|
||||
assert sys.modules == original
|
||||
|
||||
def test_preserve_modules(self, monkeypatch):
|
||||
key = [self.key + str(i) for i in range(3)]
|
||||
assert not any(k in sys.modules for k in key)
|
||||
for i, k in enumerate(key):
|
||||
monkeypatch.setitem(sys.modules, k, 'something' + str(i))
|
||||
original = dict(sys.modules)
|
||||
|
||||
def preserve(name):
|
||||
return name in (key[0], key[1], 'some-other-key')
|
||||
|
||||
snapshot = SysModulesSnapshot(preserve=preserve)
|
||||
sys.modules[key[0]] = original[key[0]] = 'something else0'
|
||||
sys.modules[key[1]] = original[key[1]] = 'something else1'
|
||||
sys.modules[key[2]] = 'something else2'
|
||||
snapshot.restore()
|
||||
assert sys.modules == original
|
||||
|
||||
def test_preserve_container(self, monkeypatch):
|
||||
original = dict(sys.modules)
|
||||
assert self.key not in original
|
||||
replacement = dict(sys.modules)
|
||||
replacement[self.key] = 'life of brian'
|
||||
snapshot = SysModulesSnapshot()
|
||||
monkeypatch.setattr(sys, 'modules', replacement)
|
||||
snapshot.restore()
|
||||
assert sys.modules is replacement
|
||||
assert sys.modules == original
|
||||
|
||||
|
||||
@pytest.mark.parametrize('path_type', ('path', 'meta_path'))
|
||||
class TestSysPathsSnapshot:
|
||||
other_path = {
|
||||
'path': 'meta_path',
|
||||
'meta_path': 'path'}
|
||||
|
||||
@staticmethod
|
||||
def path(n):
|
||||
return 'my-dirty-little-secret-' + str(n)
|
||||
|
||||
def test_restore(self, monkeypatch, path_type):
|
||||
other_path_type = self.other_path[path_type]
|
||||
for i in range(10):
|
||||
assert self.path(i) not in getattr(sys, path_type)
|
||||
sys_path = [self.path(i) for i in range(6)]
|
||||
monkeypatch.setattr(sys, path_type, sys_path)
|
||||
original = list(sys_path)
|
||||
original_other = list(getattr(sys, other_path_type))
|
||||
snapshot = SysPathsSnapshot()
|
||||
transformation = {
|
||||
'source': (0, 1, 2, 3, 4, 5),
|
||||
'target': ( 6, 2, 9, 7, 5, 8)} # noqa: E201
|
||||
assert sys_path == [self.path(x) for x in transformation['source']]
|
||||
sys_path[1] = self.path(6)
|
||||
sys_path[3] = self.path(7)
|
||||
sys_path.append(self.path(8))
|
||||
del sys_path[4]
|
||||
sys_path[3:3] = [self.path(9)]
|
||||
del sys_path[0]
|
||||
assert sys_path == [self.path(x) for x in transformation['target']]
|
||||
snapshot.restore()
|
||||
assert getattr(sys, path_type) is sys_path
|
||||
assert getattr(sys, path_type) == original
|
||||
assert getattr(sys, other_path_type) == original_other
|
||||
|
||||
def test_preserve_container(self, monkeypatch, path_type):
|
||||
other_path_type = self.other_path[path_type]
|
||||
original_data = list(getattr(sys, path_type))
|
||||
original_other = getattr(sys, other_path_type)
|
||||
original_other_data = list(original_other)
|
||||
new = []
|
||||
snapshot = SysPathsSnapshot()
|
||||
monkeypatch.setattr(sys, path_type, new)
|
||||
snapshot.restore()
|
||||
assert getattr(sys, path_type) is new
|
||||
assert getattr(sys, path_type) == original_data
|
||||
assert getattr(sys, other_path_type) is original_other
|
||||
assert getattr(sys, other_path_type) == original_other_data
|
||||
|
|
Loading…
Reference in New Issue