Fixed #27844 -- Added optimizemigration management command.

This commit is contained in:
David Wobrock 2022-01-01 23:33:08 +01:00 committed by Mariusz Felisiak
parent 847f46e9bf
commit 7c318a8bdd
6 changed files with 357 additions and 4 deletions

View File

@ -0,0 +1,121 @@
import sys
from django.apps import apps
from django.core.management.base import BaseCommand, CommandError
from django.core.management.utils import run_formatters
from django.db import migrations
from django.db.migrations.exceptions import AmbiguityError
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.optimizer import MigrationOptimizer
from django.db.migrations.writer import MigrationWriter
from django.utils.version import get_docs_version
class Command(BaseCommand):
help = "Optimizes the operations for the named migration."
def add_arguments(self, parser):
parser.add_argument(
"app_label",
help="App label of the application to optimize the migration for.",
)
parser.add_argument(
"migration_name", help="Migration name to optimize the operations for."
)
parser.add_argument(
"--check",
action="store_true",
help="Exit with a non-zero status if the migration can be optimized.",
)
def handle(self, *args, **options):
verbosity = options["verbosity"]
app_label = options["app_label"]
migration_name = options["migration_name"]
check = options["check"]
# Validate app_label.
try:
apps.get_app_config(app_label)
except LookupError as err:
raise CommandError(str(err))
# Load the current graph state.
loader = MigrationLoader(None)
if app_label not in loader.migrated_apps:
raise CommandError(f"App '{app_label}' does not have migrations.")
# Find a migration.
try:
migration = loader.get_migration_by_prefix(app_label, migration_name)
except AmbiguityError:
raise CommandError(
f"More than one migration matches '{migration_name}' in app "
f"'{app_label}'. Please be more specific."
)
except KeyError:
raise CommandError(
f"Cannot find a migration matching '{migration_name}' from app "
f"'{app_label}'."
)
# Optimize the migration.
optimizer = MigrationOptimizer()
new_operations = optimizer.optimize(migration.operations, migration.app_label)
if len(migration.operations) == len(new_operations):
if verbosity > 0:
self.stdout.write("No optimizations possible.")
return
else:
if verbosity > 0:
self.stdout.write(
"Optimizing from %d operations to %d operations."
% (len(migration.operations), len(new_operations))
)
if check:
sys.exit(1)
# Set the new migration optimizations.
migration.operations = new_operations
# Write out the optimized migration file.
writer = MigrationWriter(migration)
migration_file_string = writer.as_string()
if writer.needs_manual_porting:
if migration.replaces:
raise CommandError(
"Migration will require manual porting but is already a squashed "
"migration.\nTransition to a normal migration first: "
"https://docs.djangoproject.com/en/%s/topics/migrations/"
"#squashing-migrations" % get_docs_version()
)
# Make a new migration with those operations.
subclass = type(
"Migration",
(migrations.Migration,),
{
"dependencies": migration.dependencies,
"operations": new_operations,
"replaces": [(migration.app_label, migration.name)],
},
)
optimized_migration_name = "%s_optimized" % migration.name
optimized_migration = subclass(optimized_migration_name, app_label)
writer = MigrationWriter(optimized_migration)
migration_file_string = writer.as_string()
if verbosity > 0:
self.stdout.write(
self.style.MIGRATE_HEADING("Manual porting required") + "\n"
" Your migrations contained functions that must be manually "
"copied over,\n"
" as we could not safely copy their implementation.\n"
" See the comment at the top of the optimized migration for "
"details."
)
with open(writer.path, "w", encoding="utf-8") as fh:
fh.write(migration_file_string)
run_formatters([writer.path])
if verbosity > 0:
self.stdout.write(
self.style.MIGRATE_HEADING(f"Optimized migration {writer.path}")
)

View File

@ -916,6 +916,23 @@ Deletes nonexistent migrations from the ``django_migrations`` table. This is
useful when migration files replaced by a squashed migration have been removed.
See :ref:`migration-squashing` for more details.
``optimizemigration``
---------------------
.. versionadded:: 4.1
.. django-admin:: optimizemigration app_label migration_name
Optimizes the operations for the named migration and overrides the existing
file. If the migration contains functions that must be manually copied, the
command creates a new migration file suffixed with ``_optimized`` that is meant
to replace the named migration.
.. django-admin-option:: --check
Makes ``optimizemigration`` exit with a non-zero status when a migration can be
optimized.
``runserver``
-------------
@ -2056,8 +2073,9 @@ Black formatting
.. versionadded:: 4.1
The Python files created by :djadmin:`startproject`, :djadmin:`startapp`,
:djadmin:`makemigrations`, and :djadmin:`squashmigrations` are formatted using
the ``black`` command if it is present on your ``PATH``.
:djadmin:`optimizemigration`, :djadmin:`makemigrations`, and
:djadmin:`squashmigrations` are formatted using the ``black`` command if it is
present on your ``PATH``.
If you have ``black`` globally installed, but do not wish it used for the
current project, you can set the ``PATH`` explicitly::

View File

@ -239,8 +239,12 @@ Management Commands
migrations from the ``django_migrations`` table.
* Python files created by :djadmin:`startproject`, :djadmin:`startapp`,
:djadmin:`makemigrations`, and :djadmin:`squashmigrations` are now formatted
using the ``black`` command if it is present on your ``PATH``.
:djadmin:`optimizemigration`, :djadmin:`makemigrations`, and
:djadmin:`squashmigrations` are now formatted using the ``black`` command if
it is present on your ``PATH``.
* The new :djadmin:`optimizemigration` command allows optimizing operations for
a migration.
Migrations
~~~~~~~~~~

View File

@ -2810,3 +2810,163 @@ class AppLabelErrorTests(TestCase):
def test_squashmigrations_app_name_specified_as_label(self):
with self.assertRaisesMessage(CommandError, self.did_you_mean_auth_error):
call_command("squashmigrations", "django.contrib.auth", "0002")
def test_optimizemigration_nonexistent_app_label(self):
with self.assertRaisesMessage(CommandError, self.nonexistent_app_error):
call_command("optimizemigration", "nonexistent_app", "0002")
def test_optimizemigration_app_name_specified_as_label(self):
with self.assertRaisesMessage(CommandError, self.did_you_mean_auth_error):
call_command("optimizemigration", "django.contrib.auth", "0002")
class OptimizeMigrationTests(MigrationTestBase):
def test_no_optimization_possible(self):
out = io.StringIO()
with self.temporary_migration_module(
module="migrations.test_migrations"
) as migration_dir:
call_command(
"optimizemigration", "migrations", "0002", stdout=out, no_color=True
)
migration_file = os.path.join(migration_dir, "0002_second.py")
self.assertTrue(os.path.exists(migration_file))
call_command(
"optimizemigration",
"migrations",
"0002",
stdout=out,
no_color=True,
verbosity=0,
)
self.assertEqual(out.getvalue(), "No optimizations possible.\n")
def test_optimization(self):
out = io.StringIO()
with self.temporary_migration_module(
module="migrations.test_migrations"
) as migration_dir:
call_command(
"optimizemigration", "migrations", "0001", stdout=out, no_color=True
)
initial_migration_file = os.path.join(migration_dir, "0001_initial.py")
self.assertTrue(os.path.exists(initial_migration_file))
with open(initial_migration_file) as fp:
content = fp.read()
self.assertIn(
'("bool", models.BooleanField'
if HAS_BLACK
else "('bool', models.BooleanField",
content,
)
self.assertEqual(
out.getvalue(),
f"Optimizing from 4 operations to 2 operations.\n"
f"Optimized migration {initial_migration_file}\n",
)
def test_optimization_no_verbosity(self):
out = io.StringIO()
with self.temporary_migration_module(
module="migrations.test_migrations"
) as migration_dir:
call_command(
"optimizemigration",
"migrations",
"0001",
stdout=out,
no_color=True,
verbosity=0,
)
initial_migration_file = os.path.join(migration_dir, "0001_initial.py")
self.assertTrue(os.path.exists(initial_migration_file))
with open(initial_migration_file) as fp:
content = fp.read()
self.assertIn(
'("bool", models.BooleanField'
if HAS_BLACK
else "('bool', models.BooleanField",
content,
)
self.assertEqual(out.getvalue(), "")
def test_creates_replace_migration_manual_porting(self):
out = io.StringIO()
with self.temporary_migration_module(
module="migrations.test_migrations_manual_porting"
) as migration_dir:
call_command(
"optimizemigration", "migrations", "0003", stdout=out, no_color=True
)
optimized_migration_file = os.path.join(
migration_dir, "0003_third_optimized.py"
)
self.assertTrue(os.path.exists(optimized_migration_file))
with open(optimized_migration_file) as fp:
content = fp.read()
self.assertIn("replaces = [", content)
self.assertEqual(
out.getvalue(),
f"Optimizing from 3 operations to 2 operations.\n"
f"Manual porting required\n"
f" Your migrations contained functions that must be manually copied over,"
f"\n"
f" as we could not safely copy their implementation.\n"
f" See the comment at the top of the optimized migration for details.\n"
f"Optimized migration {optimized_migration_file}\n",
)
def test_fails_squash_migration_manual_porting(self):
out = io.StringIO()
with self.temporary_migration_module(
module="migrations.test_migrations_manual_porting"
) as migration_dir:
msg = (
"Migration will require manual porting but is already a squashed "
"migration.\nTransition to a normal migration first: "
"https://docs.djangoproject.com/en/dev/topics/migrations/"
"#squashing-migrations"
)
with self.assertRaisesMessage(CommandError, msg):
call_command("optimizemigration", "migrations", "0004", stdout=out)
optimized_migration_file = os.path.join(
migration_dir, "0004_fourth_optimized.py"
)
self.assertFalse(os.path.exists(optimized_migration_file))
self.assertEqual(
out.getvalue(), "Optimizing from 3 operations to 2 operations.\n"
)
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
def test_optimizemigration_check(self):
with self.assertRaises(SystemExit):
call_command(
"optimizemigration", "--check", "migrations", "0001", verbosity=0
)
call_command("optimizemigration", "--check", "migrations", "0002", verbosity=0)
@override_settings(
INSTALLED_APPS=["migrations.migrations_test_apps.unmigrated_app_simple"],
)
def test_app_without_migrations(self):
msg = "App 'unmigrated_app_simple' does not have migrations."
with self.assertRaisesMessage(CommandError, msg):
call_command("optimizemigration", "unmigrated_app_simple", "0001")
@override_settings(
MIGRATION_MODULES={"migrations": "migrations.test_migrations_clashing_prefix"},
)
def test_ambigious_prefix(self):
msg = (
"More than one migration matches 'a' in app 'migrations'. Please "
"be more specific."
)
with self.assertRaisesMessage(CommandError, msg):
call_command("optimizemigration", "migrations", "a")
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
def test_unknown_prefix(self):
msg = "Cannot find a migration matching 'nonexistent' from app 'migrations'."
with self.assertRaisesMessage(CommandError, msg):
call_command("optimizemigration", "migrations", "nonexistent")

View File

@ -0,0 +1,23 @@
from django.db import migrations
def forwards(apps, schema_editor):
pass
class Migration(migrations.Migration):
dependencies = [
("migrations", "0002_second"),
]
operations = [
migrations.AlterUniqueTogether(
name="somemodel",
unique_together={("id", "name")},
),
migrations.AlterUniqueTogether(
name="somemodel",
unique_together={("name",)},
),
migrations.RunPython(forwards, migrations.RunPython.noop),
]

View File

@ -0,0 +1,27 @@
from django.db import migrations
def forwards(apps, schema_editor):
pass
class Migration(migrations.Migration):
dependencies = [
("migrations", "0002_second"),
]
replaces = [
("migrations", "0003_third"),
]
operations = [
migrations.AlterUniqueTogether(
name="somemodel",
unique_together={("id", "name")},
),
migrations.AlterUniqueTogether(
name="somemodel",
unique_together={("name",)},
),
migrations.RunPython(forwards, migrations.RunPython.noop),
]