Fixed #24067 -- Renamed content types upon model renaming.

Thanks to Tim for the extensive review.
This commit is contained in:
Simon Charette 2015-12-17 20:12:05 -05:00
parent 354acd04af
commit f179113e6c
No known key found for this signature in database
GPG Key ID: 72AF89A0B1B4EDB3
5 changed files with 198 additions and 11 deletions

View File

@ -1,10 +1,12 @@
from django.apps import AppConfig from django.apps import AppConfig
from django.contrib.contenttypes.checks import check_generic_foreign_keys from django.contrib.contenttypes.checks import check_generic_foreign_keys
from django.core import checks from django.core import checks
from django.db.models.signals import post_migrate from django.db.models.signals import post_migrate, pre_migrate
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from .management import update_contenttypes from .management import (
inject_rename_contenttypes_operations, update_contenttypes,
)
class ContentTypesConfig(AppConfig): class ContentTypesConfig(AppConfig):
@ -12,5 +14,6 @@ class ContentTypesConfig(AppConfig):
verbose_name = _("Content Types") verbose_name = _("Content Types")
def ready(self): def ready(self):
pre_migrate.connect(inject_rename_contenttypes_operations, sender=self)
post_migrate.connect(update_contenttypes) post_migrate.connect(update_contenttypes)
checks.register(check_generic_foreign_keys, checks.Tags.models) checks.register(check_generic_foreign_keys, checks.Tags.models)

View File

@ -1,9 +1,91 @@
from django.apps import apps as global_apps from django.apps import apps as global_apps
from django.db import DEFAULT_DB_ALIAS, router from django.db import DEFAULT_DB_ALIAS, migrations, router, transaction
from django.db.utils import IntegrityError
from django.utils import six from django.utils import six
from django.utils.six.moves import input from django.utils.six.moves import input
class RenameContentType(migrations.RunPython):
def __init__(self, app_label, old_model, new_model):
self.app_label = app_label
self.old_model = old_model
self.new_model = new_model
super(RenameContentType, self).__init__(self.rename_forward, self.rename_backward)
def _rename(self, apps, schema_editor, old_model, new_model):
ContentType = apps.get_model('contenttypes', 'ContentType')
db = schema_editor.connection.alias
if not router.allow_migrate_model(db, ContentType):
return
try:
content_type = ContentType.objects.db_manager(db).get_by_natural_key(self.app_label, old_model)
except ContentType.DoesNotExist:
pass
else:
content_type.model = new_model
try:
with transaction.atomic(using=db):
content_type.save(update_fields={'model'})
except IntegrityError:
# Gracefully fallback if a stale content type causes a
# conflict as update_contenttypes will take care of asking the
# user what should be done next.
content_type.model = old_model
else:
# Clear the cache as the `get_by_natual_key()` call will cache
# the renamed ContentType instance by its old model name.
ContentType.objects.clear_cache()
def rename_forward(self, apps, schema_editor):
self._rename(apps, schema_editor, self.old_model, self.new_model)
def rename_backward(self, apps, schema_editor):
self._rename(apps, schema_editor, self.new_model, self.old_model)
def inject_rename_contenttypes_operations(plan=None, apps=global_apps, using=DEFAULT_DB_ALIAS, **kwargs):
"""
Insert a `RenameContentType` operation after every planned `RenameModel`
operation.
"""
if plan is None:
return
# Determine whether or not the ContentType model is available.
try:
ContentType = apps.get_model('contenttypes', 'ContentType')
except LookupError:
available = False
else:
if not router.allow_migrate_model(using, ContentType):
return
available = True
for migration, backward in plan:
if ((migration.app_label, migration.name) == ('contenttypes', '0001_initial')):
# There's no point in going forward if the initial contenttypes
# migration is unapplied as the ContentType model will be
# unavailable from this point.
if backward:
break
else:
available = True
continue
# The ContentType model is not available yet.
if not available:
continue
inserts = []
for index, operation in enumerate(migration.operations):
if isinstance(operation, migrations.RenameModel):
operation = RenameContentType(
migration.app_label, operation.old_name_lower, operation.new_name_lower
)
inserts.append((index + 1, operation))
for inserted, (index, operation) in enumerate(inserts):
migration.operations.insert(inserted + index, operation)
def update_contenttypes(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs): def update_contenttypes(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs):
""" """
Creates content types for models in the given app, removing any model Creates content types for models in the given app, removing any model

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
def assert_foo_contenttype_not_cached(apps, schema_editor):
ContentType = apps.get_model('contenttypes', 'ContentType')
try:
content_type = ContentType.objects.get_by_natural_key('contenttypes_tests', 'foo')
except ContentType.DoesNotExist:
pass
else:
if not ContentType.objects.filter(app_label='contenttypes_tests', model='foo').exists():
raise AssertionError('The contenttypes_tests.Foo ContentType should not be cached.')
elif content_type.model != 'foo':
raise AssertionError(
"The cached contenttypes_tests.Foo ContentType should have "
"its model set to 'foo'."
)
class Migration(migrations.Migration):
operations = [
migrations.CreateModel(
'Foo',
[
('id', models.AutoField(primary_key=True)),
],
),
migrations.RenameModel('Foo', 'RenamedFoo'),
migrations.RunPython(assert_foo_contenttype_not_cached, migrations.RunPython.noop)
]

View File

@ -4,15 +4,18 @@ from __future__ import unicode_literals
import datetime import datetime
from django.apps.registry import Apps, apps from django.apps.registry import Apps, apps
from django.contrib.contenttypes import management from django.conf import settings
from django.contrib.contenttypes import management as contenttypes_management
from django.contrib.contenttypes.fields import ( from django.contrib.contenttypes.fields import (
GenericForeignKey, GenericRelation, GenericForeignKey, GenericRelation,
) )
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.contrib.sites.models import Site from django.contrib.sites.models import Site
from django.core import checks from django.core import checks, management
from django.db import connections, models from django.db import connections, migrations, models
from django.test import SimpleTestCase, TestCase, mock, override_settings from django.test import (
SimpleTestCase, TestCase, TransactionTestCase, mock, override_settings,
)
from django.test.utils import captured_stdout, isolate_apps from django.test.utils import captured_stdout, isolate_apps
from django.utils.encoding import force_str, force_text from django.utils.encoding import force_str, force_text
@ -388,9 +391,9 @@ class UpdateContentTypesTests(TestCase):
interactive mode of update_contenttypes() (the default) should delete interactive mode of update_contenttypes() (the default) should delete
stale contenttypes. stale contenttypes.
""" """
management.input = lambda x: force_str("yes") contenttypes_management.input = lambda x: force_str("yes")
with captured_stdout() as stdout: with captured_stdout() as stdout:
management.update_contenttypes(self.app_config) contenttypes_management.update_contenttypes(self.app_config)
self.assertIn("Deleting stale content type", stdout.getvalue()) self.assertIn("Deleting stale content type", stdout.getvalue())
self.assertEqual(ContentType.objects.count(), self.before_count) self.assertEqual(ContentType.objects.count(), self.before_count)
@ -400,7 +403,7 @@ class UpdateContentTypesTests(TestCase):
content types. content types.
""" """
with captured_stdout() as stdout: with captured_stdout() as stdout:
management.update_contenttypes(self.app_config, interactive=False) contenttypes_management.update_contenttypes(self.app_config, interactive=False)
self.assertIn("Stale content types remain.", stdout.getvalue()) self.assertIn("Stale content types remain.", stdout.getvalue())
self.assertEqual(ContentType.objects.count(), self.before_count + 1) self.assertEqual(ContentType.objects.count(), self.before_count + 1)
@ -411,7 +414,7 @@ class UpdateContentTypesTests(TestCase):
""" """
apps = Apps() apps = Apps()
with self.assertNumQueries(0): with self.assertNumQueries(0):
management.update_contenttypes(self.app_config, interactive=False, verbosity=0, apps=apps) contenttypes_management.update_contenttypes(self.app_config, interactive=False, verbosity=0, apps=apps)
self.assertEqual(ContentType.objects.count(), self.before_count + 1) self.assertEqual(ContentType.objects.count(), self.before_count + 1)
@ -445,3 +448,68 @@ class ContentTypesMultidbTestCase(TestCase):
with self.assertNumQueries(0, using='default'), \ with self.assertNumQueries(0, using='default'), \
self.assertNumQueries(1, using='other'): self.assertNumQueries(1, using='other'):
ContentType.objects.get_for_model(Author) ContentType.objects.get_for_model(Author)
@override_settings(
MIGRATION_MODULES=dict(settings.MIGRATION_MODULES, contenttypes_tests='contenttypes_tests.operations_migrations'),
)
class ContentTypeOperationsTests(TransactionTestCase):
available_apps = ['django.contrib.contenttypes', 'contenttypes_tests']
def setUp(self):
app_config = apps.get_app_config('contenttypes_tests')
models.signals.post_migrate.connect(self.assertOperationsInjected, sender=app_config)
def tearDown(self):
app_config = apps.get_app_config('contenttypes_tests')
models.signals.post_migrate.disconnect(self.assertOperationsInjected, sender=app_config)
def assertOperationsInjected(self, plan, **kwargs):
for migration, _backward in plan:
operations = iter(migration.operations)
for operation in operations:
if isinstance(operation, migrations.RenameModel):
next_operation = next(operations)
self.assertIsInstance(next_operation, contenttypes_management.RenameContentType)
self.assertEqual(next_operation.app_label, migration.app_label)
self.assertEqual(next_operation.old_model, operation.old_name_lower)
self.assertEqual(next_operation.new_model, operation.new_name_lower)
def test_existing_content_type_rename(self):
ContentType.objects.create(app_label='contenttypes_tests', model='foo')
management.call_command(
'migrate', 'contenttypes_tests', database='default', interactive=False, verbosity=0,
)
self.assertFalse(ContentType.objects.filter(app_label='contenttypes_tests', model='foo').exists())
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='renamedfoo').exists())
management.call_command(
'migrate', 'contenttypes_tests', 'zero', database='default', interactive=False, verbosity=0,
)
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='foo').exists())
self.assertFalse(ContentType.objects.filter(app_label='contenttypes_tests', model='renamedfoo').exists())
def test_missing_content_type_rename_ignore(self):
management.call_command(
'migrate', 'contenttypes_tests', database='default', interactive=False, verbosity=0,
)
self.assertFalse(ContentType.objects.filter(app_label='contenttypes_tests', model='foo').exists())
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='renamedfoo').exists())
management.call_command(
'migrate', 'contenttypes_tests', 'zero', database='default', interactive=False, verbosity=0,
)
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='foo').exists())
self.assertFalse(ContentType.objects.filter(app_label='contenttypes_tests', model='renamedfoo').exists())
def test_content_type_rename_conflict(self):
ContentType.objects.create(app_label='contenttypes_tests', model='foo')
ContentType.objects.create(app_label='contenttypes_tests', model='renamedfoo')
management.call_command(
'migrate', 'contenttypes_tests', database='default', interactive=False, verbosity=0,
)
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='foo').exists())
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='renamedfoo').exists())
management.call_command(
'migrate', 'contenttypes_tests', 'zero', database='default', interactive=False, verbosity=0,
)
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='foo').exists())
self.assertTrue(ContentType.objects.filter(app_label='contenttypes_tests', model='renamedfoo').exists())