# django1/django/core/management/commands/showmigrations.py
#
# 158 lines
# 6.2 KiB
# Python

import sys
from django.apps import apps
from django.core.management.base import BaseCommand
from django.db import DEFAULT_DB_ALIAS, connections
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.recorder import MigrationRecorder
class Command(BaseCommand):
    """
    Management command that prints the project's migrations, either grouped
    per app (``--list``, the default) or as one ordered plan (``--plan``).
    """

    help = "Shows all available migrations for the current project"

    def add_arguments(self, parser):
        """
        Register the optional app labels, the ``--database`` alias and the
        mutually exclusive ``--list`` / ``--plan`` output formats.
        """
        parser.add_argument(
            'app_label', nargs='*',
            help='App labels of applications to limit the output to.',
        )
        parser.add_argument(
            '--database', default=DEFAULT_DB_ALIAS,
            help=(
                'Nominates a database to show migrations for. Defaults to the '
                '"default" database.'
            ),
        )

        # Both flags store into the same ``format`` destination, so only one
        # of them may be given on the command line.
        formats = parser.add_mutually_exclusive_group()
        formats.add_argument(
            '--list', '-l', action='store_const', dest='format', const='list',
            help=(
                'Shows a list of all migrations and which are applied. '
                'With a verbosity level of 2 or above, the applied datetimes '
                'will be included.'
            ),
        )
        formats.add_argument(
            '--plan', '-p', action='store_const', dest='format', const='plan',
            help=(
                'Shows all migrations in the order they will be applied. '
                'With a verbosity level of 2 or above all direct migration dependencies '
                'and reverse dependencies (run_before) will be included.'
            )
        )

        parser.set_defaults(format='list')

    def handle(self, *args, **options):
        """
        Dispatch to ``show_plan`` or ``show_list`` based on
        ``options['format']``, using the connection named by
        ``options['database']``.
        """
        self.verbosity = options['verbosity']

        # Get the database we're operating from
        connection = connections[options['database']]

        if options['format'] == "plan":
            return self.show_plan(connection, options['app_label'])
        return self.show_list(connection, options['app_label'])

    def _validate_app_names(self, loader, app_names):
        """
        Exit with status 2 if ``apps.get_app_config()`` raises ``LookupError``
        for any label in ``app_names``.

        Every failing label is written to stderr before exiting, so all of
        them are reported in a single run. ``loader`` is not used here; the
        parameter is kept so existing call sites keep working.
        """
        has_bad_names = False
        for app_name in app_names:
            try:
                apps.get_app_config(app_name)
            except LookupError as err:
                self.stderr.write(str(err))
                has_bad_names = True
        if has_bad_names:
            sys.exit(2)

    def show_list(self, connection, app_names=None):
        """
        Show a list of all migrations on the system, or only those of
        some named apps.

        Each migration is prefixed with a status marker:

        * ``[X]`` - present in both ``loader.applied_migrations`` and
          ``MigrationRecorder.applied_migrations()``.
        * ``[-]`` - present in ``loader.applied_migrations`` but not returned
          by the recorder; a hint to run ``migrate`` is appended.
        * ``[ ]`` - not present in ``loader.applied_migrations``.
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(connection, ignore_no_migrations=True)
        recorder = MigrationRecorder(connection)
        recorded_migrations = recorder.applied_migrations()
        graph = loader.graph

        # If we were passed a list of apps, validate it
        if app_names:
            self._validate_app_names(loader, app_names)
        # Otherwise, show all apps in alphabetic order
        else:
            app_names = sorted(loader.migrated_apps)

        # For each app, print its migrations in order from oldest (roots) to
        # newest (leaves).
        for app_name in app_names:
            self.stdout.write(app_name, self.style.MIGRATE_LABEL)
            shown = set()
            for node in graph.leaf_nodes(app_name):
                for plan_node in graph.forwards_plan(node):
                    # The forwards plan also contains dependencies from other
                    # apps, and several leaves may share ancestors: skip both.
                    if plan_node in shown or plan_node[0] != app_name:
                        continue
                    shown.add(plan_node)

                    # Give it a nice title if it's a squashed one
                    title = plan_node[1]
                    replaces = graph.nodes[plan_node].replaces
                    if replaces:
                        title += " (%s squashed migrations)" % len(replaces)

                    # Mark it as applied/unapplied
                    applied_migration = loader.applied_migrations.get(plan_node)
                    if not applied_migration:
                        self.stdout.write(" [ ] %s" % title)
                        continue

                    if plan_node in recorded_migrations:
                        output = ' [X] %s' % title
                    else:
                        # Loader treats it as applied but the recorder did
                        # not return it.
                        title += " Run 'manage.py migrate' to finish recording."
                        output = ' [-] %s' % title
                    # Only entries exposing an ``applied`` attribute can
                    # report a timestamp.
                    if self.verbosity >= 2 and hasattr(applied_migration, 'applied'):
                        output += ' (applied at %s)' % (
                            applied_migration.applied.strftime('%Y-%m-%d %H:%M:%S')
                        )
                    self.stdout.write(output)

            # If we didn't print anything, then a small message
            if not shown:
                self.stdout.write(" (no migrations)", self.style.ERROR)

    def show_plan(self, connection, app_names=None):
        """
        Show all known migrations (or only those of the specified app_names)
        in the order they will be applied.

        With verbosity 2 or above, each line is followed by the keys of the
        node's parents in the migration graph.
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(connection)
        graph = loader.graph

        if app_names:
            self._validate_app_names(loader, app_names)
            # Set membership keeps the leaf filter linear in the leaf count.
            wanted_apps = set(app_names)
            targets = [key for key in graph.leaf_nodes() if key[0] in wanted_apps]
        else:
            targets = graph.leaf_nodes()

        plan = []
        seen = set()

        # Generate the plan; ``seen`` de-duplicates migrations that are
        # reachable from more than one target.
        for target in targets:
            for migration in graph.forwards_plan(target):
                if migration not in seen:
                    plan.append(graph.node_map[migration])
                    seen.add(migration)

        # Output
        def print_deps(node):
            """Return the ' ... (app.name, ...)' suffix for node's parents."""
            out = ["%s.%s" % parent.key for parent in sorted(node.parents)]
            if out:
                return " ... (%s)" % ", ".join(out)
            return ""

        for node in plan:
            deps = print_deps(node) if self.verbosity >= 2 else ""
            marker = "[X]" if node.key in loader.applied_migrations else "[ ]"
            self.stdout.write("%s %s.%s%s" % (marker, node.key[0], node.key[1], deps))

        if not plan:
            self.stdout.write('(no migrations)', self.style.ERROR)