Refs #34058 -- Fixed changing/deleting sequences when altering pre-Django 4.1 auto fields on PostgreSQL.

Thanks Anders Kaseorg for the report.

Follow up to 19e6efa50b.
Regression in 2eea361eff.
This commit is contained in:
Mariusz Felisiak 2022-10-01 07:53:32 +02:00 committed by GitHub
parent 5e0aa362d9
commit bc3b8f1524
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 26 deletions

View File

@ -130,6 +130,13 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
return using_sql return using_sql
return "" return ""
def _get_sequence_name(self, table, column):
with self.connection.cursor() as cursor:
for sequence in self.connection.introspection.get_sequences(cursor, table):
if sequence["column"] == column:
return sequence["name"]
return None
def _alter_column_type_sql(self, model, old_field, new_field, new_type): def _alter_column_type_sql(self, model, old_field, new_field, new_type):
# Drop indexes on varchar/text/citext columns that are changing to a # Drop indexes on varchar/text/citext columns that are changing to a
# different type. # different type.
@ -193,35 +200,38 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
} }
) )
column = strip_quotes(new_field.column) column = strip_quotes(new_field.column)
sequence_name = "%s_%s_seq" % (table, column)
fragment, _ = super()._alter_column_type_sql( fragment, _ = super()._alter_column_type_sql(
model, old_field, new_field, new_type model, old_field, new_field, new_type
) )
return fragment, [ # Drop the sequence if exists (Django 4.1+ identity columns don't
# have it).
other_actions = []
if sequence_name := self._get_sequence_name(table, column):
other_actions = [
( (
# Drop the sequence if exists (Django 4.1+ identity columns
# don't have it).
self.sql_delete_sequence self.sql_delete_sequence
% { % {
"sequence": self.quote_name(sequence_name), "sequence": self.quote_name(sequence_name),
}, },
[], [],
), )
] ]
return fragment, other_actions
elif new_is_auto and old_is_auto and old_internal_type != new_internal_type: elif new_is_auto and old_is_auto and old_internal_type != new_internal_type:
fragment, _ = super()._alter_column_type_sql( fragment, _ = super()._alter_column_type_sql(
model, old_field, new_field, new_type model, old_field, new_field, new_type
) )
column = strip_quotes(new_field.column) column = strip_quotes(new_field.column)
sequence_name = f"{table}_{column}_seq"
db_types = { db_types = {
"AutoField": "integer", "AutoField": "integer",
"BigAutoField": "bigint", "BigAutoField": "bigint",
"SmallAutoField": "smallint", "SmallAutoField": "smallint",
} }
return fragment, [ # Alter the sequence type if exists (Django 4.1+ identity columns
# Alter the sequence type if exists (Django 4.1+ identity # don't have it).
# columns don't have it). other_actions = []
if sequence_name := self._get_sequence_name(table, column):
other_actions = [
( (
self.sql_alter_sequence_type self.sql_alter_sequence_type
% { % {
@ -231,6 +241,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
[], [],
), ),
] ]
return fragment, other_actions
else: else:
return super()._alter_column_type_sql(model, old_field, new_field, new_type) return super()._alter_column_type_sql(model, old_field, new_field, new_type)

View File

@ -1896,14 +1896,30 @@ class SchemaTests(TransactionTestCase):
new_field.set_attributes_from_name("id") new_field.set_attributes_from_name("id")
with connection.schema_editor() as editor: with connection.schema_editor() as editor:
editor.alter_field(SerialAutoField, old_field, new_field, strict=True) editor.alter_field(SerialAutoField, old_field, new_field, strict=True)
sequence_name = f"{table}_{column}_seq"
with connection.cursor() as cursor: with connection.cursor() as cursor:
cursor.execute( cursor.execute(
"SELECT data_type FROM pg_sequences WHERE sequencename = %s", "SELECT data_type FROM pg_sequences WHERE sequencename = %s",
[f"{table}_{column}_seq"], [sequence_name],
) )
row = cursor.fetchone() row = cursor.fetchone()
sequence_data_type = row[0] if row and row[0] else None sequence_data_type = row[0] if row and row[0] else None
self.assertEqual(sequence_data_type, "bigint") self.assertEqual(sequence_data_type, "bigint")
# Rename the column.
old_field = new_field
new_field = AutoField(primary_key=True)
new_field.model = SerialAutoField
new_field.set_attributes_from_name("renamed_id")
with connection.schema_editor() as editor:
editor.alter_field(SerialAutoField, old_field, new_field, strict=True)
with connection.cursor() as cursor:
cursor.execute(
"SELECT data_type FROM pg_sequences WHERE sequencename = %s",
[sequence_name],
)
row = cursor.fetchone()
sequence_data_type = row[0] if row and row[0] else None
self.assertEqual(sequence_data_type, "integer")
finally: finally:
with connection.cursor() as cursor: with connection.cursor() as cursor:
cursor.execute(f'DROP TABLE "{table}"') cursor.execute(f'DROP TABLE "{table}"')