Fixed #30060 -- Moved SQL generation for indexes and constraints to SchemaEditor.

This commit is contained in:
Paveł Tyślacki 2019-01-01 17:39:58 +03:00 committed by Tim Graham
parent e5ae9488ac
commit 0123b67f6b
8 changed files with 146 additions and 117 deletions

View File

@ -61,16 +61,21 @@ class BaseDatabaseSchemaEditor:
sql_rename_column = "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
sql_update_with_default = "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
sql_foreign_key_constraint = "FOREIGN KEY (%(column)s) REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
sql_unique_constraint = "UNIQUE (%(columns)s)"
sql_check_constraint = "CHECK (%(check)s)"
sql_create_constraint = "ALTER TABLE %(table)s ADD %(constraint)s"
sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
sql_create_unique = None
sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
sql_delete_check = sql_delete_constraint
sql_create_unique = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s UNIQUE (%(columns)s)"
sql_delete_unique = sql_delete_constraint
sql_create_fk = (
"ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
"REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
)
sql_create_inline_fk = None
sql_delete_fk = sql_delete_constraint
@ -287,7 +292,7 @@ class BaseDatabaseSchemaEditor:
for fields in model._meta.unique_together:
columns = [model._meta.get_field(field).column for field in fields]
self.deferred_sql.append(self._create_unique_sql(model, columns))
constraints = [check.full_constraint_sql(model, self) for check in model._meta.constraints]
constraints = [check.constraint_sql(model, self) for check in model._meta.constraints]
# Make the table
sql = self.sql_create_table % {
"table": self.quote_name(model._meta.db_table),
@ -422,7 +427,7 @@ class BaseDatabaseSchemaEditor:
# Check constraints can go on the column SQL here
db_params = field.db_parameters(connection=self.connection)
if db_params['check']:
definition += " CHECK (%s)" % db_params['check']
definition += " " + self.sql_check_constraint % db_params
# Build the SQL and run it
sql = self.sql_create_column % {
"table": self.quote_name(model._meta.db_table),
@ -463,7 +468,7 @@ class BaseDatabaseSchemaEditor:
if field.remote_field:
fk_names = self._constraint_names(model, [field.column], foreign_key=True)
for fk_name in fk_names:
self.execute(self._delete_constraint_sql(self.sql_delete_fk, model, fk_name))
self.execute(self._delete_fk_sql(model, fk_name))
# Delete the column
sql = self.sql_delete_column % {
"table": self.quote_name(model._meta.db_table),
@ -534,7 +539,7 @@ class BaseDatabaseSchemaEditor:
))
for fk_name in fk_names:
fks_dropped.add((old_field.column,))
self.execute(self._delete_constraint_sql(self.sql_delete_fk, model, fk_name))
self.execute(self._delete_fk_sql(model, fk_name))
# Has unique been removed?
if old_field.unique and (not new_field.unique or self._field_became_primary_key(old_field, new_field)):
# Find the unique constraint for this field
@ -546,7 +551,7 @@ class BaseDatabaseSchemaEditor:
old_field.column,
))
for constraint_name in constraint_names:
self.execute(self._delete_constraint_sql(self.sql_delete_unique, model, constraint_name))
self.execute(self._delete_unique_sql(model, constraint_name))
# Drop incoming FK constraints if the field is a primary key or unique,
# which might be a to_field target, and things are going to change.
drop_foreign_keys = (
@ -563,7 +568,7 @@ class BaseDatabaseSchemaEditor:
new_rel.related_model, [new_rel.field.column], foreign_key=True
)
for fk_name in rel_fk_names:
self.execute(self._delete_constraint_sql(self.sql_delete_fk, new_rel.related_model, fk_name))
self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
# Removed an index? (no strict check, as multiple indexes are possible)
# Remove indexes if db_index switched to False or a unique constraint
# will now be used in lieu of an index. The following lines from the
@ -585,7 +590,7 @@ class BaseDatabaseSchemaEditor:
# The only way to check if an index was created with
# db_index=True or with Index(['field'], name='foo')
# is to look at its name (refs #28053).
self.execute(self._delete_constraint_sql(self.sql_delete_index, model, index_name))
self.execute(self._delete_index_sql(model, index_name))
# Change check constraints?
if old_db_params['check'] != new_db_params['check'] and old_db_params['check']:
constraint_names = self._constraint_names(model, [old_field.column], check=True)
@ -596,7 +601,7 @@ class BaseDatabaseSchemaEditor:
old_field.column,
))
for constraint_name in constraint_names:
self.execute(self._delete_constraint_sql(self.sql_delete_constraint, model, constraint_name))
self.execute(self._delete_check_sql(model, constraint_name))
# Have they renamed the column?
if old_field.column != new_field.column:
self.execute(self._rename_field_sql(model._meta.db_table, old_field, new_field, new_type))
@ -707,15 +712,7 @@ class BaseDatabaseSchemaEditor:
# Changed to become primary key?
if self._field_became_primary_key(old_field, new_field):
# Make the new one
self.execute(
self.sql_create_pk % {
"table": self.quote_name(model._meta.db_table),
"name": self.quote_name(
self._create_index_name(model._meta.db_table, [new_field.column], suffix="_pk")
),
"columns": self.quote_name(new_field.column),
}
)
self.execute(self._create_primary_key_sql(model, new_field))
# Update all referencing columns
rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
# Handle our type alters on the other end of rels from the PK stuff above
@ -746,18 +743,8 @@ class BaseDatabaseSchemaEditor:
self.execute(self._create_fk_sql(rel.related_model, rel.field, "_fk"))
# Does it have check constraints we need to add?
if old_db_params['check'] != new_db_params['check'] and new_db_params['check']:
constraint = self.sql_constraint % {
'name': self.quote_name(
self._create_index_name(model._meta.db_table, [new_field.column], suffix='_check'),
),
'constraint': self.sql_check_constraint % new_db_params,
}
self.execute(
self.sql_create_constraint % {
'table': self.quote_name(model._meta.db_table),
'constraint': constraint,
}
)
constraint_name = self._create_index_name(model._meta.db_table, [new_field.column], suffix='_check')
self.execute(self._create_check_sql(model, constraint_name, new_db_params['check']))
# Drop the default if we need to
# (Django usually does not use in-database defaults)
if needs_database_default:
@ -931,7 +918,14 @@ class BaseDatabaseSchemaEditor:
using=using,
columns=self._index_columns(table, columns, col_suffixes, opclasses),
extra=tablespace_sql,
condition=condition,
condition=(' WHERE ' + condition) if condition else '',
)
def _delete_index_sql(self, model, name):
return Statement(
self.sql_delete_index,
table=Table(model._meta.db_table, self.quote_name),
name=self.quote_name(name),
)
def _index_columns(self, table, columns, col_suffixes, opclasses):
@ -984,10 +978,6 @@ class BaseDatabaseSchemaEditor:
"type": new_type,
}
def _create_constraint_sql(self, table, name, constraint):
constraint = Statement(self.sql_constraint, name=name, constraint=constraint)
return Statement(self.sql_create_constraint, table=table, constraint=constraint)
def _create_fk_sql(self, model, field, suffix):
def create_fk_name(*args, **kwargs):
return self.quote_name(self._create_index_name(*args, **kwargs))
@ -1005,14 +995,27 @@ class BaseDatabaseSchemaEditor:
to_table = Table(field.target_field.model._meta.db_table, self.quote_name)
to_column = Columns(field.target_field.model._meta.db_table, [field.target_field.column], self.quote_name)
deferrable = self.connection.ops.deferrable_sql()
constraint = Statement(
self.sql_foreign_key_constraint,
return Statement(
self.sql_create_fk,
table=table,
name=name,
column=column,
to_table=to_table,
to_column=to_column,
deferrable=deferrable,
)
return self._create_constraint_sql(table, name, constraint)
def _delete_fk_sql(self, model, name):
return self._delete_constraint_sql(self.sql_delete_fk, model, name)
def _unique_sql(self, fields, name):
constraint = self.sql_unique_constraint % {
'columns': ', '.join(map(self.quote_name, fields)),
}
return self.sql_constraint % {
'name': self.quote_name(name),
'constraint': constraint,
}
def _create_unique_sql(self, model, columns, name=None):
def create_unique_name(*args, **kwargs):
@ -1024,23 +1027,39 @@ class BaseDatabaseSchemaEditor:
else:
name = self.quote_name(name)
columns = Columns(table, columns, self.quote_name)
if self.sql_create_unique:
# Some databases use a different syntax for unique constraint
# creation.
return Statement(
self.sql_create_unique,
table=table,
name=name,
columns=columns,
)
constraint = Statement(self.sql_unique_constraint, columns=columns)
return self._create_constraint_sql(table, name, constraint)
def _delete_unique_sql(self, model, name):
return self._delete_constraint_sql(self.sql_delete_unique, model, name)
def _check_sql(self, name, check):
return self.sql_constraint % {
'name': self.quote_name(name),
'constraint': self.sql_check_constraint % {'check': check},
}
def _create_check_sql(self, model, name, check):
return Statement(
self.sql_create_check,
table=Table(model._meta.db_table, self.quote_name),
name=self.quote_name(name),
check=check,
)
def _delete_check_sql(self, model, name):
return self._delete_constraint_sql(self.sql_delete_check, model, name)
def _delete_constraint_sql(self, template, model, name):
return template % {
"table": self.quote_name(model._meta.db_table),
"name": self.quote_name(name),
}
return Statement(
template,
table=Table(model._meta.db_table, self.quote_name),
name=self.quote_name(name),
)
def _constraint_names(self, model, column_names=None, unique=None,
primary_key=None, index=None, foreign_key=None,
@ -1079,7 +1098,20 @@ class BaseDatabaseSchemaEditor:
model._meta.db_table,
))
for constraint_name in constraint_names:
self.execute(self._delete_constraint_sql(self.sql_delete_pk, model, constraint_name))
self.execute(self._delete_primary_key_sql(model, constraint_name))
def _create_primary_key_sql(self, model, field):
return Statement(
self.sql_create_pk,
table=Table(model._meta.db_table, self.quote_name),
name=self.quote_name(
self._create_index_name(model._meta.db_table, [field.column], suffix="_pk")
),
columns=Columns(model._meta.db_table, [field.column], self.quote_name),
)
def _delete_primary_key_sql(self, model, name):
return self._delete_constraint_sql(self.sql_delete_pk, model, name)
def remove_procedure(self, procedure_name, param_types=()):
sql = self.sql_delete_procedure % {

View File

@ -114,7 +114,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
(old_type.startswith('citext') and not new_type.startswith('citext'))
):
index_name = self._create_index_name(model._meta.db_table, [old_field.column], suffix='_like')
self.execute(self._delete_constraint_sql(self.sql_delete_index, model, index_name))
self.execute(self._delete_index_sql(model, index_name))
super()._alter_field(
model, old_field, new_field, old_type, new_type, old_db_params,
@ -130,7 +130,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# Removed an index? Drop any PostgreSQL-specific indexes.
if old_field.unique and not (new_field.db_index or new_field.unique):
index_to_remove = self._create_index_name(model._meta.db_table, [old_field.column], suffix='_like')
self.execute(self._delete_constraint_sql(self.sql_delete_index, model, index_to_remove))
self.execute(self._delete_index_sql(model, index_to_remove))
def _index_columns(self, table, columns, col_suffixes, opclasses):
if opclasses:

View File

@ -11,10 +11,10 @@ from django.db.utils import NotSupportedError
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
sql_delete_table = "DROP TABLE %(table)s"
sql_create_fk = None
sql_create_inline_fk = "REFERENCES %(to_table)s (%(to_column)s) DEFERRABLE INITIALLY DEFERRED"
sql_create_unique = "CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)"
sql_delete_unique = "DROP INDEX %(name)s"
sql_foreign_key_constraint = None
def __enter__(self):
# Some SQLite schema alterations need foreign key constraints to be

View File

@ -10,25 +10,11 @@ class BaseConstraint:
def constraint_sql(self, model, schema_editor):
raise NotImplementedError('This method must be implemented by a subclass.')
def full_constraint_sql(self, model, schema_editor):
return schema_editor.sql_constraint % {
'name': schema_editor.quote_name(self.name),
'constraint': self.constraint_sql(model, schema_editor),
}
def create_sql(self, model, schema_editor):
sql = self.full_constraint_sql(model, schema_editor)
return schema_editor.sql_create_constraint % {
'table': schema_editor.quote_name(model._meta.db_table),
'constraint': sql,
}
raise NotImplementedError('This method must be implemented by a subclass.')
def remove_sql(self, model, schema_editor):
quote_name = schema_editor.quote_name
return schema_editor.sql_delete_constraint % {
'table': quote_name(model._meta.db_table),
'name': quote_name(self.name),
}
raise NotImplementedError('This method must be implemented by a subclass.')
def deconstruct(self):
path = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)
@ -45,14 +31,23 @@ class CheckConstraint(BaseConstraint):
self.check = check
super().__init__(name)
def constraint_sql(self, model, schema_editor):
query = Query(model)
def _get_check_sql(self, model, schema_editor):
query = Query(model=model)
where = query.build_where(self.check)
connection = schema_editor.connection
compiler = connection.ops.compiler('SQLCompiler')(query, connection, 'default')
sql, params = where.as_sql(compiler, connection)
params = tuple(schema_editor.quote_value(p) for p in params)
return schema_editor.sql_check_constraint % {'check': sql % params}
compiler = query.get_compiler(connection=schema_editor.connection)
sql, params = where.as_sql(compiler, schema_editor.connection)
return sql % tuple(schema_editor.quote_value(p) for p in params)
def constraint_sql(self, model, schema_editor):
check = self._get_check_sql(model, schema_editor)
return schema_editor._check_sql(self.name, check)
def create_sql(self, model, schema_editor):
check = self._get_check_sql(model, schema_editor)
return schema_editor._create_check_sql(model, self.name, check)
def remove_sql(self, model, schema_editor):
return schema_editor._delete_check_sql(model, self.name)
def __repr__(self):
return "<%s: check='%s' name=%r>" % (self.__class__.__name__, self.check, self.name)
@ -78,17 +73,15 @@ class UniqueConstraint(BaseConstraint):
super().__init__(name)
def constraint_sql(self, model, schema_editor):
columns = (
model._meta.get_field(field_name).column
for field_name in self.fields
)
return schema_editor.sql_unique_constraint % {
'columns': ', '.join(map(schema_editor.quote_name, columns)),
}
fields = [model._meta.get_field(field_name).column for field_name in self.fields]
return schema_editor._unique_sql(fields, self.name)
def create_sql(self, model, schema_editor):
columns = [model._meta.get_field(field_name).column for field_name in self.fields]
return schema_editor._create_unique_sql(model, columns, self.name)
fields = [model._meta.get_field(field_name).column for field_name in self.fields]
return schema_editor._create_unique_sql(model, fields, self.name)
def remove_sql(self, model, schema_editor):
return schema_editor._delete_unique_sql(model, self.name)
def __repr__(self):
return '<%s: fields=%r name=%r>' % (self.__class__.__name__, self.fields, self.name)

View File

@ -65,7 +65,7 @@ class Index:
sql, params = query.where.as_sql(compiler=compiler, connection=schema_editor.connection)
# BaseDatabaseSchemaEditor does the same map on the params, but since
# it's handled outside of that class, the work is done here.
return ' WHERE ' + (sql % tuple(map(schema_editor.quote_value, params)))
return sql % tuple(map(schema_editor.quote_value, params))
def create_sql(self, model, schema_editor, using=''):
fields = [model._meta.get_field(field_name) for field_name, _ in self.fields_orders]
@ -77,11 +77,7 @@ class Index:
)
def remove_sql(self, model, schema_editor):
quote_name = schema_editor.quote_name
return schema_editor.sql_delete_index % {
'table': quote_name(model._meta.db_table),
'name': quote_name(self.name),
}
return schema_editor._delete_index_sql(model, self.name)
def deconstruct(self):
path = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)

View File

@ -322,13 +322,6 @@ backends.
* Third party database backends must implement support for partial indexes or
set ``DatabaseFeatures.supports_partial_indexes`` to ``False``.
* Several ``SchemaEditor`` attributes are changed:
* ``sql_create_check`` is replaced with ``sql_create_constraint``.
* ``sql_delete_check`` is replaced with ``sql_delete_constraint``.
* ``sql_create_fk`` is replaced with ``sql_foreign_key_constraint``,
``sql_constraint``, and ``sql_create_constraint``.
* ``DatabaseIntrospection.table_name_converter()`` and
``column_name_converter()`` are removed. Third party database backends may
need to instead implement ``DatabaseIntrospection.identifier_converter()``.
@ -336,6 +329,15 @@ backends.
``DatabaseIntrospection.get_constraints()`` returns must be normalized by
``identifier_converter()``.
* SQL generation for indexes is moved from :class:`~django.db.models.Index` to
``SchemaEditor`` and these ``SchemaEditor`` methods are added:
* ``_create_primary_key_sql()`` and ``_delete_primary_key_sql()``
* ``_delete_index_sql()`` (to pair with ``_create_index_sql()``)
* ``_delete_unique_sql`` (to pair with ``_create_unique_sql()``)
* ``_delete_fk_sql()`` (to pair with ``_create_fk_sql()``)
* ``_create_check_sql()`` and ``_delete_check_sql()``
Admin actions are no longer collected from base ``ModelAdmin`` classes
----------------------------------------------------------------------

View File

@ -18,6 +18,18 @@ class BaseConstraintTests(SimpleTestCase):
with self.assertRaisesMessage(NotImplementedError, msg):
c.constraint_sql(None, None)
def test_create_sql(self):
c = BaseConstraint('name')
msg = 'This method must be implemented by a subclass.'
with self.assertRaisesMessage(NotImplementedError, msg):
c.create_sql(None, None)
def test_remove_sql(self):
c = BaseConstraint('name')
msg = 'This method must be implemented by a subclass.'
with self.assertRaisesMessage(NotImplementedError, msg):
c.remove_sql(None, None)
class CheckConstraintTests(TestCase):
def test_repr(self):

View File

@ -2147,24 +2147,18 @@ class SchemaTests(TransactionTestCase):
editor.alter_field(model, get_field(unique=True), field, strict=True)
self.assertNotIn(expected_constraint_name, self.get_constraints(model._meta.db_table))
if editor.sql_foreign_key_constraint:
if editor.sql_create_fk:
constraint_name = 'CamelCaseFKConstraint'
expected_constraint_name = identifier_converter(constraint_name)
fk_sql = editor.sql_foreign_key_constraint % {
editor.execute(
editor.sql_create_fk % {
"table": editor.quote_name(table),
"name": editor.quote_name(constraint_name),
"column": editor.quote_name(column),
"to_table": editor.quote_name(table),
"to_column": editor.quote_name(model._meta.auto_field.column),
"deferrable": connection.ops.deferrable_sql(),
}
constraint_sql = editor.sql_constraint % {
"name": editor.quote_name(constraint_name),
"constraint": fk_sql,
}
editor.execute(
editor.sql_create_constraint % {
"table": editor.quote_name(table),
"constraint": constraint_sql,
}
)
self.assertIn(expected_constraint_name, self.get_constraints(model._meta.db_table))
editor.alter_field(model, get_field(Author, CASCADE, field_class=ForeignKey), field, strict=True)