#!/usr/bin/env python
import argparse
import atexit
import copy
import os
import shutil
import socket
import subprocess
import sys
import tempfile
import warnings

try:
    import django
except ImportError as e:
    raise RuntimeError(
        'Django module not found, reference tests/README.rst for instructions.'
    ) from e
else:
    from django.apps import apps
    from django.conf import settings
    from django.db import connection, connections
    from django.test import TestCase, TransactionTestCase
    from django.test.runner import default_test_processes
    from django.test.selenium import SeleniumTestCaseBase
    from django.test.utils import NullTimeKeeper, TimeKeeper, get_runner
    from django.utils.deprecation import (
        RemovedInDjango40Warning, RemovedInDjango41Warning,
    )
    from django.utils.log import DEFAULT_LOGGING
    from django.utils.version import PY37

try:
    import MySQLdb
except ImportError:
    pass
else:
    # Ignore informational warnings from QuerySet.explain().
    warnings.filterwarnings('ignore', r'\(1003, *', category=MySQLdb.Warning)

# Make deprecation warnings errors to ensure no usage of deprecated features.
warnings.simplefilter("error", RemovedInDjango40Warning)
warnings.simplefilter('error', RemovedInDjango41Warning)
# Make resource and runtime warning errors to ensure no usage of error prone
# patterns.
warnings.simplefilter("error", ResourceWarning)
warnings.simplefilter("error", RuntimeWarning)
# Ignore known warnings in test dependencies.
warnings.filterwarnings("ignore", "'U' mode is deprecated", DeprecationWarning, module='docutils.io')

RUNTESTS_DIR = os.path.abspath(os.path.dirname(__file__))

TEMPLATE_DIR = os.path.join(RUNTESTS_DIR, 'templates')

# Create a specific subdirectory for the duration of the test suite.
TMPDIR = tempfile.mkdtemp(prefix='django_')
# Set the TMPDIR environment variable in addition to tempfile.tempdir
# so that children processes inherit it.
tempfile.tempdir = os.environ['TMPDIR'] = TMPDIR

# Removing the temporary TMPDIR.
atexit.register(shutil.rmtree, TMPDIR)


SUBDIRS_TO_SKIP = [
    'data',
    'import_error_package',
    'test_runner_apps',
]

ALWAYS_INSTALLED_APPS = [
    'django.contrib.contenttypes',
    'django.contrib.auth',
    'django.contrib.sites',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.admin.apps.SimpleAdminConfig',
    'django.contrib.staticfiles',
]

ALWAYS_MIDDLEWARE = [
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
]

# Need to add the associated contrib app to INSTALLED_APPS in some cases to
# avoid "RuntimeError: Model class X doesn't declare an explicit app_label
# and isn't in an application in INSTALLED_APPS."
CONTRIB_TESTS_TO_APPS = {
    'deprecation': ['django.contrib.flatpages', 'django.contrib.redirects'],
    'flatpages_tests': ['django.contrib.flatpages'],
    'redirects_tests': ['django.contrib.redirects'],
}


def get_test_modules():
    modules = []
    discovery_paths = [(None, RUNTESTS_DIR)]
    if connection.features.gis_enabled:
        # GIS tests are in nested apps
        discovery_paths.append(('gis_tests', os.path.join(RUNTESTS_DIR, 'gis_tests')))
    else:
        SUBDIRS_TO_SKIP.append('gis_tests')

    for modpath, dirpath in discovery_paths:
        for f in os.scandir(dirpath):
            if ('.' not in f.name and
                    os.path.basename(f.name) not in SUBDIRS_TO_SKIP and
                    not f.is_file() and
                    os.path.exists(os.path.join(f.path, '__init__.py'))):
                modules.append((modpath, f.name))
    return modules


def get_installed():
    return [app_config.name for app_config in apps.get_app_configs()]


def setup(verbosity, test_labels, parallel, start_at, start_after):
    # Reduce the given test labels to just the app module path.
    test_labels_set = set()
    for label in test_labels:
        bits = label.split('.')[:1]
        test_labels_set.add('.'.join(bits))

    if verbosity >= 1:
        msg = "Testing against Django installed in '%s'" % os.path.dirname(django.__file__)
        max_parallel = default_test_processes() if parallel == 0 else parallel
        if max_parallel > 1:
            msg += " with up to %d processes" % max_parallel
        print(msg)

    # Force declaring available_apps in TransactionTestCase for faster tests.
    def no_available_apps(self):
        raise Exception("Please define available_apps in TransactionTestCase "
                        "and its subclasses.")
    TransactionTestCase.available_apps = property(no_available_apps)
    TestCase.available_apps = None

    state = {
        'INSTALLED_APPS': settings.INSTALLED_APPS,
        'ROOT_URLCONF': getattr(settings, "ROOT_URLCONF", ""),
        'TEMPLATES': settings.TEMPLATES,
        'LANGUAGE_CODE': settings.LANGUAGE_CODE,
        'STATIC_URL': settings.STATIC_URL,
        'STATIC_ROOT': settings.STATIC_ROOT,
        'MIDDLEWARE': settings.MIDDLEWARE,
    }

    # Redirect some settings for the duration of these tests.
    settings.INSTALLED_APPS = ALWAYS_INSTALLED_APPS
    settings.ROOT_URLCONF = 'urls'
    settings.STATIC_URL = '/static/'
    settings.STATIC_ROOT = os.path.join(TMPDIR, 'static')
    settings.TEMPLATES = [{
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [TEMPLATE_DIR],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    }]
    settings.LANGUAGE_CODE = 'en'
    settings.SITE_ID = 1
    settings.MIDDLEWARE = ALWAYS_MIDDLEWARE
    settings.MIGRATION_MODULES = {
        # This lets us skip creating migrations for the test models as many of
        # them depend on one of the following contrib applications.
        'auth': None,
        'contenttypes': None,
        'sessions': None,
    }
    log_config = copy.deepcopy(DEFAULT_LOGGING)
    # Filter out non-error logging so we don't have to capture it in lots of
    # tests.
    log_config['loggers']['django']['level'] = 'ERROR'
    settings.LOGGING = log_config
    settings.SILENCED_SYSTEM_CHECKS = [
        'fields.W342',  # ForeignKey(unique=True) -> OneToOneField
        'fields.W903',  # NullBooleanField deprecated.
    ]

    # Load all the ALWAYS_INSTALLED_APPS.
    django.setup()

    # It would be nice to put this validation earlier but it must come after
    # django.setup() so that connection.features.gis_enabled can be accessed
    # without raising AppRegistryNotReady when running gis_tests in isolation
    # on some backends (e.g. PostGIS).
    if 'gis_tests' in test_labels_set and not connection.features.gis_enabled:
        print('Aborting: A GIS database backend is required to run gis_tests.')
        sys.exit(1)

    def _module_match_label(module_label, label):
        # Exact or ancestor match.
        return module_label == label or module_label.startswith(label + '.')

    # Load all the test model apps.
    test_modules = get_test_modules()

    found_start = not (start_at or start_after)
    installed_app_names = set(get_installed())
    for modpath, module_name in test_modules:
        if modpath:
            module_label = modpath + '.' + module_name
        else:
            module_label = module_name
        if not found_start:
            if start_at and _module_match_label(module_label, start_at):
                found_start = True
            elif start_after and _module_match_label(module_label, start_after):
                found_start = True
                continue
            else:
                continue
        # if the module (or an ancestor) was named on the command line, or
        # no modules were named (i.e., run all), import
        # this module and add it to INSTALLED_APPS.
        module_found_in_labels = not test_labels or any(
            _module_match_label(module_label, label) for label in test_labels_set
        )

        if module_name in CONTRIB_TESTS_TO_APPS and module_found_in_labels:
            for contrib_app in CONTRIB_TESTS_TO_APPS[module_name]:
                if contrib_app not in settings.INSTALLED_APPS:
                    settings.INSTALLED_APPS.append(contrib_app)

        if module_found_in_labels and module_label not in installed_app_names:
            if verbosity >= 2:
                print("Importing application %s" % module_name)
            settings.INSTALLED_APPS.append(module_label)

    # Add contrib.gis to INSTALLED_APPS if needed (rather than requiring
    # @override_settings(INSTALLED_APPS=...) on all test cases.
    gis = 'django.contrib.gis'
    if connection.features.gis_enabled and gis not in settings.INSTALLED_APPS:
        if verbosity >= 2:
            print("Importing application %s" % gis)
        settings.INSTALLED_APPS.append(gis)

    apps.set_installed_apps(settings.INSTALLED_APPS)

    return state


def teardown(state):
    # Restore the old settings.
    for key, value in state.items():
        setattr(settings, key, value)
    # Discard the multiprocessing.util finalizer that tries to remove a
    # temporary directory that's already removed by this script's
    # atexit.register(shutil.rmtree, TMPDIR) handler. Prevents
    # FileNotFoundError at the end of a test run (#27890).
    from multiprocessing.util import _finalizer_registry
    _finalizer_registry.pop((-100, 0), None)


def actual_test_processes(parallel):
    if parallel == 0:
        # This doesn't work before django.setup() on some databases.
        if all(conn.features.can_clone_databases for conn in connections.all()):
            return default_test_processes()
        else:
            return 1
    else:
        return parallel


class ActionSelenium(argparse.Action):
    """
    Validate the comma-separated list of requested browsers.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        browsers = values.split(',')
        for browser in browsers:
            try:
                SeleniumTestCaseBase.import_webdriver(browser)
            except ImportError:
                raise argparse.ArgumentError(self, "Selenium browser specification '%s' is not valid." % browser)
        setattr(namespace, self.dest, browsers)


def django_tests(verbosity, interactive, failfast, keepdb, reverse,
                 test_labels, debug_sql, parallel, tags, exclude_tags,
                 test_name_patterns, start_at, start_after, pdb, buffer,
                 timing):
    state = setup(verbosity, test_labels, parallel, start_at, start_after)
    extra_tests = []

    # Run the test suite, including the extra validation tests.
    if not hasattr(settings, 'TEST_RUNNER'):
        settings.TEST_RUNNER = 'django.test.runner.DiscoverRunner'
    TestRunner = get_runner(settings)

    test_runner = TestRunner(
        verbosity=verbosity,
        interactive=interactive,
        failfast=failfast,
        keepdb=keepdb,
        reverse=reverse,
        debug_sql=debug_sql,
        parallel=actual_test_processes(parallel),
        tags=tags,
        exclude_tags=exclude_tags,
        test_name_patterns=test_name_patterns,
        pdb=pdb,
        buffer=buffer,
        timing=timing,
    )
    failures = test_runner.run_tests(
        test_labels or get_installed(),
        extra_tests=extra_tests,
    )
    teardown(state)
    return failures


def get_subprocess_args(options):
    subprocess_args = [
        sys.executable, __file__, '--settings=%s' % options.settings
    ]
    if options.failfast:
        subprocess_args.append('--failfast')
    if options.verbosity:
        subprocess_args.append('--verbosity=%s' % options.verbosity)
    if not options.interactive:
        subprocess_args.append('--noinput')
    if options.tags:
        subprocess_args.append('--tag=%s' % options.tags)
    if options.exclude_tags:
        subprocess_args.append('--exclude_tag=%s' % options.exclude_tags)
    return subprocess_args


def bisect_tests(bisection_label, options, test_labels, parallel, start_at, start_after):
    state = setup(options.verbosity, test_labels, parallel, start_at, start_after)

    test_labels = test_labels or get_installed()

    print('***** Bisecting test suite: %s' % ' '.join(test_labels))

    # Make sure the bisection point isn't in the test list
    # Also remove tests that need to be run in specific combinations
    for label in [bisection_label, 'model_inheritance_same_model_name']:
        try:
            test_labels.remove(label)
        except ValueError:
            pass

    subprocess_args = get_subprocess_args(options)

    iteration = 1
    while len(test_labels) > 1:
        midpoint = len(test_labels) // 2
        test_labels_a = test_labels[:midpoint] + [bisection_label]
        test_labels_b = test_labels[midpoint:] + [bisection_label]
        print('***** Pass %da: Running the first half of the test suite' % iteration)
        print('***** Test labels: %s' % ' '.join(test_labels_a))
        failures_a = subprocess.run(subprocess_args + test_labels_a)

        print('***** Pass %db: Running the second half of the test suite' % iteration)
        print('***** Test labels: %s' % ' '.join(test_labels_b))
        print('')
        failures_b = subprocess.run(subprocess_args + test_labels_b)

        if failures_a.returncode and not failures_b.returncode:
            print("***** Problem found in first half. Bisecting again...")
            iteration += 1
            test_labels = test_labels_a[:-1]
        elif failures_b.returncode and not failures_a.returncode:
            print("***** Problem found in second half. Bisecting again...")
            iteration += 1
            test_labels = test_labels_b[:-1]
        elif failures_a.returncode and failures_b.returncode:
            print("***** Multiple sources of failure found")
            break
        else:
            print("***** No source of failure found... try pair execution (--pair)")
            break

    if len(test_labels) == 1:
        print("***** Source of error: %s" % test_labels[0])
    teardown(state)


def paired_tests(paired_test, options, test_labels, parallel, start_at, start_after):
    state = setup(options.verbosity, test_labels, parallel, start_at, start_after)

    test_labels = test_labels or get_installed()

    print('***** Trying paired execution')

    # Make sure the constant member of the pair isn't in the test list
    # Also remove tests that need to be run in specific combinations
    for label in [paired_test, 'model_inheritance_same_model_name']:
        try:
            test_labels.remove(label)
        except ValueError:
            pass

    subprocess_args = get_subprocess_args(options)

    for i, label in enumerate(test_labels):
        print('***** %d of %d: Check test pairing with %s' % (
              i + 1, len(test_labels), label))
        failures = subprocess.call(subprocess_args + [label, paired_test])
        if failures:
            print('***** Found problem pair with %s' % label)
            return

    print('***** No problem pair found')
    teardown(state)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run the Django test suite.")
    parser.add_argument(
        'modules', nargs='*', metavar='module',
        help='Optional path(s) to test modules; e.g. "i18n" or '
             '"i18n.tests.TranslationTests.test_lazy_objects".',
    )
    parser.add_argument(
        '-v', '--verbosity', default=1, type=int, choices=[0, 1, 2, 3],
        help='Verbosity level; 0=minimal output, 1=normal output, 2=all output',
    )
    parser.add_argument(
        '--noinput', action='store_false', dest='interactive',
        help='Tells Django to NOT prompt the user for input of any kind.',
    )
    parser.add_argument(
        '--failfast', action='store_true',
        help='Tells Django to stop running the test suite after first failed test.',
    )
    parser.add_argument(
        '--keepdb', action='store_true',
        help='Tells Django to preserve the test database between runs.',
    )
    parser.add_argument(
        '--settings',
        help='Python path to settings module, e.g. "myproject.settings". If '
             'this isn\'t provided, either the DJANGO_SETTINGS_MODULE '
             'environment variable or "test_sqlite" will be used.',
    )
    parser.add_argument(
        '--bisect',
        help='Bisect the test suite to discover a test that causes a test '
             'failure when combined with the named test.',
    )
    parser.add_argument(
        '--pair',
        help='Run the test suite in pairs with the named test to find problem pairs.',
    )
    parser.add_argument(
        '--reverse', action='store_true',
        help='Sort test suites and test cases in opposite order to debug '
             'test side effects not apparent with normal execution lineup.',
    )
    parser.add_argument(
        '--selenium', action=ActionSelenium, metavar='BROWSERS',
        help='A comma-separated list of browsers to run the Selenium tests against.',
    )
    parser.add_argument(
        '--headless', action='store_true',
        help='Run selenium tests in headless mode, if the browser supports the option.',
    )
    parser.add_argument(
        '--selenium-hub',
        help='A URL for a selenium hub instance to use in combination with --selenium.',
    )
    parser.add_argument(
        '--external-host', default=socket.gethostname(),
        help='The external host that can be reached by the selenium hub instance when running Selenium '
             'tests via Selenium Hub.',
    )
    parser.add_argument(
        '--debug-sql', action='store_true',
        help='Turn on the SQL query logger within tests.',
    )
    parser.add_argument(
        '--parallel', nargs='?', default=0, type=int,
        const=default_test_processes(), metavar='N',
        help='Run tests using up to N parallel processes.',
    )
    parser.add_argument(
        '--tag', dest='tags', action='append',
        help='Run only tests with the specified tags. Can be used multiple times.',
    )
    parser.add_argument(
        '--exclude-tag', dest='exclude_tags', action='append',
        help='Do not run tests with the specified tag. Can be used multiple times.',
    )
    parser.add_argument(
        '--start-after', dest='start_after',
        help='Run tests starting after the specified top-level module.',
    )
    parser.add_argument(
        '--start-at', dest='start_at',
        help='Run tests starting at the specified top-level module.',
    )
    parser.add_argument(
        '--pdb', action='store_true',
        help='Runs the PDB debugger on error or failure.'
    )
    parser.add_argument(
        '-b', '--buffer', action='store_true',
        help='Discard output of passing tests.',
    )
    parser.add_argument(
        '--timing', action='store_true',
        help='Output timings, including database set up and total run time.',
    )
    if PY37:
        parser.add_argument(
            '-k', dest='test_name_patterns', action='append',
            help=(
                'Only run test methods and classes matching test name pattern. '
                'Same as unittest -k option. Can be used multiple times.'
            ),
        )

    options = parser.parse_args()

    using_selenium_hub = options.selenium and options.selenium_hub
    if options.selenium_hub and not options.selenium:
        parser.error('--selenium-hub and --external-host require --selenium to be used.')
    if using_selenium_hub and not options.external_host:
        parser.error('--selenium-hub and --external-host must be used together.')

    # Allow including a trailing slash on app_labels for tab completion convenience
    options.modules = [os.path.normpath(labels) for labels in options.modules]

    mutually_exclusive_options = [options.start_at, options.start_after, options.modules]
    enabled_module_options = [bool(option) for option in mutually_exclusive_options].count(True)
    if enabled_module_options > 1:
        print('Aborting: --start-at, --start-after, and test labels are mutually exclusive.')
        sys.exit(1)
    for opt_name in ['start_at', 'start_after']:
        opt_val = getattr(options, opt_name)
        if opt_val:
            if '.' in opt_val:
                print('Aborting: --%s must be a top-level module.' % opt_name.replace('_', '-'))
                sys.exit(1)
            setattr(options, opt_name, os.path.normpath(opt_val))
    if options.settings:
        os.environ['DJANGO_SETTINGS_MODULE'] = options.settings
    else:
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_sqlite')
        options.settings = os.environ['DJANGO_SETTINGS_MODULE']

    if options.selenium:
        if not options.tags:
            options.tags = ['selenium']
        elif 'selenium' not in options.tags:
            options.tags.append('selenium')
        if options.selenium_hub:
            SeleniumTestCaseBase.selenium_hub = options.selenium_hub
            SeleniumTestCaseBase.external_host = options.external_host
        SeleniumTestCaseBase.headless = options.headless
        SeleniumTestCaseBase.browsers = options.selenium

    if options.bisect:
        bisect_tests(
            options.bisect, options, options.modules, options.parallel,
            options.start_at, options.start_after,
        )
    elif options.pair:
        paired_tests(
            options.pair, options, options.modules, options.parallel,
            options.start_at, options.start_after,
        )
    else:
        time_keeper = TimeKeeper() if options.timing else NullTimeKeeper()
        with time_keeper.timed('Total run'):
            failures = django_tests(
                options.verbosity, options.interactive, options.failfast,
                options.keepdb, options.reverse, options.modules,
                options.debug_sql, options.parallel, options.tags,
                options.exclude_tags,
                getattr(options, 'test_name_patterns', None),
                options.start_at, options.start_after, options.pdb, options.buffer,
                options.timing,
            )
        time_keeper.print_results()
        if failures:
            sys.exit(1)