Fixed #24870 -- Added --update option to makemigrations command.

This commit is contained in:
David Wobrock 2022-06-14 20:12:20 +02:00 committed by Mariusz Felisiak
parent 3893fcdd94
commit e286ce17ff
4 changed files with 217 additions and 2 deletions

View File

@ -11,6 +11,8 @@ from django.db import DEFAULT_DB_ALIAS, OperationalError, connections, router
from django.db.migrations import Migration from django.db.migrations import Migration
from django.db.migrations.autodetector import MigrationAutodetector from django.db.migrations.autodetector import MigrationAutodetector
from django.db.migrations.loader import MigrationLoader from django.db.migrations.loader import MigrationLoader
from django.db.migrations.migration import SwappableTuple
from django.db.migrations.optimizer import MigrationOptimizer
from django.db.migrations.questioner import ( from django.db.migrations.questioner import (
InteractiveMigrationQuestioner, InteractiveMigrationQuestioner,
MigrationQuestioner, MigrationQuestioner,
@ -79,6 +81,15 @@ class Command(BaseCommand):
"paths of generated migration files to stdout." "paths of generated migration files to stdout."
), ),
) )
parser.add_argument(
"--update",
action="store_true",
dest="update",
help=(
"Merge model changes into the latest migration and optimize the "
"resulting operations."
),
)
@property @property
def log_output(self): def log_output(self):
@ -101,6 +112,7 @@ class Command(BaseCommand):
self.include_header = options["include_header"] self.include_header = options["include_header"]
check_changes = options["check_changes"] check_changes = options["check_changes"]
self.scriptable = options["scriptable"] self.scriptable = options["scriptable"]
self.update = options["update"]
# If logs and prompts are diverted to stderr, remove the ERROR style. # If logs and prompts are diverted to stderr, remove the ERROR style.
if self.scriptable: if self.scriptable:
self.stderr.style_func = None self.stderr.style_func = None
@ -235,12 +247,87 @@ class Command(BaseCommand):
) )
else: else:
self.log("No changes detected") self.log("No changes detected")
else:
if self.update:
self.write_to_last_migration_files(changes)
else: else:
self.write_migration_files(changes) self.write_migration_files(changes)
if check_changes: if check_changes:
sys.exit(1) sys.exit(1)
def write_migration_files(self, changes): def write_to_last_migration_files(self, changes):
loader = MigrationLoader(connections[DEFAULT_DB_ALIAS])
new_changes = {}
update_previous_migration_paths = {}
for app_label, app_migrations in changes.items():
# Find last migration.
leaf_migration_nodes = loader.graph.leaf_nodes(app=app_label)
if len(leaf_migration_nodes) == 0:
raise CommandError(
f"App {app_label} has no migration, cannot update last migration."
)
leaf_migration_node = leaf_migration_nodes[0]
# Multiple leaf nodes have already been checked earlier in command.
leaf_migration = loader.graph.nodes[leaf_migration_node]
# Updated migration cannot be a squash migration, a dependency of
# another migration, and cannot be already applied.
if leaf_migration.replaces:
raise CommandError(
f"Cannot update squash migration '{leaf_migration}'."
)
if leaf_migration_node in loader.applied_migrations:
raise CommandError(
f"Cannot update applied migration '{leaf_migration}'."
)
depending_migrations = [
migration
for migration in loader.disk_migrations.values()
if leaf_migration_node in migration.dependencies
]
if depending_migrations:
formatted_migrations = ", ".join(
[f"'{migration}'" for migration in depending_migrations]
)
raise CommandError(
f"Cannot update migration '{leaf_migration}' that migrations "
f"{formatted_migrations} depend on."
)
# Build new migration.
for migration in app_migrations:
leaf_migration.operations.extend(migration.operations)
for dependency in migration.dependencies:
if isinstance(dependency, SwappableTuple):
if settings.AUTH_USER_MODEL == dependency.setting:
leaf_migration.dependencies.append(
("__setting__", "AUTH_USER_MODEL")
)
else:
leaf_migration.dependencies.append(dependency)
elif dependency[0] != migration.app_label:
leaf_migration.dependencies.append(dependency)
# Optimize migration.
optimizer = MigrationOptimizer()
leaf_migration.operations = optimizer.optimize(
leaf_migration.operations, app_label
)
# Update name.
previous_migration_path = MigrationWriter(leaf_migration).path
suggested_name = (
leaf_migration.name[:4] + "_" + leaf_migration.suggest_name()
)
if leaf_migration.name == suggested_name:
new_name = leaf_migration.name + "_updated"
else:
new_name = suggested_name
leaf_migration.name = new_name
# Register overridden migration.
new_changes[app_label] = [leaf_migration]
update_previous_migration_paths[app_label] = previous_migration_path
self.write_migration_files(new_changes, update_previous_migration_paths)
def write_migration_files(self, changes, update_previous_migration_paths=None):
""" """
Take a changes dict and write them out as migration files. Take a changes dict and write them out as migration files.
""" """
@ -274,6 +361,22 @@ class Command(BaseCommand):
with open(writer.path, "w", encoding="utf-8") as fh: with open(writer.path, "w", encoding="utf-8") as fh:
fh.write(migration_string) fh.write(migration_string)
self.written_files.append(writer.path) self.written_files.append(writer.path)
if update_previous_migration_paths:
prev_path = update_previous_migration_paths[app_label]
rel_prev_path = self.get_relative_path(prev_path)
if writer.needs_manual_porting:
migration_path = self.get_relative_path(writer.path)
self.log(
self.style.WARNING(
f"Updated migration {migration_path} requires "
f"manual porting.\n"
f"Previous migration {rel_prev_path} was kept and "
f"must be deleted after porting functions manually."
)
)
else:
os.remove(prev_path)
self.log(f"Deleted {rel_prev_path}")
elif self.verbosity == 3: elif self.verbosity == 3:
# Alternatively, makemigrations --dry-run --verbosity 3 # Alternatively, makemigrations --dry-run --verbosity 3
# will log the migrations rather than saving the file to # will log the migrations rather than saving the file to

View File

@ -832,6 +832,13 @@ migrations are detected.
Diverts log output and input prompts to ``stderr``, writing only paths of Diverts log output and input prompts to ``stderr``, writing only paths of
generated migration files to ``stdout``. generated migration files to ``stdout``.
.. django-admin-option:: --update
.. versionadded:: 4.2
Merges model changes into the latest migration and optimize the resulting
operations.
``migrate`` ``migrate``
----------- -----------

View File

@ -158,6 +158,9 @@ Management Commands
* :djadmin:`makemessages` command now supports locales with private sub-tags * :djadmin:`makemessages` command now supports locales with private sub-tags
such as ``nl_NL-x-informal``. such as ``nl_NL-x-informal``.
* The new :option:`makemigrations --update` option merges model changes into
the latest migration and optimizes the resulting operations.
Migrations Migrations
~~~~~~~~~~ ~~~~~~~~~~

View File

@ -2584,6 +2584,108 @@ class MakeMigrationsTests(MigrationTestBase):
out_value = out.getvalue() out_value = out.getvalue()
self.assertIn("0003_auto", out_value) self.assertIn("0003_auto", out_value)
def test_makemigrations_update(self):
with self.temporary_migration_module(
module="migrations.test_migrations"
) as migration_dir:
migration_file = os.path.join(migration_dir, "0002_second.py")
with open(migration_file) as fp:
initial_content = fp.read()
with captured_stdout() as out:
call_command("makemigrations", "migrations", update=True)
self.assertFalse(
any(
filename.startswith("0003")
for filename in os.listdir(migration_dir)
)
)
self.assertIs(os.path.exists(migration_file), False)
new_migration_file = os.path.join(
migration_dir,
"0002_delete_tribble_author_rating_modelwithcustombase_and_more.py",
)
with open(new_migration_file) as fp:
self.assertNotEqual(initial_content, fp.read())
self.assertIn(f"Deleted {migration_file}", out.getvalue())
def test_makemigrations_update_existing_name(self):
with self.temporary_migration_module(
module="migrations.test_auto_now_add"
) as migration_dir:
migration_file = os.path.join(migration_dir, "0001_initial.py")
with open(migration_file) as fp:
initial_content = fp.read()
with captured_stdout() as out:
call_command("makemigrations", "migrations", update=True)
self.assertIs(os.path.exists(migration_file), False)
new_migration_file = os.path.join(
migration_dir,
"0001_initial_updated.py",
)
with open(new_migration_file) as fp:
self.assertNotEqual(initial_content, fp.read())
self.assertIn(f"Deleted {migration_file}", out.getvalue())
def test_makemigrations_update_applied_migration(self):
recorder = MigrationRecorder(connection)
recorder.record_applied("migrations", "0001_initial")
recorder.record_applied("migrations", "0002_second")
with self.temporary_migration_module(module="migrations.test_migrations"):
msg = "Cannot update applied migration 'migrations.0002_second'."
with self.assertRaisesMessage(CommandError, msg):
call_command("makemigrations", "migrations", update=True)
def test_makemigrations_update_no_migration(self):
with self.temporary_migration_module(module="migrations.test_migrations_empty"):
msg = "App migrations has no migration, cannot update last migration."
with self.assertRaisesMessage(CommandError, msg):
call_command("makemigrations", "migrations", update=True)
def test_makemigrations_update_squash_migration(self):
with self.temporary_migration_module(
module="migrations.test_migrations_squashed"
):
msg = "Cannot update squash migration 'migrations.0001_squashed_0002'."
with self.assertRaisesMessage(CommandError, msg):
call_command("makemigrations", "migrations", update=True)
def test_makemigrations_update_manual_porting(self):
with self.temporary_migration_module(
module="migrations.test_migrations_plan"
) as migration_dir:
with captured_stdout() as out:
call_command("makemigrations", "migrations", update=True)
# Previous migration exists.
previous_migration_file = os.path.join(migration_dir, "0005_fifth.py")
self.assertIs(os.path.exists(previous_migration_file), True)
# New updated migration exists.
files = [f for f in os.listdir(migration_dir) if f.startswith("0005_auto")]
updated_migration_file = os.path.join(migration_dir, files[0])
self.assertIs(os.path.exists(updated_migration_file), True)
self.assertIn(
f"Updated migration {updated_migration_file} requires manual porting.\n"
f"Previous migration {previous_migration_file} was kept and must be "
f"deleted after porting functions manually.",
out.getvalue(),
)
@override_settings(
INSTALLED_APPS=[
"migrations.migrations_test_apps.alter_fk.author_app",
"migrations.migrations_test_apps.alter_fk.book_app",
]
)
def test_makemigrations_update_dependency_migration(self):
with self.temporary_migration_module(app_label="book_app"):
msg = (
"Cannot update migration 'book_app.0001_initial' that migrations "
"'author_app.0002_alter_id' depend on."
)
with self.assertRaisesMessage(CommandError, msg):
call_command("makemigrations", "book_app", update=True)
class SquashMigrationsTests(MigrationTestBase): class SquashMigrationsTests(MigrationTestBase):
""" """