591 lines
22 KiB
Python
591 lines
22 KiB
Python
"""
|
|
Base classes for writing management commands (named commands which can
|
|
be executed through ``django-admin`` or ``manage.py``).
|
|
"""
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from argparse import ArgumentParser, HelpFormatter
|
|
from io import TextIOBase
|
|
|
|
import django
|
|
from django.core import checks
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.core.management.color import color_style, no_style
|
|
from django.db import DEFAULT_DB_ALIAS, connections
|
|
|
|
ALL_CHECKS = '__all__'
|
|
|
|
|
|
class CommandError(Exception):
|
|
"""
|
|
Exception class indicating a problem while executing a management
|
|
command.
|
|
|
|
If this exception is raised during the execution of a management
|
|
command, it will be caught and turned into a nicely-printed error
|
|
message to the appropriate output stream (i.e., stderr); as a
|
|
result, raising this exception (with a sensible description of the
|
|
error) is the preferred way to indicate that something has gone
|
|
wrong in the execution of a command.
|
|
"""
|
|
def __init__(self, *args, returncode=1, **kwargs):
|
|
self.returncode = returncode
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
class SystemCheckError(CommandError):
|
|
"""
|
|
The system check framework detected unrecoverable errors.
|
|
"""
|
|
pass
|
|
|
|
|
|
class CommandParser(ArgumentParser):
|
|
"""
|
|
Customized ArgumentParser class to improve some error messages and prevent
|
|
SystemExit in several occasions, as SystemExit is unacceptable when a
|
|
command is called programmatically.
|
|
"""
|
|
def __init__(self, *, missing_args_message=None, called_from_command_line=None, **kwargs):
|
|
self.missing_args_message = missing_args_message
|
|
self.called_from_command_line = called_from_command_line
|
|
super().__init__(**kwargs)
|
|
|
|
def parse_args(self, args=None, namespace=None):
|
|
# Catch missing argument for a better error message
|
|
if (self.missing_args_message and
|
|
not (args or any(not arg.startswith('-') for arg in args))):
|
|
self.error(self.missing_args_message)
|
|
return super().parse_args(args, namespace)
|
|
|
|
def error(self, message):
|
|
if self.called_from_command_line:
|
|
super().error(message)
|
|
else:
|
|
raise CommandError("Error: %s" % message)
|
|
|
|
|
|
def handle_default_options(options):
|
|
"""
|
|
Include any default options that all commands should accept here
|
|
so that ManagementUtility can handle them before searching for
|
|
user commands.
|
|
"""
|
|
if options.settings:
|
|
os.environ['DJANGO_SETTINGS_MODULE'] = options.settings
|
|
if options.pythonpath:
|
|
sys.path.insert(0, options.pythonpath)
|
|
|
|
|
|
def no_translations(handle_func):
|
|
"""Decorator that forces a command to run with translations deactivated."""
|
|
def wrapped(*args, **kwargs):
|
|
from django.utils import translation
|
|
saved_locale = translation.get_language()
|
|
translation.deactivate_all()
|
|
try:
|
|
res = handle_func(*args, **kwargs)
|
|
finally:
|
|
if saved_locale is not None:
|
|
translation.activate(saved_locale)
|
|
return res
|
|
return wrapped
|
|
|
|
|
|
class DjangoHelpFormatter(HelpFormatter):
|
|
"""
|
|
Customized formatter so that command-specific arguments appear in the
|
|
--help output before arguments common to all commands.
|
|
"""
|
|
show_last = {
|
|
'--version', '--verbosity', '--traceback', '--settings', '--pythonpath',
|
|
'--no-color', '--force-color', '--skip-checks',
|
|
}
|
|
|
|
def _reordered_actions(self, actions):
|
|
return sorted(
|
|
actions,
|
|
key=lambda a: set(a.option_strings) & self.show_last != set()
|
|
)
|
|
|
|
def add_usage(self, usage, actions, *args, **kwargs):
|
|
super().add_usage(usage, self._reordered_actions(actions), *args, **kwargs)
|
|
|
|
def add_arguments(self, actions):
|
|
super().add_arguments(self._reordered_actions(actions))
|
|
|
|
|
|
class OutputWrapper(TextIOBase):
|
|
"""
|
|
Wrapper around stdout/stderr
|
|
"""
|
|
@property
|
|
def style_func(self):
|
|
return self._style_func
|
|
|
|
@style_func.setter
|
|
def style_func(self, style_func):
|
|
if style_func and self.isatty():
|
|
self._style_func = style_func
|
|
else:
|
|
self._style_func = lambda x: x
|
|
|
|
def __init__(self, out, ending='\n'):
|
|
self._out = out
|
|
self.style_func = None
|
|
self.ending = ending
|
|
|
|
def __getattr__(self, name):
|
|
return getattr(self._out, name)
|
|
|
|
def flush(self):
|
|
if hasattr(self._out, 'flush'):
|
|
self._out.flush()
|
|
|
|
def isatty(self):
|
|
return hasattr(self._out, 'isatty') and self._out.isatty()
|
|
|
|
def write(self, msg='', style_func=None, ending=None):
|
|
ending = self.ending if ending is None else ending
|
|
if ending and not msg.endswith(ending):
|
|
msg += ending
|
|
style_func = style_func or self.style_func
|
|
self._out.write(style_func(msg))
|
|
|
|
|
|
class BaseCommand:
|
|
"""
|
|
The base class from which all management commands ultimately
|
|
derive.
|
|
|
|
Use this class if you want access to all of the mechanisms which
|
|
parse the command-line arguments and work out what code to call in
|
|
response; if you don't need to change any of that behavior,
|
|
consider using one of the subclasses defined in this file.
|
|
|
|
If you are interested in overriding/customizing various aspects of
|
|
the command-parsing and -execution behavior, the normal flow works
|
|
as follows:
|
|
|
|
1. ``django-admin`` or ``manage.py`` loads the command class
|
|
and calls its ``run_from_argv()`` method.
|
|
|
|
2. The ``run_from_argv()`` method calls ``create_parser()`` to get
|
|
an ``ArgumentParser`` for the arguments, parses them, performs
|
|
any environment changes requested by options like
|
|
``pythonpath``, and then calls the ``execute()`` method,
|
|
passing the parsed arguments.
|
|
|
|
3. The ``execute()`` method attempts to carry out the command by
|
|
calling the ``handle()`` method with the parsed arguments; any
|
|
output produced by ``handle()`` will be printed to standard
|
|
output and, if the command is intended to produce a block of
|
|
SQL statements, will be wrapped in ``BEGIN`` and ``COMMIT``.
|
|
|
|
4. If ``handle()`` or ``execute()`` raised any exception (e.g.
|
|
``CommandError``), ``run_from_argv()`` will instead print an error
|
|
message to ``stderr``.
|
|
|
|
Thus, the ``handle()`` method is typically the starting point for
|
|
subclasses; many built-in commands and command types either place
|
|
all of their logic in ``handle()``, or perform some additional
|
|
parsing work in ``handle()`` and then delegate from it to more
|
|
specialized methods as needed.
|
|
|
|
Several attributes affect behavior at various steps along the way:
|
|
|
|
``help``
|
|
A short description of the command, which will be printed in
|
|
help messages.
|
|
|
|
``output_transaction``
|
|
A boolean indicating whether the command outputs SQL
|
|
statements; if ``True``, the output will automatically be
|
|
wrapped with ``BEGIN;`` and ``COMMIT;``. Default value is
|
|
``False``.
|
|
|
|
``requires_migrations_checks``
|
|
A boolean; if ``True``, the command prints a warning if the set of
|
|
migrations on disk don't match the migrations in the database.
|
|
|
|
``requires_system_checks``
|
|
A list or tuple of tags, e.g. [Tags.staticfiles, Tags.models]. System
|
|
checks registered in the chosen tags will be checked for errors prior
|
|
to executing the command. The value '__all__' can be used to specify
|
|
that all system checks should be performed. Default value is '__all__'.
|
|
|
|
To validate an individual application's models
|
|
rather than all applications' models, call
|
|
``self.check(app_configs)`` from ``handle()``, where ``app_configs``
|
|
is the list of application's configuration provided by the
|
|
app registry.
|
|
|
|
``stealth_options``
|
|
A tuple of any options the command uses which aren't defined by the
|
|
argument parser.
|
|
"""
|
|
# Metadata about this command.
|
|
help = ''
|
|
|
|
# Configuration shortcuts that alter various logic.
|
|
_called_from_command_line = False
|
|
output_transaction = False # Whether to wrap the output in a "BEGIN; COMMIT;"
|
|
requires_migrations_checks = False
|
|
requires_system_checks = '__all__'
|
|
# Arguments, common to all commands, which aren't defined by the argument
|
|
# parser.
|
|
base_stealth_options = ('stderr', 'stdout')
|
|
# Command-specific options not defined by the argument parser.
|
|
stealth_options = ()
|
|
suppressed_base_arguments = set()
|
|
|
|
def __init__(self, stdout=None, stderr=None, no_color=False, force_color=False):
|
|
self.stdout = OutputWrapper(stdout or sys.stdout)
|
|
self.stderr = OutputWrapper(stderr or sys.stderr)
|
|
if no_color and force_color:
|
|
raise CommandError("'no_color' and 'force_color' can't be used together.")
|
|
if no_color:
|
|
self.style = no_style()
|
|
else:
|
|
self.style = color_style(force_color)
|
|
self.stderr.style_func = self.style.ERROR
|
|
if (
|
|
not isinstance(self.requires_system_checks, (list, tuple)) and
|
|
self.requires_system_checks != ALL_CHECKS
|
|
):
|
|
raise TypeError('requires_system_checks must be a list or tuple.')
|
|
|
|
def get_version(self):
|
|
"""
|
|
Return the Django version, which should be correct for all built-in
|
|
Django commands. User-supplied commands can override this method to
|
|
return their own version.
|
|
"""
|
|
return django.get_version()
|
|
|
|
def create_parser(self, prog_name, subcommand, **kwargs):
|
|
"""
|
|
Create and return the ``ArgumentParser`` which will be used to
|
|
parse the arguments to this command.
|
|
"""
|
|
parser = CommandParser(
|
|
prog='%s %s' % (os.path.basename(prog_name), subcommand),
|
|
description=self.help or None,
|
|
formatter_class=DjangoHelpFormatter,
|
|
missing_args_message=getattr(self, 'missing_args_message', None),
|
|
called_from_command_line=getattr(self, '_called_from_command_line', None),
|
|
**kwargs
|
|
)
|
|
self.add_base_argument(
|
|
parser, '--version', action='version', version=self.get_version(),
|
|
help="Show program's version number and exit.",
|
|
)
|
|
self.add_base_argument(
|
|
parser, '-v', '--verbosity', default=1,
|
|
type=int, choices=[0, 1, 2, 3],
|
|
help='Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output',
|
|
)
|
|
self.add_base_argument(
|
|
parser, '--settings',
|
|
help=(
|
|
'The Python path to a settings module, e.g. '
|
|
'"myproject.settings.main". If this isn\'t provided, the '
|
|
'DJANGO_SETTINGS_MODULE environment variable will be used.'
|
|
),
|
|
)
|
|
self.add_base_argument(
|
|
parser, '--pythonpath',
|
|
help='A directory to add to the Python path, e.g. "/home/djangoprojects/myproject".',
|
|
)
|
|
self.add_base_argument(
|
|
parser, '--traceback', action='store_true',
|
|
help='Raise on CommandError exceptions.',
|
|
)
|
|
self.add_base_argument(
|
|
parser, '--no-color', action='store_true',
|
|
help="Don't colorize the command output.",
|
|
)
|
|
self.add_base_argument(
|
|
parser, '--force-color', action='store_true',
|
|
help='Force colorization of the command output.',
|
|
)
|
|
if self.requires_system_checks:
|
|
parser.add_argument(
|
|
'--skip-checks', action='store_true',
|
|
help='Skip system checks.',
|
|
)
|
|
self.add_arguments(parser)
|
|
return parser
|
|
|
|
def add_arguments(self, parser):
|
|
"""
|
|
Entry point for subclassed commands to add custom arguments.
|
|
"""
|
|
pass
|
|
|
|
def add_base_argument(self, parser, *args, **kwargs):
|
|
"""
|
|
Call the parser's add_argument() method, suppressing the help text
|
|
according to BaseCommand.suppressed_base_arguments.
|
|
"""
|
|
for arg in args:
|
|
if arg in self.suppressed_base_arguments:
|
|
kwargs['help'] = argparse.SUPPRESS
|
|
break
|
|
parser.add_argument(*args, **kwargs)
|
|
|
|
def print_help(self, prog_name, subcommand):
|
|
"""
|
|
Print the help message for this command, derived from
|
|
``self.usage()``.
|
|
"""
|
|
parser = self.create_parser(prog_name, subcommand)
|
|
parser.print_help()
|
|
|
|
def run_from_argv(self, argv):
|
|
"""
|
|
Set up any environment changes requested (e.g., Python path
|
|
and Django settings), then run this command. If the
|
|
command raises a ``CommandError``, intercept it and print it sensibly
|
|
to stderr. If the ``--traceback`` option is present or the raised
|
|
``Exception`` is not ``CommandError``, raise it.
|
|
"""
|
|
self._called_from_command_line = True
|
|
parser = self.create_parser(argv[0], argv[1])
|
|
|
|
options = parser.parse_args(argv[2:])
|
|
cmd_options = vars(options)
|
|
# Move positional args out of options to mimic legacy optparse
|
|
args = cmd_options.pop('args', ())
|
|
handle_default_options(options)
|
|
try:
|
|
self.execute(*args, **cmd_options)
|
|
except CommandError as e:
|
|
if options.traceback:
|
|
raise
|
|
|
|
# SystemCheckError takes care of its own formatting.
|
|
if isinstance(e, SystemCheckError):
|
|
self.stderr.write(str(e), lambda x: x)
|
|
else:
|
|
self.stderr.write('%s: %s' % (e.__class__.__name__, e))
|
|
sys.exit(e.returncode)
|
|
finally:
|
|
try:
|
|
connections.close_all()
|
|
except ImproperlyConfigured:
|
|
# Ignore if connections aren't setup at this point (e.g. no
|
|
# configured settings).
|
|
pass
|
|
|
|
def execute(self, *args, **options):
|
|
"""
|
|
Try to execute this command, performing system checks if needed (as
|
|
controlled by the ``requires_system_checks`` attribute, except if
|
|
force-skipped).
|
|
"""
|
|
if options['force_color'] and options['no_color']:
|
|
raise CommandError("The --no-color and --force-color options can't be used together.")
|
|
if options['force_color']:
|
|
self.style = color_style(force_color=True)
|
|
elif options['no_color']:
|
|
self.style = no_style()
|
|
self.stderr.style_func = None
|
|
if options.get('stdout'):
|
|
self.stdout = OutputWrapper(options['stdout'])
|
|
if options.get('stderr'):
|
|
self.stderr = OutputWrapper(options['stderr'])
|
|
|
|
if self.requires_system_checks and not options['skip_checks']:
|
|
if self.requires_system_checks == ALL_CHECKS:
|
|
self.check()
|
|
else:
|
|
self.check(tags=self.requires_system_checks)
|
|
if self.requires_migrations_checks:
|
|
self.check_migrations()
|
|
output = self.handle(*args, **options)
|
|
if output:
|
|
if self.output_transaction:
|
|
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
|
|
output = '%s\n%s\n%s' % (
|
|
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
|
|
output,
|
|
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
|
|
)
|
|
self.stdout.write(output)
|
|
return output
|
|
|
|
def check(self, app_configs=None, tags=None, display_num_errors=False,
|
|
include_deployment_checks=False, fail_level=checks.ERROR,
|
|
databases=None):
|
|
"""
|
|
Use the system check framework to validate entire Django project.
|
|
Raise CommandError for any serious message (error or critical errors).
|
|
If there are only light messages (like warnings), print them to stderr
|
|
and don't raise an exception.
|
|
"""
|
|
all_issues = checks.run_checks(
|
|
app_configs=app_configs,
|
|
tags=tags,
|
|
include_deployment_checks=include_deployment_checks,
|
|
databases=databases,
|
|
)
|
|
|
|
header, body, footer = "", "", ""
|
|
visible_issue_count = 0 # excludes silenced warnings
|
|
|
|
if all_issues:
|
|
debugs = [e for e in all_issues if e.level < checks.INFO and not e.is_silenced()]
|
|
infos = [e for e in all_issues if checks.INFO <= e.level < checks.WARNING and not e.is_silenced()]
|
|
warnings = [e for e in all_issues if checks.WARNING <= e.level < checks.ERROR and not e.is_silenced()]
|
|
errors = [e for e in all_issues if checks.ERROR <= e.level < checks.CRITICAL and not e.is_silenced()]
|
|
criticals = [e for e in all_issues if checks.CRITICAL <= e.level and not e.is_silenced()]
|
|
sorted_issues = [
|
|
(criticals, 'CRITICALS'),
|
|
(errors, 'ERRORS'),
|
|
(warnings, 'WARNINGS'),
|
|
(infos, 'INFOS'),
|
|
(debugs, 'DEBUGS'),
|
|
]
|
|
|
|
for issues, group_name in sorted_issues:
|
|
if issues:
|
|
visible_issue_count += len(issues)
|
|
formatted = (
|
|
self.style.ERROR(str(e))
|
|
if e.is_serious()
|
|
else self.style.WARNING(str(e))
|
|
for e in issues)
|
|
formatted = "\n".join(sorted(formatted))
|
|
body += '\n%s:\n%s\n' % (group_name, formatted)
|
|
|
|
if visible_issue_count:
|
|
header = "System check identified some issues:\n"
|
|
|
|
if display_num_errors:
|
|
if visible_issue_count:
|
|
footer += '\n'
|
|
footer += "System check identified %s (%s silenced)." % (
|
|
"no issues" if visible_issue_count == 0 else
|
|
"1 issue" if visible_issue_count == 1 else
|
|
"%s issues" % visible_issue_count,
|
|
len(all_issues) - visible_issue_count,
|
|
)
|
|
|
|
if any(e.is_serious(fail_level) and not e.is_silenced() for e in all_issues):
|
|
msg = self.style.ERROR("SystemCheckError: %s" % header) + body + footer
|
|
raise SystemCheckError(msg)
|
|
else:
|
|
msg = header + body + footer
|
|
|
|
if msg:
|
|
if visible_issue_count:
|
|
self.stderr.write(msg, lambda x: x)
|
|
else:
|
|
self.stdout.write(msg)
|
|
|
|
def check_migrations(self):
|
|
"""
|
|
Print a warning if the set of migrations on disk don't match the
|
|
migrations in the database.
|
|
"""
|
|
from django.db.migrations.executor import MigrationExecutor
|
|
try:
|
|
executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])
|
|
except ImproperlyConfigured:
|
|
# No databases are configured (or the dummy one)
|
|
return
|
|
|
|
plan = executor.migration_plan(executor.loader.graph.leaf_nodes())
|
|
if plan:
|
|
apps_waiting_migration = sorted({migration.app_label for migration, backwards in plan})
|
|
self.stdout.write(
|
|
self.style.NOTICE(
|
|
"\nYou have %(unapplied_migration_count)s unapplied migration(s). "
|
|
"Your project may not work properly until you apply the "
|
|
"migrations for app(s): %(apps_waiting_migration)s." % {
|
|
"unapplied_migration_count": len(plan),
|
|
"apps_waiting_migration": ", ".join(apps_waiting_migration),
|
|
}
|
|
)
|
|
)
|
|
self.stdout.write(self.style.NOTICE("Run 'python manage.py migrate' to apply them."))
|
|
|
|
def handle(self, *args, **options):
|
|
"""
|
|
The actual logic of the command. Subclasses must implement
|
|
this method.
|
|
"""
|
|
raise NotImplementedError('subclasses of BaseCommand must provide a handle() method')
|
|
|
|
|
|
class AppCommand(BaseCommand):
|
|
"""
|
|
A management command which takes one or more installed application labels
|
|
as arguments, and does something with each of them.
|
|
|
|
Rather than implementing ``handle()``, subclasses must implement
|
|
``handle_app_config()``, which will be called once for each application.
|
|
"""
|
|
missing_args_message = "Enter at least one application label."
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument('args', metavar='app_label', nargs='+', help='One or more application label.')
|
|
|
|
def handle(self, *app_labels, **options):
|
|
from django.apps import apps
|
|
try:
|
|
app_configs = [apps.get_app_config(app_label) for app_label in app_labels]
|
|
except (LookupError, ImportError) as e:
|
|
raise CommandError("%s. Are you sure your INSTALLED_APPS setting is correct?" % e)
|
|
output = []
|
|
for app_config in app_configs:
|
|
app_output = self.handle_app_config(app_config, **options)
|
|
if app_output:
|
|
output.append(app_output)
|
|
return '\n'.join(output)
|
|
|
|
def handle_app_config(self, app_config, **options):
|
|
"""
|
|
Perform the command's actions for app_config, an AppConfig instance
|
|
corresponding to an application label given on the command line.
|
|
"""
|
|
raise NotImplementedError(
|
|
"Subclasses of AppCommand must provide"
|
|
"a handle_app_config() method.")
|
|
|
|
|
|
class LabelCommand(BaseCommand):
|
|
"""
|
|
A management command which takes one or more arbitrary arguments
|
|
(labels) on the command line, and does something with each of
|
|
them.
|
|
|
|
Rather than implementing ``handle()``, subclasses must implement
|
|
``handle_label()``, which will be called once for each label.
|
|
|
|
If the arguments should be names of installed applications, use
|
|
``AppCommand`` instead.
|
|
"""
|
|
label = 'label'
|
|
missing_args_message = "Enter at least one %s." % label
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument('args', metavar=self.label, nargs='+')
|
|
|
|
def handle(self, *labels, **options):
|
|
output = []
|
|
for label in labels:
|
|
label_output = self.handle_label(label, **options)
|
|
if label_output:
|
|
output.append(label_output)
|
|
return '\n'.join(output)
|
|
|
|
def handle_label(self, label, **options):
|
|
"""
|
|
Perform the command's actions for ``label``, which will be the
|
|
string as given on the command line.
|
|
"""
|
|
raise NotImplementedError('subclasses of LabelCommand must provide a handle_label() method')
|