Fixed #29899 -- Made autodetector use model states instead of model classes.

Thanks Simon Charette and Markus Holtermann for reviews.
This commit is contained in:
David Wobrock 2020-04-25 17:20:26 +02:00 committed by Mariusz Felisiak
parent a67849499a
commit aa4acc164d
3 changed files with 249 additions and 148 deletions

View File

@ -9,7 +9,9 @@ from django.db.migrations.migration import Migration
from django.db.migrations.operations.models import AlterModelOptions from django.db.migrations.operations.models import AlterModelOptions
from django.db.migrations.optimizer import MigrationOptimizer from django.db.migrations.optimizer import MigrationOptimizer
from django.db.migrations.questioner import MigrationQuestioner from django.db.migrations.questioner import MigrationQuestioner
from django.db.migrations.utils import COMPILED_REGEX_TYPE, RegexObject from django.db.migrations.utils import (
COMPILED_REGEX_TYPE, RegexObject, resolve_relation,
)
from django.utils.topological_sort import stable_topological_sort from django.utils.topological_sort import stable_topological_sort
@ -123,37 +125,36 @@ class MigrationAutodetector:
# Prepare some old/new state and model lists, separating # Prepare some old/new state and model lists, separating
# proxy models and ignoring unmigrated apps. # proxy models and ignoring unmigrated apps.
self.old_apps = self.from_state.concrete_apps
self.new_apps = self.to_state.apps
self.old_model_keys = set() self.old_model_keys = set()
self.old_proxy_keys = set() self.old_proxy_keys = set()
self.old_unmanaged_keys = set() self.old_unmanaged_keys = set()
self.new_model_keys = set() self.new_model_keys = set()
self.new_proxy_keys = set() self.new_proxy_keys = set()
self.new_unmanaged_keys = set() self.new_unmanaged_keys = set()
for app_label, model_name in self.from_state.models: for (app_label, model_name), model_state in self.from_state.models.items():
model = self.old_apps.get_model(app_label, model_name) if not model_state.options.get('managed', True):
if not model._meta.managed:
self.old_unmanaged_keys.add((app_label, model_name)) self.old_unmanaged_keys.add((app_label, model_name))
elif app_label not in self.from_state.real_apps: elif app_label not in self.from_state.real_apps:
if model._meta.proxy: if model_state.options.get('proxy'):
self.old_proxy_keys.add((app_label, model_name)) self.old_proxy_keys.add((app_label, model_name))
else: else:
self.old_model_keys.add((app_label, model_name)) self.old_model_keys.add((app_label, model_name))
for app_label, model_name in self.to_state.models: for (app_label, model_name), model_state in self.to_state.models.items():
model = self.new_apps.get_model(app_label, model_name) if not model_state.options.get('managed', True):
if not model._meta.managed:
self.new_unmanaged_keys.add((app_label, model_name)) self.new_unmanaged_keys.add((app_label, model_name))
elif ( elif (
app_label not in self.from_state.real_apps or app_label not in self.from_state.real_apps or
(convert_apps and app_label in convert_apps) (convert_apps and app_label in convert_apps)
): ):
if model._meta.proxy: if model_state.options.get('proxy'):
self.new_proxy_keys.add((app_label, model_name)) self.new_proxy_keys.add((app_label, model_name))
else: else:
self.new_model_keys.add((app_label, model_name)) self.new_model_keys.add((app_label, model_name))
self.from_state.resolve_fields_and_relations()
self.to_state.resolve_fields_and_relations()
# Renames have to come first # Renames have to come first
self.generate_renamed_models() self.generate_renamed_models()
@ -224,14 +225,9 @@ class MigrationAutodetector:
for app_label, model_name in sorted(self.old_model_keys): for app_label, model_name in sorted(self.old_model_keys):
old_model_name = self.renamed_models.get((app_label, model_name), model_name) old_model_name = self.renamed_models.get((app_label, model_name), model_name)
old_model_state = self.from_state.models[app_label, old_model_name] old_model_state = self.from_state.models[app_label, old_model_name]
for field_name in old_model_state.fields: for field_name, field in old_model_state.fields.items():
old_field = self.old_apps.get_model(app_label, old_model_name)._meta.get_field(field_name) if hasattr(field, 'remote_field') and getattr(field.remote_field, 'through', None):
if (hasattr(old_field, "remote_field") and getattr(old_field.remote_field, "through", None) and through_key = resolve_relation(field.remote_field.through, app_label, model_name)
not old_field.remote_field.through._meta.auto_created):
through_key = (
old_field.remote_field.through._meta.app_label,
old_field.remote_field.through._meta.model_name,
)
self.through_users[through_key] = (app_label, old_model_name, field_name) self.through_users[through_key] = (app_label, old_model_name, field_name)
@staticmethod @staticmethod
@ -446,11 +442,14 @@ class MigrationAutodetector:
real way to solve #22783). real way to solve #22783).
""" """
try: try:
model = self.new_apps.get_model(item[0], item[1]) model_state = self.to_state.models[item]
base_names = [base.__name__ for base in model.__bases__] base_names = {
base if isinstance(base, str) else base.__name__
for base in model_state.bases
}
string_version = "%s.%s" % (item[0], item[1]) string_version = "%s.%s" % (item[0], item[1])
if ( if (
model._meta.swappable or model_state.options.get('swappable') or
"AbstractUser" in base_names or "AbstractUser" in base_names or
"AbstractBaseUser" in base_names or "AbstractBaseUser" in base_names or
settings.AUTH_USER_MODEL.lower() == string_version.lower() settings.AUTH_USER_MODEL.lower() == string_version.lower()
@ -480,11 +479,19 @@ class MigrationAutodetector:
rem_model_fields_def = self.only_relation_agnostic_fields(rem_model_state.fields) rem_model_fields_def = self.only_relation_agnostic_fields(rem_model_state.fields)
if model_fields_def == rem_model_fields_def: if model_fields_def == rem_model_fields_def:
if self.questioner.ask_rename_model(rem_model_state, model_state): if self.questioner.ask_rename_model(rem_model_state, model_state):
model_opts = self.new_apps.get_model(app_label, model_name)._meta
dependencies = [] dependencies = []
for field in model_opts.get_fields(): fields = list(model_state.fields.values()) + [
field.remote_field
for relations in self.to_state.relations[app_label, model_name].values()
for _, field in relations
]
for field in fields:
if field.is_relation: if field.is_relation:
dependencies.extend(self._get_dependencies_for_foreign_key(field)) dependencies.extend(
self._get_dependencies_for_foreign_key(
app_label, model_name, field, self.to_state,
)
)
self.add_operation( self.add_operation(
app_label, app_label,
operations.RenameModel( operations.RenameModel(
@ -525,27 +532,19 @@ class MigrationAutodetector:
) )
for app_label, model_name in all_added_models: for app_label, model_name in all_added_models:
model_state = self.to_state.models[app_label, model_name] model_state = self.to_state.models[app_label, model_name]
model_opts = self.new_apps.get_model(app_label, model_name)._meta
# Gather related fields # Gather related fields
related_fields = {} related_fields = {}
primary_key_rel = None primary_key_rel = None
for field in model_opts.local_fields: for field_name, field in model_state.fields.items():
if field.remote_field: if field.remote_field:
if field.remote_field.model: if field.remote_field.model:
if field.primary_key: if field.primary_key:
primary_key_rel = field.remote_field.model primary_key_rel = field.remote_field.model
elif not field.remote_field.parent_link: elif not field.remote_field.parent_link:
related_fields[field.name] = field related_fields[field_name] = field
# through will be none on M2Ms on swapped-out models; if getattr(field.remote_field, 'through', None):
# we can treat lack of through as auto_created=True, though. related_fields[field_name] = field
if (getattr(field.remote_field, "through", None) and
not field.remote_field.through._meta.auto_created):
related_fields[field.name] = field
for field in model_opts.local_many_to_many:
if field.remote_field.model:
related_fields[field.name] = field
if getattr(field.remote_field, "through", None) and not field.remote_field.through._meta.auto_created:
related_fields[field.name] = field
# Are there indexes/unique|index_together to defer? # Are there indexes/unique|index_together to defer?
indexes = model_state.options.pop('indexes') indexes = model_state.options.pop('indexes')
constraints = model_state.options.pop('constraints') constraints = model_state.options.pop('constraints')
@ -573,12 +572,11 @@ class MigrationAutodetector:
dependencies.append((base_app_label, base_name, removed_base_field, False)) dependencies.append((base_app_label, base_name, removed_base_field, False))
# Depend on the other end of the primary key if it's a relation # Depend on the other end of the primary key if it's a relation
if primary_key_rel: if primary_key_rel:
dependencies.append(( dependencies.append(
primary_key_rel._meta.app_label, resolve_relation(
primary_key_rel._meta.object_name, primary_key_rel, app_label, model_name,
None, ) + (None, True)
True )
))
# Generate creation operation # Generate creation operation
self.add_operation( self.add_operation(
app_label, app_label,
@ -594,12 +592,14 @@ class MigrationAutodetector:
) )
# Don't add operations which modify the database for unmanaged models # Don't add operations which modify the database for unmanaged models
if not model_opts.managed: if not model_state.options.get('managed', True):
continue continue
# Generate operations for each related field # Generate operations for each related field
for name, field in sorted(related_fields.items()): for name, field in sorted(related_fields.items()):
dependencies = self._get_dependencies_for_foreign_key(field) dependencies = self._get_dependencies_for_foreign_key(
app_label, model_name, field, self.to_state,
)
# Depend on our own model being created # Depend on our own model being created
dependencies.append((app_label, model_name, None, True)) dependencies.append((app_label, model_name, None, True))
# Make operation # Make operation
@ -668,17 +668,20 @@ class MigrationAutodetector:
) )
# Fix relationships if the model changed from a proxy model to a # Fix relationships if the model changed from a proxy model to a
# concrete model. # concrete model.
relations = self.to_state.relations
if (app_label, model_name) in self.old_proxy_keys: if (app_label, model_name) in self.old_proxy_keys:
for related_object in model_opts.related_objects: for related_model_key, related_fields in relations[app_label, model_name].items():
self.add_operation( related_model_state = self.to_state.models[related_model_key]
related_object.related_model._meta.app_label, for related_field_name, related_field in related_fields:
operations.AlterField( self.add_operation(
model_name=related_object.related_model._meta.object_name, related_model_state.app_label,
name=related_object.field.name, operations.AlterField(
field=related_object.field, model_name=related_model_state.name,
), name=related_field_name,
dependencies=[(app_label, model_name, None, True)], field=related_field,
) ),
dependencies=[(app_label, model_name, None, True)],
)
def generate_created_proxies(self): def generate_created_proxies(self):
""" """
@ -729,23 +732,14 @@ class MigrationAutodetector:
all_deleted_models = chain(sorted(deleted_models), sorted(deleted_unmanaged_models)) all_deleted_models = chain(sorted(deleted_models), sorted(deleted_unmanaged_models))
for app_label, model_name in all_deleted_models: for app_label, model_name in all_deleted_models:
model_state = self.from_state.models[app_label, model_name] model_state = self.from_state.models[app_label, model_name]
model = self.old_apps.get_model(app_label, model_name)
# Gather related fields # Gather related fields
related_fields = {} related_fields = {}
for field in model._meta.local_fields: for field_name, field in model_state.fields.items():
if field.remote_field: if field.remote_field:
if field.remote_field.model: if field.remote_field.model:
related_fields[field.name] = field related_fields[field_name] = field
# through will be none on M2Ms on swapped-out models; if getattr(field.remote_field, 'through', None):
# we can treat lack of through as auto_created=True, though. related_fields[field_name] = field
if (getattr(field.remote_field, "through", None) and
not field.remote_field.through._meta.auto_created):
related_fields[field.name] = field
for field in model._meta.local_many_to_many:
if field.remote_field.model:
related_fields[field.name] = field
if getattr(field.remote_field, "through", None) and not field.remote_field.through._meta.auto_created:
related_fields[field.name] = field
# Generate option removal first # Generate option removal first
unique_together = model_state.options.pop('unique_together', None) unique_together = model_state.options.pop('unique_together', None)
index_together = model_state.options.pop('index_together', None) index_together = model_state.options.pop('index_together', None)
@ -779,13 +773,18 @@ class MigrationAutodetector:
# and the removal of all its own related fields, and if it's # and the removal of all its own related fields, and if it's
# a through model the field that references it. # a through model the field that references it.
dependencies = [] dependencies = []
for related_object in model._meta.related_objects: relations = self.from_state.relations
related_object_app_label = related_object.related_model._meta.app_label for (related_object_app_label, object_name), relation_related_fields in (
object_name = related_object.related_model._meta.object_name relations[app_label, model_name].items()
field_name = related_object.field.name ):
dependencies.append((related_object_app_label, object_name, field_name, False)) for field_name, field in relation_related_fields:
if not related_object.many_to_many: dependencies.append(
dependencies.append((related_object_app_label, object_name, field_name, "alter")) (related_object_app_label, object_name, field_name, False),
)
if not field.many_to_many:
dependencies.append(
(related_object_app_label, object_name, field_name, 'alter'),
)
for name in sorted(related_fields): for name in sorted(related_fields):
dependencies.append((app_label, model_name, name, False)) dependencies.append((app_label, model_name, name, False))
@ -821,12 +820,13 @@ class MigrationAutodetector:
for app_label, model_name, field_name in sorted(self.new_field_keys - self.old_field_keys): for app_label, model_name, field_name in sorted(self.new_field_keys - self.old_field_keys):
old_model_name = self.renamed_models.get((app_label, model_name), model_name) old_model_name = self.renamed_models.get((app_label, model_name), model_name)
old_model_state = self.from_state.models[app_label, old_model_name] old_model_state = self.from_state.models[app_label, old_model_name]
field = self.new_apps.get_model(app_label, model_name)._meta.get_field(field_name) new_model_state = self.to_state.models[app_label, old_model_name]
field = new_model_state.get_field(field_name)
# Scan to see if this is actually a rename! # Scan to see if this is actually a rename!
field_dec = self.deep_deconstruct(field) field_dec = self.deep_deconstruct(field)
for rem_app_label, rem_model_name, rem_field_name in sorted(self.old_field_keys - self.new_field_keys): for rem_app_label, rem_model_name, rem_field_name in sorted(self.old_field_keys - self.new_field_keys):
if rem_app_label == app_label and rem_model_name == model_name: if rem_app_label == app_label and rem_model_name == model_name:
old_field = old_model_state.fields[rem_field_name] old_field = old_model_state.get_field(rem_field_name)
old_field_dec = self.deep_deconstruct(old_field) old_field_dec = self.deep_deconstruct(old_field)
if field.remote_field and field.remote_field.model and 'to' in old_field_dec[2]: if field.remote_field and field.remote_field.model and 'to' in old_field_dec[2]:
old_rel_to = old_field_dec[2]['to'] old_rel_to = old_field_dec[2]['to']
@ -859,11 +859,13 @@ class MigrationAutodetector:
self._generate_added_field(app_label, model_name, field_name) self._generate_added_field(app_label, model_name, field_name)
def _generate_added_field(self, app_label, model_name, field_name): def _generate_added_field(self, app_label, model_name, field_name):
field = self.new_apps.get_model(app_label, model_name)._meta.get_field(field_name) field = self.to_state.models[app_label, model_name].get_field(field_name)
# Fields that are foreignkeys/m2ms depend on stuff # Fields that are foreignkeys/m2ms depend on stuff
dependencies = [] dependencies = []
if field.remote_field and field.remote_field.model: if field.remote_field and field.remote_field.model:
dependencies.extend(self._get_dependencies_for_foreign_key(field)) dependencies.extend(self._get_dependencies_for_foreign_key(
app_label, model_name, field, self.to_state,
))
# You can't just add NOT NULL fields with no default or fields # You can't just add NOT NULL fields with no default or fields
# which don't allow empty strings as default. # which don't allow empty strings as default.
time_fields = (models.DateField, models.DateTimeField, models.TimeField) time_fields = (models.DateField, models.DateTimeField, models.TimeField)
@ -919,16 +921,13 @@ class MigrationAutodetector:
# Did the field change? # Did the field change?
old_model_name = self.renamed_models.get((app_label, model_name), model_name) old_model_name = self.renamed_models.get((app_label, model_name), model_name)
old_field_name = self.renamed_fields.get((app_label, model_name, field_name), field_name) old_field_name = self.renamed_fields.get((app_label, model_name, field_name), field_name)
old_field = self.old_apps.get_model(app_label, old_model_name)._meta.get_field(old_field_name) old_field = self.from_state.models[app_label, old_model_name].get_field(old_field_name)
new_field = self.new_apps.get_model(app_label, model_name)._meta.get_field(field_name) new_field = self.to_state.models[app_label, model_name].get_field(field_name)
dependencies = [] dependencies = []
# Implement any model renames on relations; these are handled by RenameModel # Implement any model renames on relations; these are handled by RenameModel
# so we need to exclude them from the comparison # so we need to exclude them from the comparison
if hasattr(new_field, "remote_field") and getattr(new_field.remote_field, "model", None): if hasattr(new_field, "remote_field") and getattr(new_field.remote_field, "model", None):
rename_key = ( rename_key = resolve_relation(new_field.remote_field.model, app_label, model_name)
new_field.remote_field.model._meta.app_label,
new_field.remote_field.model._meta.model_name,
)
if rename_key in self.renamed_models: if rename_key in self.renamed_models:
new_field.remote_field.model = old_field.remote_field.model new_field.remote_field.model = old_field.remote_field.model
# Handle ForeignKey which can only have a single to_field. # Handle ForeignKey which can only have a single to_field.
@ -953,12 +952,14 @@ class MigrationAutodetector:
self.renamed_fields.get(rename_key + (to_field,), to_field) self.renamed_fields.get(rename_key + (to_field,), to_field)
for to_field in new_field.to_fields for to_field in new_field.to_fields
]) ])
dependencies.extend(self._get_dependencies_for_foreign_key(new_field)) dependencies.extend(self._get_dependencies_for_foreign_key(
if hasattr(new_field, "remote_field") and getattr(new_field.remote_field, "through", None): app_label, model_name, new_field, self.to_state,
rename_key = ( ))
new_field.remote_field.through._meta.app_label, if (
new_field.remote_field.through._meta.model_name, hasattr(new_field, 'remote_field') and
) getattr(new_field.remote_field, 'through', None)
):
rename_key = resolve_relation(new_field.remote_field.through, app_label, model_name)
if rename_key in self.renamed_models: if rename_key in self.renamed_models:
new_field.remote_field.through = old_field.remote_field.through new_field.remote_field.through = old_field.remote_field.through
old_field_dec = self.deep_deconstruct(old_field) old_field_dec = self.deep_deconstruct(old_field)
@ -1073,23 +1074,32 @@ class MigrationAutodetector:
) )
) )
def _get_dependencies_for_foreign_key(self, field): @staticmethod
def _get_dependencies_for_foreign_key(app_label, model_name, field, project_state):
remote_field_model = None
if hasattr(field.remote_field, 'model'):
remote_field_model = field.remote_field.model
else:
relations = project_state.relations[app_label, model_name]
for (remote_app_label, remote_model_name), fields in relations.items():
if any(field == related_field.remote_field for _, related_field in fields):
remote_field_model = f'{remote_app_label}.{remote_model_name}'
break
# Account for FKs to swappable models # Account for FKs to swappable models
swappable_setting = getattr(field, 'swappable_setting', None) swappable_setting = getattr(field, 'swappable_setting', None)
if swappable_setting is not None: if swappable_setting is not None:
dep_app_label = "__setting__" dep_app_label = "__setting__"
dep_object_name = swappable_setting dep_object_name = swappable_setting
else: else:
dep_app_label = field.remote_field.model._meta.app_label dep_app_label, dep_object_name = resolve_relation(
dep_object_name = field.remote_field.model._meta.object_name remote_field_model, app_label, model_name,
)
dependencies = [(dep_app_label, dep_object_name, None, True)] dependencies = [(dep_app_label, dep_object_name, None, True)]
if getattr(field.remote_field, "through", None) and not field.remote_field.through._meta.auto_created: if getattr(field.remote_field, 'through', None):
dependencies.append(( through_app_label, through_object_name = resolve_relation(
field.remote_field.through._meta.app_label, remote_field_model, app_label, model_name,
field.remote_field.through._meta.object_name, )
None, dependencies.append((through_app_label, through_object_name, None, True))
True,
))
return dependencies return dependencies
def _generate_altered_foo_together(self, operation): def _generate_altered_foo_together(self, operation):
@ -1116,9 +1126,11 @@ class MigrationAutodetector:
dependencies = [] dependencies = []
for foo_togethers in new_value: for foo_togethers in new_value:
for field_name in foo_togethers: for field_name in foo_togethers:
field = self.new_apps.get_model(app_label, model_name)._meta.get_field(field_name) field = new_model_state.get_field(field_name)
if field.remote_field and field.remote_field.model: if field.remote_field and field.remote_field.model:
dependencies.extend(self._get_dependencies_for_foreign_key(field)) dependencies.extend(self._get_dependencies_for_foreign_key(
app_label, model_name, field, self.to_state,
))
self.add_operation( self.add_operation(
app_label, app_label,

View File

@ -1,5 +1,7 @@
import copy import copy
from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from functools import partial
from django.apps import AppConfig from django.apps import AppConfig
from django.apps.registry import Apps, apps as global_apps from django.apps.registry import Apps, apps as global_apps
@ -13,6 +15,7 @@ from django.utils.module_loading import import_string
from django.utils.version import get_docs_version from django.utils.version import get_docs_version
from .exceptions import InvalidBasesError from .exceptions import InvalidBasesError
from .utils import resolve_relation
def _get_app_label_and_model_name(model, app_label=''): def _get_app_label_and_model_name(model, app_label=''):
@ -87,6 +90,8 @@ class ProjectState:
# Apps to include from main registry, usually unmigrated ones # Apps to include from main registry, usually unmigrated ones
self.real_apps = real_apps or [] self.real_apps = real_apps or []
self.is_delayed = False self.is_delayed = False
# {remote_model_key: {model_key: [(field_name, field)]}}
self.relations = None
def add_model(self, model_state): def add_model(self, model_state):
app_label, model_name = model_state.app_label, model_state.name_lower app_label, model_name = model_state.app_label, model_state.name_lower
@ -188,6 +193,67 @@ class ProjectState:
# Render all models # Render all models
self.apps.render_multiple(states_to_be_rendered) self.apps.render_multiple(states_to_be_rendered)
def resolve_fields_and_relations(self):
# Resolve fields.
for model_state in self.models.values():
for field_name, field in model_state.fields.items():
field.name = field_name
# Resolve relations.
# {remote_model_key: {model_key: [(field_name, field)]}}
self.relations = defaultdict(partial(defaultdict, list))
concretes, proxies = self._get_concrete_models_mapping_and_proxy_models()
real_apps = set(self.real_apps)
for model_key in concretes:
model_state = self.models[model_key]
for field_name, field in model_state.fields.items():
remote_field = field.remote_field
if not remote_field:
continue
remote_model_key = resolve_relation(remote_field.model, *model_key)
if remote_model_key[0] not in real_apps and remote_model_key in concretes:
remote_model_key = concretes[remote_model_key]
self.relations[remote_model_key][model_key].append((field_name, field))
through = getattr(remote_field, 'through', None)
if not through:
continue
through_model_key = resolve_relation(through, *model_key)
if through_model_key[0] not in real_apps and through_model_key in concretes:
through_model_key = concretes[through_model_key]
self.relations[through_model_key][model_key].append((field_name, field))
for model_key in proxies:
self.relations[model_key] = self.relations[concretes[model_key]]
def get_concrete_model_key(self, model):
concrete_models_mapping, _ = self._get_concrete_models_mapping_and_proxy_models()
model_key = make_model_tuple(model)
return concrete_models_mapping[model_key]
def _get_concrete_models_mapping_and_proxy_models(self):
concrete_models_mapping = {}
proxy_models = {}
# Split models to proxy and concrete models.
for model_key, model_state in self.models.items():
if model_state.options.get('proxy'):
proxy_models[model_key] = model_state
# Find a concrete model for the proxy.
concrete_models_mapping[model_key] = self._find_concrete_model_from_proxy(
proxy_models, model_state,
)
else:
concrete_models_mapping[model_key] = model_key
return concrete_models_mapping, proxy_models
def _find_concrete_model_from_proxy(self, proxy_models, model_state):
for base in model_state.bases:
base_key = make_model_tuple(base)
base_state = proxy_models.get(base_key)
if not base_state:
# Concrete model found, stop looking at bases.
return base_key
return self._find_concrete_model_from_proxy(proxy_models, base_state)
def clone(self): def clone(self):
"""Return an exact copy of this ProjectState.""" """Return an exact copy of this ProjectState."""
new_state = ProjectState( new_state = ProjectState(
@ -207,11 +273,6 @@ class ProjectState:
def apps(self): def apps(self):
return StateApps(self.real_apps, self.models) return StateApps(self.real_apps, self.models)
@property
def concrete_apps(self):
self.apps = StateApps(self.real_apps, self.models, ignore_swappable=True)
return self.apps
@classmethod @classmethod
def from_apps(cls, apps): def from_apps(cls, apps):
"""Take an Apps and return a ProjectState matching it.""" """Take an Apps and return a ProjectState matching it."""
@ -392,6 +453,14 @@ class ModelState:
def name_lower(self): def name_lower(self):
return self.name.lower() return self.name.lower()
def get_field(self, field_name):
field_name = (
self.options['order_with_respect_to']
if field_name == '_order'
else field_name
)
return self.fields[field_name]
@classmethod @classmethod
def from_model(cls, model, exclude_rels=False): def from_model(cls, model, exclude_rels=False):
"""Given a model, return a ModelState representing it.""" """Given a model, return a ModelState representing it."""

View File

@ -584,9 +584,13 @@ class AutodetectorTests(TestCase):
return project_state return project_state
def get_changes(self, before_states, after_states, questioner=None): def get_changes(self, before_states, after_states, questioner=None):
if not isinstance(before_states, ProjectState):
before_states = self.make_project_state(before_states)
if not isinstance(after_states, ProjectState):
after_states = self.make_project_state(after_states)
return MigrationAutodetector( return MigrationAutodetector(
self.make_project_state(before_states), before_states,
self.make_project_state(after_states), after_states,
questioner, questioner,
)._detect_changes() )._detect_changes()
@ -1646,30 +1650,38 @@ class AutodetectorTests(TestCase):
""" """
# First, we test the default pk field name # First, we test the default pk field name
changes = self.get_changes([], [self.author_empty, self.author_proxy_third, self.book_proxy_fk]) changes = self.get_changes([], [self.author_empty, self.author_proxy_third, self.book_proxy_fk])
# The field name the FK on the book model points to # The model the FK is pointing from and to.
self.assertEqual(changes['otherapp'][0].operations[0].fields[2][1].remote_field.field_name, 'id') self.assertEqual(
changes['otherapp'][0].operations[0].fields[2][1].remote_field.model,
'thirdapp.AuthorProxy',
)
# Now, we test the custom pk field name # Now, we test the custom pk field name
changes = self.get_changes([], [self.author_custom_pk, self.author_proxy_third, self.book_proxy_fk]) changes = self.get_changes([], [self.author_custom_pk, self.author_proxy_third, self.book_proxy_fk])
# The field name the FK on the book model points to # The model the FK is pointing from and to.
self.assertEqual(changes['otherapp'][0].operations[0].fields[2][1].remote_field.field_name, 'pk_field') self.assertEqual(
changes['otherapp'][0].operations[0].fields[2][1].remote_field.model,
'thirdapp.AuthorProxy',
)
def test_proxy_to_mti_with_fk_to_proxy(self): def test_proxy_to_mti_with_fk_to_proxy(self):
# First, test the pk table and field name. # First, test the pk table and field name.
changes = self.get_changes( to_state = self.make_project_state(
[],
[self.author_empty, self.author_proxy_third, self.book_proxy_fk], [self.author_empty, self.author_proxy_third, self.book_proxy_fk],
) )
changes = self.get_changes([], to_state)
fk_field = changes['otherapp'][0].operations[0].fields[2][1]
self.assertEqual( self.assertEqual(
changes['otherapp'][0].operations[0].fields[2][1].remote_field.model._meta.db_table, to_state.get_concrete_model_key(fk_field.remote_field.model),
'testapp_author', ('testapp', 'author'),
) )
self.assertEqual(changes['otherapp'][0].operations[0].fields[2][1].remote_field.field_name, 'id') self.assertEqual(fk_field.remote_field.model, 'thirdapp.AuthorProxy')
# Change AuthorProxy to use MTI. # Change AuthorProxy to use MTI.
changes = self.get_changes( from_state = to_state.clone()
[self.author_empty, self.author_proxy_third, self.book_proxy_fk], to_state = self.make_project_state(
[self.author_empty, self.author_proxy_third_notproxy, self.book_proxy_fk], [self.author_empty, self.author_proxy_third_notproxy, self.book_proxy_fk],
) )
changes = self.get_changes(from_state, to_state)
# Right number/type of migrations for the AuthorProxy model? # Right number/type of migrations for the AuthorProxy model?
self.assertNumberMigrations(changes, 'thirdapp', 1) self.assertNumberMigrations(changes, 'thirdapp', 1)
self.assertOperationTypes(changes, 'thirdapp', 0, ['DeleteModel', 'CreateModel']) self.assertOperationTypes(changes, 'thirdapp', 0, ['DeleteModel', 'CreateModel'])
@ -1680,30 +1692,39 @@ class AutodetectorTests(TestCase):
# otherapp should depend on thirdapp. # otherapp should depend on thirdapp.
self.assertMigrationDependencies(changes, 'otherapp', 0, [('thirdapp', 'auto_1')]) self.assertMigrationDependencies(changes, 'otherapp', 0, [('thirdapp', 'auto_1')])
# Now, test the pk table and field name. # Now, test the pk table and field name.
fk_field = changes['otherapp'][0].operations[0].field
self.assertEqual( self.assertEqual(
changes['otherapp'][0].operations[0].field.remote_field.model._meta.db_table, to_state.get_concrete_model_key(fk_field.remote_field.model),
'thirdapp_authorproxy', ('thirdapp', 'authorproxy'),
) )
self.assertEqual(changes['otherapp'][0].operations[0].field.remote_field.field_name, 'author_ptr') self.assertEqual(fk_field.remote_field.model, 'thirdapp.AuthorProxy')
def test_proxy_to_mti_with_fk_to_proxy_proxy(self): def test_proxy_to_mti_with_fk_to_proxy_proxy(self):
# First, test the pk table and field name. # First, test the pk table and field name.
changes = self.get_changes( to_state = self.make_project_state([
[], self.author_empty,
[self.author_empty, self.author_proxy, self.author_proxy_proxy, self.book_proxy_proxy_fk], self.author_proxy,
) self.author_proxy_proxy,
self.book_proxy_proxy_fk,
])
changes = self.get_changes([], to_state)
fk_field = changes['otherapp'][0].operations[0].fields[1][1]
self.assertEqual( self.assertEqual(
changes['otherapp'][0].operations[0].fields[1][1].remote_field.model._meta.db_table, to_state.get_concrete_model_key(fk_field.remote_field.model),
'testapp_author', ('testapp', 'author'),
) )
self.assertEqual(changes['otherapp'][0].operations[0].fields[1][1].remote_field.field_name, 'id') self.assertEqual(fk_field.remote_field.model, 'testapp.AAuthorProxyProxy')
# Change AuthorProxy to use MTI. FK still points to AAuthorProxyProxy, # Change AuthorProxy to use MTI. FK still points to AAuthorProxyProxy,
# a proxy of AuthorProxy. # a proxy of AuthorProxy.
changes = self.get_changes( from_state = to_state.clone()
[self.author_empty, self.author_proxy, self.author_proxy_proxy, self.book_proxy_proxy_fk], to_state = self.make_project_state([
[self.author_empty, self.author_proxy_notproxy, self.author_proxy_proxy, self.book_proxy_proxy_fk], self.author_empty,
) self.author_proxy_notproxy,
self.author_proxy_proxy,
self.book_proxy_proxy_fk,
])
changes = self.get_changes(from_state, to_state)
# Right number/type of migrations for the AuthorProxy model? # Right number/type of migrations for the AuthorProxy model?
self.assertNumberMigrations(changes, 'testapp', 1) self.assertNumberMigrations(changes, 'testapp', 1)
self.assertOperationTypes(changes, 'testapp', 0, ['DeleteModel', 'CreateModel']) self.assertOperationTypes(changes, 'testapp', 0, ['DeleteModel', 'CreateModel'])
@ -1714,11 +1735,12 @@ class AutodetectorTests(TestCase):
# otherapp should depend on testapp. # otherapp should depend on testapp.
self.assertMigrationDependencies(changes, 'otherapp', 0, [('testapp', 'auto_1')]) self.assertMigrationDependencies(changes, 'otherapp', 0, [('testapp', 'auto_1')])
# Now, test the pk table and field name. # Now, test the pk table and field name.
fk_field = changes['otherapp'][0].operations[0].field
self.assertEqual( self.assertEqual(
changes['otherapp'][0].operations[0].field.remote_field.model._meta.db_table, to_state.get_concrete_model_key(fk_field.remote_field.model),
'testapp_authorproxy', ('testapp', 'authorproxy'),
) )
self.assertEqual(changes['otherapp'][0].operations[0].field.remote_field.field_name, 'author_ptr') self.assertEqual(fk_field.remote_field.model, 'testapp.AAuthorProxyProxy')
def test_unmanaged_create(self): def test_unmanaged_create(self):
"""The autodetector correctly deals with managed models.""" """The autodetector correctly deals with managed models."""
@ -1761,12 +1783,14 @@ class AutodetectorTests(TestCase):
""" """
# First, we test the default pk field name # First, we test the default pk field name
changes = self.get_changes([], [self.author_unmanaged_default_pk, self.book]) changes = self.get_changes([], [self.author_unmanaged_default_pk, self.book])
# The field name the FK on the book model points to # The model the FK on the book model points to.
self.assertEqual(changes['otherapp'][0].operations[0].fields[2][1].remote_field.field_name, 'id') fk_field = changes['otherapp'][0].operations[0].fields[2][1]
self.assertEqual(fk_field.remote_field.model, 'testapp.Author')
# Now, we test the custom pk field name # Now, we test the custom pk field name
changes = self.get_changes([], [self.author_unmanaged_custom_pk, self.book]) changes = self.get_changes([], [self.author_unmanaged_custom_pk, self.book])
# The field name the FK on the book model points to # The model the FK on the book model points to.
self.assertEqual(changes['otherapp'][0].operations[0].fields[2][1].remote_field.field_name, 'pk_field') fk_field = changes['otherapp'][0].operations[0].fields[2][1]
self.assertEqual(fk_field.remote_field.model, 'testapp.Author')
@override_settings(AUTH_USER_MODEL="thirdapp.CustomUser") @override_settings(AUTH_USER_MODEL="thirdapp.CustomUser")
def test_swappable(self): def test_swappable(self):
@ -1790,11 +1814,7 @@ class AutodetectorTests(TestCase):
self.assertOperationTypes(changes, 'testapp', 0, ["AlterField"]) self.assertOperationTypes(changes, 'testapp', 0, ["AlterField"])
self.assertOperationAttributes(changes, 'testapp', 0, 0, model_name="author", name='user') self.assertOperationAttributes(changes, 'testapp', 0, 0, model_name="author", name='user')
fk_field = changes['testapp'][0].operations[0].field fk_field = changes['testapp'][0].operations[0].field
to_model = '%s.%s' % ( self.assertEqual(fk_field.remote_field.model, 'thirdapp.CustomUser')
fk_field.remote_field.model._meta.app_label,
fk_field.remote_field.model._meta.object_name,
)
self.assertEqual(to_model, 'thirdapp.CustomUser')
def test_add_field_with_default(self): def test_add_field_with_default(self):
"""#22030 - Adding a field with a default should work.""" """#22030 - Adding a field with a default should work."""