import os
import sys
import warnings
from itertools import takewhile

from django.apps import apps
from django.conf import settings
from django.core.management.base import (
    BaseCommand, CommandError, no_translations,
)
from django.db import DEFAULT_DB_ALIAS, OperationalError, connections, router
from django.db.migrations import Migration
from django.db.migrations.autodetector import MigrationAutodetector
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.questioner import (
    InteractiveMigrationQuestioner, MigrationQuestioner,
    NonInteractiveMigrationQuestioner,
)
from django.db.migrations.state import ProjectState
from django.db.migrations.utils import get_migration_name_timestamp
from django.db.migrations.writer import MigrationWriter


class Command(BaseCommand):
    """
    Management command that detects model changes and writes migration files.

    Depending on the options it either autodetects changes, creates empty
    migrations (--empty), or writes merge migrations for conflicting leaf
    nodes (--merge).
    """
    help = "Creates new migration(s) for apps."

    def add_arguments(self, parser):
        """Register the command-line arguments on the given parser."""
        parser.add_argument(
            'args', metavar='app_label', nargs='*',
            help='Specify the app label(s) to create migrations for.',
        )
        parser.add_argument(
            '--dry-run', action='store_true',
            help="Just show what migrations would be made; don't actually write them.",
        )
        parser.add_argument(
            '--merge', action='store_true',
            help="Enable fixing of migration conflicts.",
        )
        parser.add_argument(
            '--empty', action='store_true',
            help="Create an empty migration.",
        )
        parser.add_argument(
            '--noinput', '--no-input', action='store_false', dest='interactive',
            help='Tells Django to NOT prompt the user for input of any kind.',
        )
        parser.add_argument(
            '-n', '--name',
            help="Use this name for migration file(s).",
        )
        parser.add_argument(
            '--no-header', action='store_false', dest='include_header',
            help='Do not add header comments to new migration file(s).',
        )
        parser.add_argument(
            '--check', action='store_true', dest='check_changes',
            help='Exit with a non-zero status if model changes are missing migrations.',
        )

    def log(self, msg):
        """Write msg to the command's stdout."""
        self.stdout.write(msg)

    @no_translations
    def handle(self, *app_labels, **options):
        """
        Run the command for the given app labels.

        Unknown app labels are reported on stderr and the process exits with
        status 2. Conflicting leaf migrations raise CommandError unless
        --merge was given, in which case handle_merge() takes over. With
        --check, the process exits with status 1 after changes were detected
        and passed to write_migration_files().
        """
        self.verbosity = options['verbosity']
        self.interactive = options['interactive']
        self.dry_run = options['dry_run']
        self.merge = options['merge']
        self.empty = options['empty']
        self.migration_name = options['name']
        if self.migration_name and not self.migration_name.isidentifier():
            raise CommandError('The migration name must be a valid Python identifier.')
        self.include_header = options['include_header']
        check_changes = options['check_changes']

        # Make sure the app they asked for exists
        app_labels = set(app_labels)
        has_bad_labels = False
        # Iterate in sorted order so the error output is deterministic
        # regardless of string hash randomization.
        for app_label in sorted(app_labels):
            try:
                apps.get_app_config(app_label)
            except LookupError as err:
                self.stderr.write(str(err))
                has_bad_labels = True
        if has_bad_labels:
            sys.exit(2)

        # Load the current graph state. Pass in None for the connection so
        # the loader doesn't try to resolve replaced migrations from DB.
        loader = MigrationLoader(None, ignore_no_migrations=True)

        # Raise an error if any migrations are applied before their dependencies.
        consistency_check_labels = {config.label for config in apps.get_app_configs()}
        # Non-default databases are only checked if database routers used.
        aliases_to_check = connections if settings.DATABASE_ROUTERS else [DEFAULT_DB_ALIAS]
        for alias in sorted(aliases_to_check):
            connection = connections[alias]
            if (connection.settings_dict['ENGINE'] != 'django.db.backends.dummy' and any(
                    # At least one model must be migrated to the database.
                    router.allow_migrate(connection.alias, app_label, model_name=model._meta.object_name)
                    for app_label in consistency_check_labels
                    for model in apps.get_app_config(app_label).get_models()
            )):
                try:
                    loader.check_consistent_history(connection)
                except OperationalError as error:
                    warnings.warn(
                        "Got an error checking a consistent migration history "
                        "performed for database connection '%s': %s"
                        % (alias, error),
                        RuntimeWarning,
                    )

        # Before anything else, see if there's conflicting apps and drop out
        # hard if there are any and they don't want to merge
        conflicts = loader.detect_conflicts()

        # If app_labels is specified, filter out conflicting migrations for unspecified apps
        if app_labels:
            conflicts = {
                app_label: conflict for app_label, conflict in conflicts.items()
                if app_label in app_labels
            }

        if conflicts and not self.merge:
            name_str = "; ".join(
                "%s in %s" % (", ".join(names), app)
                for app, names in conflicts.items()
            )
            raise CommandError(
                "Conflicting migrations detected; multiple leaf nodes in the "
                "migration graph: (%s).\nTo fix them run "
                "'python manage.py makemigrations --merge'" % name_str
            )

        # If they want to merge and there's nothing to merge, then politely exit
        if self.merge and not conflicts:
            self.log('No conflicts detected to merge.')
            return

        # If they want to merge and there is something to merge, then
        # divert into the merge code
        if self.merge and conflicts:
            return self.handle_merge(loader, conflicts)

        if self.interactive:
            questioner = InteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run)
        else:
            questioner = NonInteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run)
        # Set up autodetector
        autodetector = MigrationAutodetector(
            loader.project_state(),
            ProjectState.from_apps(apps),
            questioner,
        )

        # If they want to make an empty migration, make one for each app
        if self.empty:
            if not app_labels:
                raise CommandError("You must supply at least one app label when using --empty.")
            # Make a fake changes() result we can pass to arrange_for_graph.
            # Sorted so the apps are processed in a reproducible order.
            changes = {
                app: [Migration("custom", app)]
                for app in sorted(app_labels)
            }
            changes = autodetector.arrange_for_graph(
                changes=changes,
                graph=loader.graph,
                migration_name=self.migration_name,
            )
            self.write_migration_files(changes)
            return

        # Detect changes
        changes = autodetector.changes(
            graph=loader.graph,
            trim_to_apps=app_labels or None,
            convert_apps=app_labels or None,
            migration_name=self.migration_name,
        )

        if not changes:
            # No changes? Tell them.
            if self.verbosity >= 1:
                if app_labels:
                    # Sort for a stable message; avoids mutating the set just
                    # to read its only element.
                    labels = sorted(app_labels)
                    if len(labels) == 1:
                        self.log("No changes detected in app '%s'" % labels[0])
                    else:
                        self.log("No changes detected in apps '%s'" % ("', '".join(labels)))
                else:
                    self.log('No changes detected')
        else:
            self.write_migration_files(changes)
            if check_changes:
                sys.exit(1)

    def write_migration_files(self, changes):
        """
        Take a changes dict and write them out as migration files.

        `changes` maps an app label to a list of migrations. Each migration
        is described on stdout when verbosity >= 1. Files are only written
        when not in dry-run mode; in dry-run mode with verbosity 3 the file
        contents are logged instead.
        """
        directory_created = {}
        for app_label, app_migrations in changes.items():
            if self.verbosity >= 1:
                self.log(self.style.MIGRATE_HEADING("Migrations for '%s':" % app_label))
            for migration in app_migrations:
                # Describe the migration
                writer = MigrationWriter(migration, self.include_header)
                if self.verbosity >= 1:
                    # Display a relative path if it's below the current working
                    # directory, or an absolute path otherwise.
                    try:
                        migration_string = os.path.relpath(writer.path)
                    except ValueError:
                        migration_string = writer.path
                    if migration_string.startswith('..'):
                        migration_string = writer.path
                    self.log(' %s\n' % self.style.MIGRATE_LABEL(migration_string))
                    for operation in migration.operations:
                        self.log(' - %s' % operation.describe())
                if not self.dry_run:
                    # Write the migrations file to the disk.
                    migrations_directory = os.path.dirname(writer.path)
                    if not directory_created.get(app_label):
                        os.makedirs(migrations_directory, exist_ok=True)
                        init_path = os.path.join(migrations_directory, "__init__.py")
                        if not os.path.isfile(init_path):
                            # Create an empty package marker; the context
                            # manager guarantees the handle is closed.
                            with open(init_path, "w"):
                                pass
                        # We just do this once per app
                        directory_created[app_label] = True
                    migration_string = writer.as_string()
                    with open(writer.path, "w", encoding='utf-8') as fh:
                        fh.write(migration_string)
                elif self.verbosity == 3:
                    # Alternatively, makemigrations --dry-run --verbosity 3
                    # will log the migrations rather than saving the file to
                    # the disk.
                    self.log(self.style.MIGRATE_HEADING(
                        "Full migrations file '%s':" % writer.filename
                    ))
                    self.log(writer.as_string())

    def handle_merge(self, loader, conflicts):
        """
        Handles merging together conflicted migrations interactively,
        if it's safe; otherwise, advises on how to fix it.

        `conflicts` maps an app label to the names of its conflicting
        migrations. Raises ValueError when the conflicting migrations of an
        app share no common ancestor within that app.
        """
        if self.interactive:
            questioner = InteractiveMigrationQuestioner()
        else:
            questioner = MigrationQuestioner(defaults={'ask_merge': True})

        def all_items_equal(seq):
            # True when every element of seq equals the first one.
            return all(item == seq[0] for item in seq[1:])

        for app_label, migration_names in conflicts.items():
            # Grab out the migrations in question, and work out their
            # common ancestor.
            merge_migrations = []
            for migration_name in migration_names:
                migration = loader.get_migration(app_label, migration_name)
                migration.ancestry = [
                    mig for mig in loader.graph.forwards_plan((app_label, migration_name))
                    if mig[0] == migration.app_label
                ]
                merge_migrations.append(migration)

            # Walk the ancestries in lockstep and count the leading
            # generations on which every branch agrees.
            merge_migrations_generations = zip(*(m.ancestry for m in merge_migrations))
            common_ancestor_count = sum(
                1 for _ in takewhile(all_items_equal, merge_migrations_generations)
            )
            if not common_ancestor_count:
                raise ValueError("Could not find common ancestor of %s" % migration_names)
            # Now work out the operations along each divergent branch
            for migration in merge_migrations:
                migration.branch = migration.ancestry[common_ancestor_count:]
                # Flatten the per-migration operation lists in one pass
                # (repeated list concatenation would be quadratic).
                migration.merged_operations = [
                    operation
                    for node_app, node_name in migration.branch
                    for operation in loader.get_migration(node_app, node_name).operations
                ]
            # In future, this could use some of the Optimizer code
            # (can_optimize_through) to automatically see if they're
            # mergeable. For now, we always just prompt the user.
            if self.verbosity > 0:
                self.log(self.style.MIGRATE_HEADING('Merging %s' % app_label))
                for migration in merge_migrations:
                    self.log(self.style.MIGRATE_LABEL(' Branch %s' % migration.name))
                    for operation in migration.merged_operations:
                        self.log(' - %s' % operation.describe())
            if questioner.ask_merge(app_label):
                # If they still want to merge it, then write out an empty
                # file depending on the migrations needing merging.
                numbers = [
                    MigrationAutodetector.parse_number(migration.name)
                    for migration in merge_migrations
                ]
                try:
                    biggest_number = max(x for x in numbers if x is not None)
                except ValueError:
                    # No migration name carried a parsable number.
                    biggest_number = 1
                subclass = type("Migration", (Migration,), {
                    "dependencies": [(app_label, migration.name) for migration in merge_migrations],
                })
                parts = ['%04i' % (biggest_number + 1)]
                if self.migration_name:
                    parts.append(self.migration_name)
                else:
                    parts.append('merge')
                    leaf_names = '_'.join(sorted(migration.name for migration in merge_migrations))
                    # Fall back to a timestamp when the joined leaf names
                    # would make the name too long.
                    if len(leaf_names) > 47:
                        parts.append(get_migration_name_timestamp())
                    else:
                        parts.append(leaf_names)
                migration_name = '_'.join(parts)
                new_migration = subclass(migration_name, app_label)
                writer = MigrationWriter(new_migration, self.include_header)

                if not self.dry_run:
                    # Write the merge migrations file to the disk
                    with open(writer.path, "w", encoding='utf-8') as fh:
                        fh.write(writer.as_string())
                    if self.verbosity > 0:
                        self.log('\nCreated new merge migration %s' % writer.path)
                elif self.verbosity == 3:
                    # Alternatively, makemigrations --merge --dry-run --verbosity 3
                    # will log the merge migrations rather than saving the file
                    # to the disk.
                    self.log(self.style.MIGRATE_HEADING(
                        "Full merge migrations file '%s':" % writer.filename
                    ))
                    self.log(writer.as_string())