Merge pull request #3016 from jurko-gospodnetic/clean-up-state-after-in-process-pytest-runs

Clean up state after in process pytest runs
This commit is contained in:
Ronny Pfannschmidt 2017-12-18 17:56:24 +01:00 committed by GitHub
commit 0d83dd1b31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 333 additions and 65 deletions

View File

@ -390,6 +390,35 @@ class RunResult:
assert obtained == dict(passed=passed, skipped=skipped, failed=failed, error=error)
class CwdSnapshot:
def __init__(self):
self.__saved = os.getcwd()
def restore(self):
os.chdir(self.__saved)
class SysModulesSnapshot:
def __init__(self, preserve=None):
self.__preserve = preserve
self.__saved = dict(sys.modules)
def restore(self):
if self.__preserve:
self.__saved.update(
(k, m) for k, m in sys.modules.items() if self.__preserve(k))
sys.modules.clear()
sys.modules.update(self.__saved)
class SysPathsSnapshot:
def __init__(self):
self.__saved = list(sys.path), list(sys.meta_path)
def restore(self):
sys.path[:], sys.meta_path[:] = self.__saved
class Testdir:
"""Temporary test directory with tools to test/run pytest itself.
@ -414,9 +443,10 @@ class Testdir:
name = request.function.__name__
self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
self.plugins = []
self._savesyspath = (list(sys.path), list(sys.meta_path))
self._savemodulekeys = set(sys.modules)
self.chdir() # always chdir
self._cwd_snapshot = CwdSnapshot()
self._sys_path_snapshot = SysPathsSnapshot()
self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
self.chdir()
self.request.addfinalizer(self.finalize)
method = self.request.config.getoption("--runpytest")
if method == "inprocess":
@ -435,23 +465,17 @@ class Testdir:
it can be looked at after the test run has finished.
"""
sys.path[:], sys.meta_path[:] = self._savesyspath
if hasattr(self, '_olddir'):
self._olddir.chdir()
self.delete_loaded_modules()
self._sys_modules_snapshot.restore()
self._sys_path_snapshot.restore()
self._cwd_snapshot.restore()
def delete_loaded_modules(self):
"""Delete modules that have been loaded during a test.
This allows the interpreter to catch module changes in case
the module is re-imported.
"""
for name in set(sys.modules).difference(self._savemodulekeys):
# some zope modules used by twisted-related tests keeps internal
# state and can't be deleted; we had some trouble in the past
# with zope.interface for example
if not name.startswith("zope"):
del sys.modules[name]
def __take_sys_modules_snapshot(self):
# some zope modules used by twisted-related tests keep internal state
# and can't be deleted; we had some trouble in the past with
# `zope.interface` for example
def preserve_module(name):
return name.startswith("zope")
return SysModulesSnapshot(preserve=preserve_module)
def make_hook_recorder(self, pluginmanager):
"""Create a new :py:class:`HookRecorder` for a PluginManager."""
@ -466,9 +490,7 @@ class Testdir:
This is done automatically upon instantiation.
"""
old = self.tmpdir.chdir()
if not hasattr(self, '_olddir'):
self._olddir = old
self.tmpdir.chdir()
def _makefile(self, ext, args, kwargs, encoding='utf-8'):
items = list(kwargs.items())
@ -683,18 +705,32 @@ class Testdir:
:return: a :py:class:`HookRecorder` instance
"""
finalizers = []
try:
# When running py.test inline any plugins active in the main test
# process are already imported. So this disables the warning which
# will trigger to say they can no longer be rewritten, which is fine as
# they have already been rewritten.
# will trigger to say they can no longer be rewritten, which is
# fine as they have already been rewritten.
orig_warn = AssertionRewritingHook._warn_already_imported
def revert():
def revert_warn_already_imported():
AssertionRewritingHook._warn_already_imported = orig_warn
self.request.addfinalizer(revert)
finalizers.append(revert_warn_already_imported)
AssertionRewritingHook._warn_already_imported = lambda *a: None
# Any sys.module or sys.path changes done while running py.test
# inline should be reverted after the test run completes to avoid
# clashing with later inline tests run within the same pytest test,
# e.g. just because they use matching test module names.
finalizers.append(self.__take_sys_modules_snapshot().restore)
finalizers.append(SysPathsSnapshot().restore)
# Important note:
# - our tests should not leave any other references/registrations
# laying around other than possibly loaded test modules
# referenced from sys.modules, as nothing will clean those up
# automatically
rec = []
class Collect:
@ -704,7 +740,6 @@ class Testdir:
plugins = kwargs.get("plugins") or []
plugins.append(Collect())
ret = pytest.main(list(args), plugins=plugins)
self.delete_loaded_modules()
if len(rec) == 1:
reprec = rec.pop()
else:
@ -712,13 +747,16 @@ class Testdir:
pass
reprec.ret = ret
# typically we reraise keyboard interrupts from the child run because
# it's our user requesting interruption of the testing
# typically we reraise keyboard interrupts from the child run
# because it's our user requesting interruption of the testing
if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
calls = reprec.getcalls("pytest_keyboard_interrupt")
if calls and calls[-1].excinfo.type == KeyboardInterrupt:
raise KeyboardInterrupt()
return reprec
finally:
for finalizer in finalizers:
finalizer()
def runpytest_inprocess(self, *args, **kwargs):
"""Return result of running pytest in-process, providing a similar

2
changelog/3016.bugfix Normal file
View File

@ -0,0 +1,2 @@
Fixed restoring Python state after in-process pytest runs with the ``pytester`` plugin; this may break tests using
making multiple inprocess pytest runs if later ones depend on earlier ones leaking global interpreter changes.

View File

@ -535,7 +535,7 @@ class TestInvocationVariants(object):
path = testdir.mkpydir("tpkg")
path.join("test_hello.py").write('raise ImportError')
result = testdir.runpytest_subprocess("--pyargs", "tpkg.test_hello")
result = testdir.runpytest("--pyargs", "tpkg.test_hello", syspathinsert=True)
assert result.ret != 0
result.stdout.fnmatch_lines([
@ -553,7 +553,7 @@ class TestInvocationVariants(object):
result.stdout.fnmatch_lines([
"*2 passed*"
])
result = testdir.runpytest("--pyargs", "tpkg.test_hello")
result = testdir.runpytest("--pyargs", "tpkg.test_hello", syspathinsert=True)
assert result.ret == 0
result.stdout.fnmatch_lines([
"*1 passed*"
@ -577,7 +577,7 @@ class TestInvocationVariants(object):
])
monkeypatch.setenv('PYTHONPATH', join_pythonpath(testdir))
result = testdir.runpytest("--pyargs", "tpkg.test_missing")
result = testdir.runpytest("--pyargs", "tpkg.test_missing", syspathinsert=True)
assert result.ret != 0
result.stderr.fnmatch_lines([
"*not*found*test_missing*",

View File

@ -58,7 +58,6 @@ def test_str_args_deprecated(tmpdir, testdir):
warnings.append(message)
ret = pytest.main("%s -x" % tmpdir, plugins=[Collect()])
testdir.delete_loaded_modules()
msg = ('passing a string to pytest.main() is deprecated, '
'pass a list of arguments instead.')
assert msg in warnings

View File

@ -1,8 +1,12 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import pytest
import os
import py.path
import pytest
import sys
import _pytest.pytester as pytester
from _pytest.pytester import HookRecorder
from _pytest.pytester import CwdSnapshot, SysModulesSnapshot, SysPathsSnapshot
from _pytest.config import PytestPluginManager
from _pytest.main import EXIT_OK, EXIT_TESTSFAILED
@ -131,7 +135,8 @@ def test_makepyfile_utf8(testdir):
assert u"mixed_encoding = u'São Paulo'".encode('utf-8') in p.read('rb')
def test_inline_run_clean_modules(testdir):
class TestInlineRunModulesCleanup:
def test_inline_run_test_module_not_cleaned_up(self, testdir):
test_mod = testdir.makepyfile("def test_foo(): assert True")
result = testdir.inline_run(str(test_mod))
assert result.ret == EXIT_OK
@ -140,6 +145,107 @@ def test_inline_run_clean_modules(testdir):
result2 = testdir.inline_run(str(test_mod))
assert result2.ret == EXIT_TESTSFAILED
def spy_factory(self):
class SysModulesSnapshotSpy:
instances = []
def __init__(self, preserve=None):
SysModulesSnapshotSpy.instances.append(self)
self._spy_restore_count = 0
self._spy_preserve = preserve
self.__snapshot = SysModulesSnapshot(preserve=preserve)
def restore(self):
self._spy_restore_count += 1
return self.__snapshot.restore()
return SysModulesSnapshotSpy
def test_inline_run_taking_and_restoring_a_sys_modules_snapshot(
self, testdir, monkeypatch):
spy_factory = self.spy_factory()
monkeypatch.setattr(pytester, "SysModulesSnapshot", spy_factory)
original = dict(sys.modules)
testdir.syspathinsert()
testdir.makepyfile(import1="# you son of a silly person")
testdir.makepyfile(import2="# my hovercraft is full of eels")
test_mod = testdir.makepyfile("""
import import1
def test_foo(): import import2""")
testdir.inline_run(str(test_mod))
assert len(spy_factory.instances) == 1
spy = spy_factory.instances[0]
assert spy._spy_restore_count == 1
assert sys.modules == original
assert all(sys.modules[x] is original[x] for x in sys.modules)
def test_inline_run_sys_modules_snapshot_restore_preserving_modules(
self, testdir, monkeypatch):
spy_factory = self.spy_factory()
monkeypatch.setattr(pytester, "SysModulesSnapshot", spy_factory)
test_mod = testdir.makepyfile("def test_foo(): pass")
testdir.inline_run(str(test_mod))
spy = spy_factory.instances[0]
assert not spy._spy_preserve("black_knight")
assert spy._spy_preserve("zope")
assert spy._spy_preserve("zope.interface")
assert spy._spy_preserve("zopelicious")
def test_external_test_module_imports_not_cleaned_up(self, testdir):
testdir.syspathinsert()
testdir.makepyfile(imported="data = 'you son of a silly person'")
import imported
test_mod = testdir.makepyfile("""
def test_foo():
import imported
imported.data = 42""")
testdir.inline_run(str(test_mod))
assert imported.data == 42
def test_inline_run_clean_sys_paths(testdir):
def test_sys_path_change_cleanup(self, testdir):
test_path1 = testdir.tmpdir.join("boink1").strpath
test_path2 = testdir.tmpdir.join("boink2").strpath
test_path3 = testdir.tmpdir.join("boink3").strpath
sys.path.append(test_path1)
sys.meta_path.append(test_path1)
original_path = list(sys.path)
original_meta_path = list(sys.meta_path)
test_mod = testdir.makepyfile("""
import sys
sys.path.append({:test_path2})
sys.meta_path.append({:test_path2})
def test_foo():
sys.path.append({:test_path3})
sys.meta_path.append({:test_path3})""".format(locals()))
testdir.inline_run(str(test_mod))
assert sys.path == original_path
assert sys.meta_path == original_meta_path
def spy_factory(self):
class SysPathsSnapshotSpy:
instances = []
def __init__(self):
SysPathsSnapshotSpy.instances.append(self)
self._spy_restore_count = 0
self.__snapshot = SysPathsSnapshot()
def restore(self):
self._spy_restore_count += 1
return self.__snapshot.restore()
return SysPathsSnapshotSpy
def test_inline_run_taking_and_restoring_a_sys_paths_snapshot(
self, testdir, monkeypatch):
spy_factory = self.spy_factory()
monkeypatch.setattr(pytester, "SysPathsSnapshot", spy_factory)
test_mod = testdir.makepyfile("def test_foo(): pass")
testdir.inline_run(str(test_mod))
assert len(spy_factory.instances) == 1
spy = spy_factory.instances[0]
assert spy._spy_restore_count == 1
def test_assert_outcomes_after_pytest_error(testdir):
testdir.makepyfile("def test_foo(): assert True")
@ -147,3 +253,126 @@ def test_assert_outcomes_after_pytest_error(testdir):
result = testdir.runpytest('--unexpected-argument')
with pytest.raises(ValueError, message="Pytest terminal report not found"):
result.assert_outcomes(passed=0)
def test_cwd_snapshot(tmpdir):
foo = tmpdir.ensure('foo', dir=1)
bar = tmpdir.ensure('bar', dir=1)
foo.chdir()
snapshot = CwdSnapshot()
bar.chdir()
assert py.path.local() == bar
snapshot.restore()
assert py.path.local() == foo
class TestSysModulesSnapshot:
key = 'my-test-module'
def test_remove_added(self):
original = dict(sys.modules)
assert self.key not in sys.modules
snapshot = SysModulesSnapshot()
sys.modules[self.key] = 'something'
assert self.key in sys.modules
snapshot.restore()
assert sys.modules == original
def test_add_removed(self, monkeypatch):
assert self.key not in sys.modules
monkeypatch.setitem(sys.modules, self.key, 'something')
assert self.key in sys.modules
original = dict(sys.modules)
snapshot = SysModulesSnapshot()
del sys.modules[self.key]
assert self.key not in sys.modules
snapshot.restore()
assert sys.modules == original
def test_restore_reloaded(self, monkeypatch):
assert self.key not in sys.modules
monkeypatch.setitem(sys.modules, self.key, 'something')
assert self.key in sys.modules
original = dict(sys.modules)
snapshot = SysModulesSnapshot()
sys.modules[self.key] = 'something else'
snapshot.restore()
assert sys.modules == original
def test_preserve_modules(self, monkeypatch):
key = [self.key + str(i) for i in range(3)]
assert not any(k in sys.modules for k in key)
for i, k in enumerate(key):
monkeypatch.setitem(sys.modules, k, 'something' + str(i))
original = dict(sys.modules)
def preserve(name):
return name in (key[0], key[1], 'some-other-key')
snapshot = SysModulesSnapshot(preserve=preserve)
sys.modules[key[0]] = original[key[0]] = 'something else0'
sys.modules[key[1]] = original[key[1]] = 'something else1'
sys.modules[key[2]] = 'something else2'
snapshot.restore()
assert sys.modules == original
def test_preserve_container(self, monkeypatch):
original = dict(sys.modules)
assert self.key not in original
replacement = dict(sys.modules)
replacement[self.key] = 'life of brian'
snapshot = SysModulesSnapshot()
monkeypatch.setattr(sys, 'modules', replacement)
snapshot.restore()
assert sys.modules is replacement
assert sys.modules == original
@pytest.mark.parametrize('path_type', ('path', 'meta_path'))
class TestSysPathsSnapshot:
other_path = {
'path': 'meta_path',
'meta_path': 'path'}
@staticmethod
def path(n):
return 'my-dirty-little-secret-' + str(n)
def test_restore(self, monkeypatch, path_type):
other_path_type = self.other_path[path_type]
for i in range(10):
assert self.path(i) not in getattr(sys, path_type)
sys_path = [self.path(i) for i in range(6)]
monkeypatch.setattr(sys, path_type, sys_path)
original = list(sys_path)
original_other = list(getattr(sys, other_path_type))
snapshot = SysPathsSnapshot()
transformation = {
'source': (0, 1, 2, 3, 4, 5),
'target': ( 6, 2, 9, 7, 5, 8)} # noqa: E201
assert sys_path == [self.path(x) for x in transformation['source']]
sys_path[1] = self.path(6)
sys_path[3] = self.path(7)
sys_path.append(self.path(8))
del sys_path[4]
sys_path[3:3] = [self.path(9)]
del sys_path[0]
assert sys_path == [self.path(x) for x in transformation['target']]
snapshot.restore()
assert getattr(sys, path_type) is sys_path
assert getattr(sys, path_type) == original
assert getattr(sys, other_path_type) == original_other
def test_preserve_container(self, monkeypatch, path_type):
other_path_type = self.other_path[path_type]
original_data = list(getattr(sys, path_type))
original_other = getattr(sys, other_path_type)
original_other_data = list(original_other)
new = []
snapshot = SysPathsSnapshot()
monkeypatch.setattr(sys, path_type, new)
snapshot.restore()
assert getattr(sys, path_type) is new
assert getattr(sys, path_type) == original_data
assert getattr(sys, other_path_type) is original_other
assert getattr(sys, other_path_type) == original_other_data