130 lines
5.1 KiB
Python
130 lines
5.1 KiB
Python
import shutil
|
|
import sys
|
|
|
|
from django.apps import apps
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.core.management.utils import run_formatters
|
|
from django.db import migrations
|
|
from django.db.migrations.exceptions import AmbiguityError
|
|
from django.db.migrations.loader import MigrationLoader
|
|
from django.db.migrations.optimizer import MigrationOptimizer
|
|
from django.db.migrations.writer import MigrationWriter
|
|
from django.utils.version import get_docs_version
|
|
|
|
|
|
class Command(BaseCommand):
    """Management command that optimizes the operations of a single migration.

    The named migration is located through ``MigrationLoader``, its operations
    are passed through ``MigrationOptimizer``, and, when the optimizer returns
    fewer operations, the result is written to disk with ``MigrationWriter``.
    """

    help = "Optimizes the operations for the named migration."

    def add_arguments(self, parser):
        """Register the two positional arguments and the ``--check`` flag.

        Args:
            parser: The argument parser to add the command's arguments to.
        """
        parser.add_argument(
            "app_label",
            help="App label of the application to optimize the migration for.",
        )
        parser.add_argument(
            "migration_name", help="Migration name to optimize the operations for."
        )
        parser.add_argument(
            "--check",
            action="store_true",
            help="Exit with a non-zero status if the migration can be optimized.",
        )

    def handle(self, *args, **options):
        """Optimize the requested migration and write the result to disk.

        Args:
            *args: Unused positional arguments.
            **options: Parsed command options. The keys read here are
                ``verbosity``, ``app_label``, ``migration_name`` and ``check``.

        Raises:
            CommandError: If the app label is unknown, the app has no
                migrations, the migration name matches zero or several
                migrations, or the migration needs manual porting but is
                already a squashed migration (``migration.replaces`` is set).
            SystemExit: With status 1 when ``--check`` is given and the
                optimizer produced a different number of operations.
        """
        verbosity = options["verbosity"]
        app_label = options["app_label"]
        migration_name = options["migration_name"]
        check = options["check"]

        # Validate app_label.
        try:
            apps.get_app_config(app_label)
        except LookupError as err:
            # Chain explicitly so a traceback shows the lookup failure as the
            # direct cause rather than as an unrelated secondary error.
            raise CommandError(str(err)) from err

        # Load the current graph state.
        loader = MigrationLoader(None)
        if app_label not in loader.migrated_apps:
            raise CommandError(f"App '{app_label}' does not have migrations.")

        # Find a migration.
        try:
            migration = loader.get_migration_by_prefix(app_label, migration_name)
        except AmbiguityError as err:
            raise CommandError(
                f"More than one migration matches '{migration_name}' in app "
                f"'{app_label}'. Please be more specific."
            ) from err
        except KeyError as err:
            raise CommandError(
                f"Cannot find a migration matching '{migration_name}' from app "
                f"'{app_label}'."
            ) from err

        # Optimize the migration.
        optimizer = MigrationOptimizer()
        new_operations = optimizer.optimize(migration.operations, migration.app_label)
        original_count = len(migration.operations)

        # An unchanged operation count is treated as "nothing to do".
        if original_count == len(new_operations):
            if verbosity > 0:
                self.stdout.write("No optimizations possible.")
            return

        if verbosity > 0:
            self.stdout.write(
                "Optimizing from %d operations to %d operations."
                % (original_count, len(new_operations))
            )
        if check:
            # --check only reports: exit non-zero before anything is written.
            sys.exit(1)

        # Set the new migration optimizations.
        migration.operations = new_operations

        # Write out the optimized migration file.
        writer = MigrationWriter(migration)
        migration_file_string = writer.as_string()
        if writer.needs_manual_porting:
            if migration.replaces:
                raise CommandError(
                    "Migration will require manual porting but is already a squashed "
                    "migration.\nTransition to a normal migration first: "
                    "https://docs.djangoproject.com/en/%s/topics/migrations/"
                    "#squashing-migrations" % get_docs_version()
                )
            # Make a new migration with those operations. It is written as a
            # separate "<name>_optimized" migration that replaces the
            # original, instead of overwriting the original file.
            subclass = type(
                "Migration",
                (migrations.Migration,),
                {
                    "dependencies": migration.dependencies,
                    "operations": new_operations,
                    "replaces": [(migration.app_label, migration.name)],
                },
            )
            optimized_migration_name = "%s_optimized" % migration.name
            optimized_migration = subclass(optimized_migration_name, app_label)
            writer = MigrationWriter(optimized_migration)
            migration_file_string = writer.as_string()
            if verbosity > 0:
                self.stdout.write(
                    self.style.MIGRATE_HEADING("Manual porting required") + "\n"
                    "  Your migrations contained functions that must be manually "
                    "copied over,\n"
                    "  as we could not safely copy their implementation.\n"
                    "  See the comment at the top of the optimized migration for "
                    "details."
                )
                # This warning is only shown when a "black" executable is
                # found on PATH.
                if shutil.which("black"):
                    self.stdout.write(
                        self.style.WARNING(
                            "Optimized migration couldn't be formatted using the "
                            '"black" command. You can call it manually.'
                        )
                    )

        # ``writer`` is either the in-place writer or the "_optimized" one.
        with open(writer.path, "w", encoding="utf-8") as fh:
            fh.write(migration_file_string)
        run_formatters([writer.path])

        if verbosity > 0:
            self.stdout.write(
                self.style.MIGRATE_HEADING(f"Optimized migration {writer.path}")
            )