292 lines
11 KiB
Python
292 lines
11 KiB
Python
import os
|
|
from optparse import make_option
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.test import TestCase
|
|
from django.test.utils import setup_test_environment, teardown_test_environment
|
|
from django.utils import unittest
|
|
from django.utils.unittest import TestSuite, defaultTestLoader
|
|
|
|
|
|
class DiscoverRunner(object):
    """
    A Django test runner that uses unittest2 test discovery.

    ``pattern`` and ``top_level`` are forwarded to the loader's ``discover()``
    call (as ``pattern`` and ``top_level_dir``) when they are not None.
    ``verbosity`` and ``failfast`` are handed to the text test runner, and
    ``verbosity``/``interactive`` to the database setup helper.
    """

    # Loader used both for dotted-name lookups and for directory discovery.
    test_loader = defaultTestLoader
    # Test types passed to reorder_suite() to decide execution order.
    reorder_by = (TestCase, )
    # Extra command line options contributed by this runner.
    option_list = (
        make_option('-t', '--top-level-directory',
                    action='store', dest='top_level', default=None,
                    help='Top level of project for unittest discovery.'),
        make_option('-p', '--pattern', action='store', dest='pattern',
                    default="test*.py",
                    help='The test matching pattern. Defaults to test*.py.'),
    )

    def __init__(self, pattern=None, top_level=None,
                 verbosity=1, interactive=True, failfast=False,
                 **kwargs):
        """
        Store the runner configuration.

        Any additional keyword arguments are accepted and ignored.
        """
        self.pattern = pattern
        self.top_level = top_level

        self.verbosity = verbosity
        self.interactive = interactive
        self.failfast = failfast

    def setup_test_environment(self, **kwargs):
        """
        Prepare the process for a test run.

        Calls the module-level ``setup_test_environment()``, sets
        ``settings.DEBUG`` to False and calls ``unittest.installHandler()``.
        """
        setup_test_environment()
        settings.DEBUG = False
        unittest.installHandler()

    def build_suite(self, test_labels=None, extra_tests=None, **kwargs):
        """
        Build the suite of tests to run.

        Each label is either loaded by name (when it does not exist as a
        filesystem path) or used as the start directory for discovery.
        Without labels, discovery starts from ``'.'``. ``extra_tests`` are
        appended to the suite, which is finally reordered according to
        ``self.reorder_by``.
        """
        suite = TestSuite()
        test_labels = test_labels or ['.']
        extra_tests = extra_tests or []

        discover_kwargs = {}
        if self.pattern is not None:
            discover_kwargs['pattern'] = self.pattern
        if self.top_level is not None:
            discover_kwargs['top_level_dir'] = self.top_level

        for label in test_labels:
            # Per-label copy so a computed top-level dir does not leak into
            # the next label (named so it does not shadow **kwargs).
            label_kwargs = discover_kwargs.copy()
            tests = None

            label_as_path = os.path.abspath(label)

            # if a module, or "module.ClassName[.method_name]", just run those
            if not os.path.exists(label_as_path):
                tests = self.test_loader.loadTestsFromName(label)
            elif os.path.isdir(label_as_path) and not self.top_level:
                # Try to be a bit smarter than unittest about finding the
                # default top-level for a given directory path, to avoid
                # breaking relative imports. (Unittest's default is to set
                # top-level equal to the path, which means relative imports
                # will result in "Attempted relative import in non-package.").

                # We'd be happy to skip this and require dotted module paths
                # (which don't cause this problem) instead of file paths (which
                # do), but in the case of a directory in the cwd, which would
                # be equally valid if considered as a top-level module or as a
                # directory path, unittest unfortunately prefers the latter.

                # Climb while the current directory is a package; the first
                # directory without an __init__.py becomes the top level.
                top_level = label_as_path
                while os.path.exists(os.path.join(top_level, '__init__.py')):
                    parent = os.path.dirname(top_level)
                    if parent == top_level:
                        # __init__.py all the way down? give up.
                        break
                    top_level = parent
                label_kwargs['top_level_dir'] = top_level

            if not (tests and tests.countTestCases()):
                # if no tests found, it's probably a package; try discovery
                tests = self.test_loader.discover(
                    start_dir=label, **label_kwargs)

                # make unittest forget the top-level dir it calculated from
                # this run, to support running tests from two different
                # top-levels.
                self.test_loader._top_level_dir = None

            suite.addTests(tests)

        for test in extra_tests:
            suite.addTest(test)

        return reorder_suite(suite, self.reorder_by)

    def setup_databases(self, **kwargs):
        """
        Create the test databases via the module-level ``setup_databases()``
        and return its result, to be passed to ``teardown_databases()``.
        """
        return setup_databases(self.verbosity, self.interactive, **kwargs)

    def run_suite(self, suite, **kwargs):
        """
        Run ``suite`` with a ``TextTestRunner`` configured with this runner's
        verbosity and failfast settings, and return the test result.
        """
        return unittest.TextTestRunner(
            verbosity=self.verbosity,
            failfast=self.failfast,
        ).run(suite)

    def teardown_databases(self, old_config, **kwargs):
        """
        Destroys all the non-mirror databases.

        ``old_config`` is the ``(old_names, mirrors)`` pair returned by
        ``setup_databases()``; only entries flagged for destruction are
        destroyed. The ``mirrors`` part is not used here.
        """
        old_names, mirrors = old_config
        for connection, old_name, destroy in old_names:
            if destroy:
                connection.creation.destroy_test_db(old_name, self.verbosity)

    def teardown_test_environment(self, **kwargs):
        """
        Undo ``setup_test_environment()``: remove the unittest handler and
        call the module-level ``teardown_test_environment()``.
        """
        unittest.removeHandler()
        teardown_test_environment()

    def suite_result(self, suite, result, **kwargs):
        """
        Return the number of failures plus the number of errors in ``result``.
        """
        return len(result.failures) + len(result.errors)

    def run_tests(self, test_labels, extra_tests=None, **kwargs):
        """
        Run the unit tests for all the test labels in the provided list.

        Test labels should be dotted Python paths to test modules, test
        classes, or test methods; labels that exist as filesystem paths are
        used as discovery start directories instead (see ``build_suite``).

        A list of 'extra' tests may also be provided; these tests
        will be added to the test suite.

        Returns the number of tests that failed (failures plus errors).

        The test environment and the test databases are torn down even when
        building or running the suite raises, so a crashed run does not leave
        them behind; the exception is then propagated to the caller.
        """
        self.setup_test_environment()
        try:
            suite = self.build_suite(test_labels, extra_tests)
            old_config = self.setup_databases()
            try:
                result = self.run_suite(suite)
            finally:
                # Only reached once the databases exist, so there is always
                # a valid old_config to tear down.
                self.teardown_databases(old_config)
        finally:
            self.teardown_test_environment()
        return self.suite_result(suite, result)
|
|
|
|
|
|
def dependency_ordered(test_databases, dependencies):
    """
    Return the entries of ``test_databases`` sorted so that every database
    comes after the databases it depends on (see TEST_DEPENDENCIES).

    ``test_databases`` is an iterable of ``(signature, (db_name, aliases))``
    entries and ``dependencies`` maps an alias to the aliases it depends on.
    Raises ImproperlyConfigured when a database depends on one of its own
    aliases or when the dependencies cannot be satisfied.
    """
    # For each signature, gather the dependencies declared by any of its
    # aliases. A database must never depend on one of its own aliases.
    required_by_signature = {}
    for signature, (_, aliases) in test_databases:
        required = set().union(
            *(dependencies.get(alias, []) for alias in aliases))
        if required.intersection(aliases):
            raise ImproperlyConfigured(
                "Circular dependency: databases %r depend on each other, "
                "but are aliases." % aliases)
        required_by_signature[signature] = required

    ordered = []
    satisfied_aliases = set()
    pending = test_databases

    # Repeatedly sweep the pending entries, emitting each one whose
    # dependencies are already satisfied; a sweep that emits nothing means
    # the remaining entries depend on each other.
    while pending:
        postponed = []
        progressed = False

        for signature, (db_name, aliases) in pending:
            entry = (signature, (db_name, aliases))
            if required_by_signature[signature] <= satisfied_aliases:
                satisfied_aliases.update(aliases)
                ordered.append(entry)
                progressed = True
            else:
                postponed.append(entry)

        if not progressed:
            raise ImproperlyConfigured(
                "Circular dependency in TEST_DEPENDENCIES")
        pending = postponed

    return ordered
|
|
|
|
|
|
def reorder_suite(suite, classes):
    """
    Return a new suite holding the tests of ``suite`` grouped by test type.

    ``classes`` is a sequence of types. Tests that are instances of
    classes[0] come first, then those of classes[1], and so on; tests
    matching none of the classes come last.
    """
    # One bucket per class, plus a trailing bucket for unmatched tests.
    buckets = [unittest.TestSuite() for _ in range(len(classes) + 1)]
    partition_suite(suite, classes, buckets)

    # Fold every later bucket into the first one, preserving bucket order.
    combined = buckets[0]
    for bucket in buckets[1:]:
        combined.addTests(bucket)
    return combined
|
|
|
|
|
|
def partition_suite(suite, classes, bins):
    """
    Distribute the tests of ``suite`` into ``bins`` by test type.

    ``classes`` is a sequence of types and ``bins`` a sequence of
    TestSuites, one longer than ``classes``. A test that is an instance of
    classes[i] (first match wins) is added to bins[i]; a test matching no
    class is added to bins[-1]. Nested TestSuites are flattened recursively.
    """
    for item in suite:
        if isinstance(item, unittest.TestSuite):
            partition_suite(item, classes, bins)
            continue

        # Index of the first matching class, or -1 for the catch-all bin.
        target = next(
            (index for index, cls in enumerate(classes)
             if isinstance(item, cls)),
            -1)
        bins[target].addTest(item)
|
|
|
|
|
|
def setup_databases(verbosity, interactive, **kwargs):
    """
    Create the test databases for every configured connection.

    Returns ``(old_names, mirrors)``: ``old_names`` is a list of
    ``(connection, original_name, destroy)`` tuples and ``mirrors`` a list
    of ``(alias, original_name)`` pairs for the test-mirror aliases.
    Extra keyword arguments are accepted and not used.
    """
    from django.db import connections, DEFAULT_DB_ALIAS

    # Phase 1: classify every alias. Mirrors are only remembered; real
    # databases are grouped by signature so that aliases sharing the same
    # signature lead to a single test database.
    mirror_targets = {}
    databases_by_signature = {}
    alias_dependencies = {}
    default_signature = (
        connections[DEFAULT_DB_ALIAS].creation.test_db_signature())

    for alias in connections:
        connection = connections[alias]
        db_settings = connection.settings_dict

        if db_settings['TEST_MIRROR']:
            mirror_targets[alias] = db_settings['TEST_MIRROR']
            continue

        _, signature_aliases = databases_by_signature.setdefault(
            connection.creation.test_db_signature(),
            (db_settings['NAME'], set())
        )
        signature_aliases.add(alias)

        if 'TEST_DEPENDENCIES' in db_settings:
            alias_dependencies[alias] = db_settings['TEST_DEPENDENCIES']
        elif (alias != DEFAULT_DB_ALIAS and
                connection.creation.test_db_signature() != default_signature):
            # No explicit dependencies: any database other than the default
            # one (by alias and by signature) depends on the default alias.
            alias_dependencies[alias] = [DEFAULT_DB_ALIAS]

    # Phase 2: create the databases in dependency order.
    old_names = []
    creation_order = dependency_ordered(
        databases_by_signature.items(), alias_dependencies)

    for signature, (db_name, aliases) in creation_order:
        test_db_name = None
        for alias in aliases:
            connection = connections[alias]
            # Only the first alias of a signature creates the database (and
            # is flagged for destruction); the others just point at it.
            creates_database = test_db_name is None
            if creates_database:
                test_db_name = connection.creation.create_test_db(
                    verbosity, autoclobber=not interactive)
            else:
                connection.settings_dict['NAME'] = test_db_name
            old_names.append((connection, db_name, creates_database))

    # Point each mirror alias at the NAME of the alias it mirrors,
    # remembering the name it had before.
    mirrors = []
    for alias, mirror_alias in mirror_targets.items():
        mirror_settings = connections[alias].settings_dict
        mirrors.append((alias, mirror_settings['NAME']))
        mirror_settings['NAME'] = (
            connections[mirror_alias].settings_dict['NAME'])

    return old_names, mirrors
|