[4.1.x] Refs #34058 -- Fixed changing/deleting sequences when altering pre-Django 4.1 auto fields on PostgreSQL.

Thanks Anders Kaseorg for the report.

Follow up to 19e6efa50b.
Regression in 2eea361eff.

Backport of bc3b8f1524 from main
This commit is contained in:
Mariusz Felisiak 2022-10-01 07:53:32 +02:00
parent 7a1675806a
commit 96c541ecef
2 changed files with 53 additions and 26 deletions

View File

@ -112,6 +112,13 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
)
return None
def _get_sequence_name(self, table, column):
with self.connection.cursor() as cursor:
for sequence in self.connection.introspection.get_sequences(cursor, table):
if sequence["column"] == column:
return sequence["name"]
return None
def _alter_column_type_sql(self, model, old_field, new_field, new_type):
self.sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
# Cast when data type changed.
@ -168,44 +175,48 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
}
)
column = strip_quotes(new_field.column)
sequence_name = "%s_%s_seq" % (table, column)
fragment, _ = super()._alter_column_type_sql(
model, old_field, new_field, new_type
)
return fragment, [
(
# Drop the sequence if exists (Django 4.1+ identity columns
# don't have it).
self.sql_delete_sequence
% {
"sequence": self.quote_name(sequence_name),
},
[],
),
]
# Drop the sequence if exists (Django 4.1+ identity columns don't
# have it).
other_actions = []
if sequence_name := self._get_sequence_name(table, column):
other_actions = [
(
self.sql_delete_sequence
% {
"sequence": self.quote_name(sequence_name),
},
[],
)
]
return fragment, other_actions
elif new_is_auto and old_is_auto and old_internal_type != new_internal_type:
fragment, _ = super()._alter_column_type_sql(
model, old_field, new_field, new_type
)
column = strip_quotes(new_field.column)
sequence_name = f"{table}_{column}_seq"
db_types = {
"AutoField": "integer",
"BigAutoField": "bigint",
"SmallAutoField": "smallint",
}
return fragment, [
# Alter the sequence type if exists (Django 4.1+ identity
# columns don't have it).
(
self.sql_alter_sequence_type
% {
"sequence": self.quote_name(sequence_name),
"type": db_types[new_internal_type],
},
[],
),
]
# Alter the sequence type if exists (Django 4.1+ identity columns
# don't have it).
other_actions = []
if sequence_name := self._get_sequence_name(table, column):
other_actions = [
(
self.sql_alter_sequence_type
% {
"sequence": self.quote_name(sequence_name),
"type": db_types[new_internal_type],
},
[],
),
]
return fragment, other_actions
else:
return super()._alter_column_type_sql(model, old_field, new_field, new_type)

View File

@ -1808,14 +1808,30 @@ class SchemaTests(TransactionTestCase):
new_field.set_attributes_from_name("id")
with connection.schema_editor() as editor:
editor.alter_field(SerialAutoField, old_field, new_field, strict=True)
sequence_name = f"{table}_{column}_seq"
with connection.cursor() as cursor:
cursor.execute(
"SELECT data_type FROM pg_sequences WHERE sequencename = %s",
[f"{table}_{column}_seq"],
[sequence_name],
)
row = cursor.fetchone()
sequence_data_type = row[0] if row and row[0] else None
self.assertEqual(sequence_data_type, "bigint")
# Rename the column.
old_field = new_field
new_field = AutoField(primary_key=True)
new_field.model = SerialAutoField
new_field.set_attributes_from_name("renamed_id")
with connection.schema_editor() as editor:
editor.alter_field(SerialAutoField, old_field, new_field, strict=True)
with connection.cursor() as cursor:
cursor.execute(
"SELECT data_type FROM pg_sequences WHERE sequencename = %s",
[sequence_name],
)
row = cursor.fetchone()
sequence_data_type = row[0] if row and row[0] else None
self.assertEqual(sequence_data_type, "integer")
finally:
with connection.cursor() as cursor:
cursor.execute(f'DROP TABLE "{table}"')