Fixed #26682 -- Added support for Oracle identity columns.
Thanks Shai Berger and Tim Graham for reviews.
This commit is contained in:
parent
cde31daf88
commit
924a89e135
|
@ -710,14 +710,7 @@ class BaseDatabaseSchemaEditor:
|
|||
# will always come along and replace it.
|
||||
if not old_field.primary_key and new_field.primary_key:
|
||||
# First, drop the old PK
|
||||
constraint_names = self._constraint_names(model, primary_key=True)
|
||||
if strict and len(constraint_names) != 1:
|
||||
raise ValueError("Found wrong number (%s) of PK constraints for %s" % (
|
||||
len(constraint_names),
|
||||
model._meta.db_table,
|
||||
))
|
||||
for constraint_name in constraint_names:
|
||||
self.execute(self._delete_constraint_sql(self.sql_delete_pk, model, constraint_name))
|
||||
self._delete_primary_key(model, strict)
|
||||
# Make the new one
|
||||
self.execute(
|
||||
self.sql_create_pk % {
|
||||
|
@ -978,3 +971,13 @@ class BaseDatabaseSchemaEditor:
|
|||
continue
|
||||
result.append(name)
|
||||
return result
|
||||
|
||||
def _delete_primary_key(self, model, strict=False):
|
||||
constraint_names = self._constraint_names(model, primary_key=True)
|
||||
if strict and len(constraint_names) != 1:
|
||||
raise ValueError('Found wrong number (%s) of PK constraints for %s' % (
|
||||
len(constraint_names),
|
||||
model._meta.db_table,
|
||||
))
|
||||
for constraint_name in constraint_names:
|
||||
self.execute(self._delete_constraint_sql(self.sql_delete_pk, model, constraint_name))
|
||||
|
|
|
@ -82,8 +82,8 @@ class DatabaseWrapper(BaseDatabaseWrapper):
|
|||
# Any format strings starting with "qn_" are quoted before being used in the
|
||||
# output (the "qn_" prefix is stripped before the lookup is performed.
|
||||
data_types = {
|
||||
'AutoField': 'NUMBER(11)',
|
||||
'BigAutoField': 'NUMBER(19)',
|
||||
'AutoField': 'NUMBER(11) GENERATED BY DEFAULT ON NULL AS IDENTITY',
|
||||
'BigAutoField': 'NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY',
|
||||
'BinaryField': 'BLOB',
|
||||
'BooleanField': 'NUMBER(1)',
|
||||
'CharField': 'NVARCHAR2(%(max_length)s)',
|
||||
|
|
|
@ -29,12 +29,22 @@ class DatabaseOperations(BaseDatabaseOperations):
|
|||
DECLARE
|
||||
table_value integer;
|
||||
seq_value integer;
|
||||
seq_name user_tab_identity_cols.sequence_name%%TYPE;
|
||||
BEGIN
|
||||
BEGIN
|
||||
SELECT sequence_name INTO seq_name FROM user_tab_identity_cols
|
||||
WHERE table_name = '%(table_name)s' AND
|
||||
column_name = '%(column_name)s';
|
||||
EXCEPTION WHEN NO_DATA_FOUND THEN
|
||||
seq_name := '%(no_autofield_sequence_name)s';
|
||||
END;
|
||||
|
||||
SELECT NVL(MAX(%(column)s), 0) INTO table_value FROM %(table)s;
|
||||
SELECT NVL(last_number - cache_size, 0) INTO seq_value FROM user_sequences
|
||||
WHERE sequence_name = '%(sequence)s';
|
||||
WHERE sequence_name = seq_name;
|
||||
WHILE table_value > seq_value LOOP
|
||||
seq_value := "%(sequence)s".nextval;
|
||||
EXECUTE IMMEDIATE 'SELECT "'||seq_name||'".nextval FROM DUAL'
|
||||
INTO seq_value;
|
||||
END LOOP;
|
||||
END;
|
||||
/"""
|
||||
|
@ -43,37 +53,6 @@ END;
|
|||
super().__init__(*args, **kwargs)
|
||||
self.set_operators['difference'] = 'MINUS'
|
||||
|
||||
def autoinc_sql(self, table, column):
|
||||
# To simulate auto-incrementing primary keys in Oracle, we have to
|
||||
# create a sequence and a trigger.
|
||||
args = {
|
||||
'sq_name': self._get_sequence_name(table),
|
||||
'tr_name': self._get_trigger_name(table),
|
||||
'tbl_name': self.quote_name(table),
|
||||
'col_name': self.quote_name(column),
|
||||
}
|
||||
sequence_sql = """
|
||||
DECLARE
|
||||
i INTEGER;
|
||||
BEGIN
|
||||
SELECT COUNT(1) INTO i FROM USER_SEQUENCES
|
||||
WHERE SEQUENCE_NAME = '%(sq_name)s';
|
||||
IF i = 0 THEN
|
||||
EXECUTE IMMEDIATE 'CREATE SEQUENCE "%(sq_name)s"';
|
||||
END IF;
|
||||
END;
|
||||
/""" % args
|
||||
trigger_sql = """
|
||||
CREATE OR REPLACE TRIGGER "%(tr_name)s"
|
||||
BEFORE INSERT ON %(tbl_name)s
|
||||
FOR EACH ROW
|
||||
WHEN (new.%(col_name)s IS NULL)
|
||||
BEGIN
|
||||
:new.%(col_name)s := "%(sq_name)s".nextval;
|
||||
END;
|
||||
/""" % args
|
||||
return sequence_sql, trigger_sql
|
||||
|
||||
def cache_key_culling_sql(self):
|
||||
return """
|
||||
SELECT cache_key
|
||||
|
@ -253,7 +232,7 @@ WHEN (new.%(col_name)s IS NULL)
|
|||
return super().last_executed_query(cursor, statement, params)
|
||||
|
||||
def last_insert_id(self, cursor, table_name, pk_name):
|
||||
sq_name = self._get_sequence_name(table_name)
|
||||
sq_name = self._get_sequence_name(cursor, strip_quotes(table_name), pk_name)
|
||||
cursor.execute('"%s".currval' % sq_name)
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
|
@ -397,13 +376,15 @@ WHEN (new.%(col_name)s IS NULL)
|
|||
def sequence_reset_by_name_sql(self, style, sequences):
|
||||
sql = []
|
||||
for sequence_info in sequences:
|
||||
sequence_name = self._get_sequence_name(sequence_info['table'])
|
||||
table_name = self.quote_name(sequence_info['table'])
|
||||
column_name = self.quote_name(sequence_info['column'] or 'id')
|
||||
no_autofield_sequence_name = self._get_no_autofield_sequence_name(sequence_info['table'])
|
||||
table = self.quote_name(sequence_info['table'])
|
||||
column = self.quote_name(sequence_info['column'] or 'id')
|
||||
query = self._sequence_reset_sql % {
|
||||
'sequence': sequence_name,
|
||||
'table': table_name,
|
||||
'column': column_name,
|
||||
'no_autofield_sequence_name': no_autofield_sequence_name,
|
||||
'table': table,
|
||||
'column': column,
|
||||
'table_name': strip_quotes(table),
|
||||
'column_name': strip_quotes(column),
|
||||
}
|
||||
sql.append(query)
|
||||
return sql
|
||||
|
@ -415,23 +396,31 @@ WHEN (new.%(col_name)s IS NULL)
|
|||
for model in model_list:
|
||||
for f in model._meta.local_fields:
|
||||
if isinstance(f, models.AutoField):
|
||||
table_name = self.quote_name(model._meta.db_table)
|
||||
sequence_name = self._get_sequence_name(model._meta.db_table)
|
||||
column_name = self.quote_name(f.column)
|
||||
output.append(query % {'sequence': sequence_name,
|
||||
'table': table_name,
|
||||
'column': column_name})
|
||||
no_autofield_sequence_name = self._get_no_autofield_sequence_name(model._meta.db_table)
|
||||
table = self.quote_name(model._meta.db_table)
|
||||
column = self.quote_name(f.column)
|
||||
output.append(query % {
|
||||
'no_autofield_sequence_name': no_autofield_sequence_name,
|
||||
'table': table,
|
||||
'column': column,
|
||||
'table_name': strip_quotes(table),
|
||||
'column_name': strip_quotes(column),
|
||||
})
|
||||
# Only one AutoField is allowed per model, so don't
|
||||
# continue to loop
|
||||
break
|
||||
for f in model._meta.many_to_many:
|
||||
if not f.remote_field.through:
|
||||
table_name = self.quote_name(f.m2m_db_table())
|
||||
sequence_name = self._get_sequence_name(f.m2m_db_table())
|
||||
column_name = self.quote_name('id')
|
||||
output.append(query % {'sequence': sequence_name,
|
||||
'table': table_name,
|
||||
'column': column_name})
|
||||
no_autofield_sequence_name = self._get_no_autofield_sequence_name(f.m2m_db_table())
|
||||
table = self.quote_name(f.m2m_db_table())
|
||||
column = self.quote_name('id')
|
||||
output.append(query % {
|
||||
'no_autofield_sequence_name': no_autofield_sequence_name,
|
||||
'table': table,
|
||||
'column': column,
|
||||
'table_name': strip_quotes(table),
|
||||
'column_name': 'ID',
|
||||
})
|
||||
return output
|
||||
|
||||
def start_transaction_sql(self):
|
||||
|
@ -512,15 +501,23 @@ WHEN (new.%(col_name)s IS NULL)
|
|||
return 'POWER(%s)' % ','.join(sub_expressions)
|
||||
return super().combine_expression(connector, sub_expressions)
|
||||
|
||||
def _get_sequence_name(self, table):
|
||||
def _get_no_autofield_sequence_name(self, table):
|
||||
"""
|
||||
Manually created sequence name to keep backward compatibility for
|
||||
AutoFields that aren't Oracle identity columns.
|
||||
"""
|
||||
name_length = self.max_name_length() - 3
|
||||
sequence_name = '%s_SQ' % strip_quotes(table)
|
||||
return truncate_name(sequence_name, name_length).upper()
|
||||
|
||||
def _get_trigger_name(self, table):
|
||||
name_length = self.max_name_length() - 3
|
||||
trigger_name = '%s_TR' % strip_quotes(table)
|
||||
return truncate_name(trigger_name, name_length).upper()
|
||||
def _get_sequence_name(self, cursor, table, pk_name):
|
||||
cursor.execute("""
|
||||
SELECT sequence_name
|
||||
FROM user_tab_identity_cols
|
||||
WHERE table_name = UPPER(%s)
|
||||
AND column_name = UPPER(%s)""", [table, pk_name])
|
||||
row = cursor.fetchone()
|
||||
return self._get_no_autofield_sequence_name(table) if row is None else row[0]
|
||||
|
||||
def bulk_insert_sql(self, fields, placeholder_rows):
|
||||
query = []
|
||||
|
@ -528,13 +525,20 @@ WHEN (new.%(col_name)s IS NULL)
|
|||
select = []
|
||||
for i, placeholder in enumerate(row):
|
||||
# A model without any fields has fields=[None].
|
||||
if not fields[i]:
|
||||
select.append(placeholder)
|
||||
else:
|
||||
if fields[i]:
|
||||
internal_type = getattr(fields[i], 'target_field', fields[i]).get_internal_type()
|
||||
select.append(BulkInsertMapper.types.get(internal_type, '%s') % placeholder)
|
||||
placeholder = BulkInsertMapper.types.get(internal_type, '%s') % placeholder
|
||||
# Add columns aliases to the first select to avoid "ORA-00918:
|
||||
# column ambiguously defined" when two or more columns in the
|
||||
# first select have the same value.
|
||||
if not query:
|
||||
placeholder = '%s col_%s' % (placeholder, i)
|
||||
select.append(placeholder)
|
||||
query.append('SELECT %s FROM DUAL' % ', '.join(select))
|
||||
return ' UNION ALL '.join(query)
|
||||
# Bulk insert to tables with Oracle identity columns causes Oracle to
|
||||
# add sequence.nextval to it. Sequence.nextval cannot be used with the
|
||||
# UNION operator. To prevent incorrect SQL, move UNION to a subquery.
|
||||
return 'SELECT * FROM (%s)' % ' UNION ALL '.join(query)
|
||||
|
||||
def subtract_temporals(self, internal_type, lhs, rhs):
|
||||
if internal_type == 'DateField':
|
||||
|
|
|
@ -31,10 +31,17 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||
else:
|
||||
return str(value)
|
||||
|
||||
def remove_field(self, model, field):
|
||||
# If the column is an identity column, drop the identity before
|
||||
# removing the field.
|
||||
if self._is_identity_column(model._meta.db_table, field.column):
|
||||
self._drop_identity(model._meta.db_table, field.column)
|
||||
super().remove_field(model, field)
|
||||
|
||||
def delete_model(self, model):
|
||||
# Run superclass action
|
||||
super().delete_model(model)
|
||||
# Clean up any autoincrement trigger
|
||||
# Clean up manually created sequence.
|
||||
self.execute("""
|
||||
DECLARE
|
||||
i INTEGER;
|
||||
|
@ -45,7 +52,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||
EXECUTE IMMEDIATE 'DROP SEQUENCE "%(sq_name)s"';
|
||||
END IF;
|
||||
END;
|
||||
/""" % {'sq_name': self.connection.ops._get_sequence_name(model._meta.db_table)})
|
||||
/""" % {'sq_name': self.connection.ops._get_no_autofield_sequence_name(model._meta.db_table)})
|
||||
|
||||
def alter_field(self, model, old_field, new_field, strict=False):
|
||||
try:
|
||||
|
@ -56,6 +63,16 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||
# SQLite-ish workaround
|
||||
if 'ORA-22858' in description or 'ORA-22859' in description:
|
||||
self._alter_field_type_workaround(model, old_field, new_field)
|
||||
# If an identity column is changing to a non-numeric type, drop the
|
||||
# identity first.
|
||||
elif 'ORA-30675' in description:
|
||||
self._drop_identity(model._meta.db_table, old_field.column)
|
||||
self.alter_field(model, old_field, new_field, strict)
|
||||
# If a primary key column is changing to an identity column, drop
|
||||
# the primary key first.
|
||||
elif 'ORA-30673' in description and old_field.primary_key:
|
||||
self._delete_primary_key(model, strict=True)
|
||||
self._alter_field_type_workaround(model, old_field, new_field)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
@ -63,7 +80,9 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||
"""
|
||||
Oracle refuses to change from some type to other type.
|
||||
What we need to do instead is:
|
||||
- Add a nullable version of the desired field with a temporary name
|
||||
- Add a nullable version of the desired field with a temporary name. If
|
||||
the new column is an auto field, then the temporary column can't be
|
||||
nullable.
|
||||
- Update the table to transfer values from old to new
|
||||
- Drop old column
|
||||
- Rename the new column and possibly drop the nullable property
|
||||
|
@ -71,7 +90,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||
# Make a new field that's like the new one but with a temporary
|
||||
# column name.
|
||||
new_temp_field = copy.deepcopy(new_field)
|
||||
new_temp_field.null = True
|
||||
new_temp_field.null = (new_field.get_internal_type() not in ('AutoField', 'BigAutoField'))
|
||||
new_temp_field.column = self._generate_temp_name(new_field.column)
|
||||
# Add it
|
||||
self.add_field(model, new_temp_field)
|
||||
|
@ -126,3 +145,21 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||
if db_type is not None and db_type.lower() in self.connection._limited_data_types:
|
||||
return False
|
||||
return create_index
|
||||
|
||||
def _is_identity_column(self, table_name, column_name):
|
||||
with self.connection.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
SELECT
|
||||
CASE WHEN identity_column = 'YES' THEN 1 ELSE 0 END
|
||||
FROM user_tab_cols
|
||||
WHERE table_name = %s AND
|
||||
column_name = %s
|
||||
""", [self.normalize_name(table_name), self.normalize_name(column_name)])
|
||||
row = cursor.fetchone()
|
||||
return row[0] if row else False
|
||||
|
||||
def _drop_identity(self, table_name, column_name):
|
||||
self.execute('ALTER TABLE %(table)s MODIFY %(column)s DROP IDENTITY' % {
|
||||
'table': self.quote_name(table_name),
|
||||
'column': self.quote_name(column_name),
|
||||
})
|
||||
|
|
|
@ -206,6 +206,11 @@ Models
|
|||
* The new :class:`~django.db.models.functions.StrIndex` database function
|
||||
finds the starting index of a string inside another string.
|
||||
|
||||
* On Oracle, ``AutoField`` and ``BigAutoField`` are now created as `identity
|
||||
columns`_.
|
||||
|
||||
.. _`identity columns`: https://docs.oracle.com/database/121/DRDAA/migr_tools_feat.htm#DRDAA109
|
||||
|
||||
Requests and Responses
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
|
|
@ -10,9 +10,9 @@ from django.db import (
|
|||
from django.db.models import Model
|
||||
from django.db.models.deletion import CASCADE, PROTECT
|
||||
from django.db.models.fields import (
|
||||
AutoField, BigIntegerField, BinaryField, BooleanField, CharField,
|
||||
DateField, DateTimeField, IntegerField, PositiveIntegerField, SlugField,
|
||||
TextField, TimeField,
|
||||
AutoField, BigAutoField, BigIntegerField, BinaryField, BooleanField,
|
||||
CharField, DateField, DateTimeField, IntegerField, PositiveIntegerField,
|
||||
SlugField, TextField, TimeField,
|
||||
)
|
||||
from django.db.models.fields.related import (
|
||||
ForeignKey, ForeignObject, ManyToManyField, OneToOneField,
|
||||
|
@ -532,6 +532,30 @@ class SchemaTests(TransactionTestCase):
|
|||
self.assertEqual(columns['name'][0], "TextField")
|
||||
self.assertEqual(bool(columns['name'][1][6]), bool(connection.features.interprets_empty_strings_as_nulls))
|
||||
|
||||
def test_alter_auto_field_to_integer_field(self):
|
||||
# Create the table
|
||||
with connection.schema_editor() as editor:
|
||||
editor.create_model(Author)
|
||||
# Change AutoField to IntegerField
|
||||
old_field = Author._meta.get_field('id')
|
||||
new_field = IntegerField(primary_key=True)
|
||||
new_field.set_attributes_from_name('id')
|
||||
new_field.model = Author
|
||||
with connection.schema_editor() as editor:
|
||||
editor.alter_field(Author, old_field, new_field, strict=True)
|
||||
|
||||
def test_alter_auto_field_to_char_field(self):
|
||||
# Create the table
|
||||
with connection.schema_editor() as editor:
|
||||
editor.create_model(Author)
|
||||
# Change AutoField to CharField
|
||||
old_field = Author._meta.get_field('id')
|
||||
new_field = CharField(primary_key=True, max_length=50)
|
||||
new_field.set_attributes_from_name('id')
|
||||
new_field.model = Author
|
||||
with connection.schema_editor() as editor:
|
||||
editor.alter_field(Author, old_field, new_field, strict=True)
|
||||
|
||||
def test_alter_text_field(self):
|
||||
# Regression for "BLOB/TEXT column 'info' can't have a default value")
|
||||
# on MySQL.
|
||||
|
@ -1003,6 +1027,22 @@ class SchemaTests(TransactionTestCase):
|
|||
with connection.schema_editor() as editor:
|
||||
editor.alter_field(IntegerPK, old_field, new_field, strict=True)
|
||||
|
||||
def test_alter_int_pk_to_bigautofield_pk(self):
|
||||
"""
|
||||
Should be able to rename an IntegerField(primary_key=True) to
|
||||
BigAutoField(primary_key=True).
|
||||
"""
|
||||
with connection.schema_editor() as editor:
|
||||
editor.create_model(IntegerPK)
|
||||
|
||||
old_field = IntegerPK._meta.get_field('i')
|
||||
new_field = BigAutoField(primary_key=True)
|
||||
new_field.model = IntegerPK
|
||||
new_field.set_attributes_from_name('i')
|
||||
|
||||
with connection.schema_editor() as editor:
|
||||
editor.alter_field(IntegerPK, old_field, new_field, strict=True)
|
||||
|
||||
def test_alter_int_pk_to_int_unique(self):
|
||||
"""
|
||||
Should be able to rename an IntegerField(primary_key=True) to
|
||||
|
|
Loading…
Reference in New Issue