Fixed #32234 -- Made inspectdb inform about composite primary keys.

This commit is contained in:
Anv3sh 2022-05-12 01:00:13 +05:30 committed by Mariusz Felisiak
parent ebf25555bb
commit 295249c901
10 changed files with 88 additions and 17 deletions

View File

@ -106,11 +106,14 @@ class Command(BaseCommand):
)
except NotImplementedError:
constraints = {}
primary_key_column = (
connection.introspection.get_primary_key_column(
primary_key_columns = (
connection.introspection.get_primary_key_columns(
cursor, table_name
)
)
primary_key_column = (
primary_key_columns[0] if primary_key_columns else None
)
unique_columns = [
c["columns"][0]
for c in constraints.values()
@ -150,6 +153,12 @@ class Command(BaseCommand):
# Add primary_key and unique, if necessary.
if column_name == primary_key_column:
extra_params["primary_key"] = True
if len(primary_key_columns) > 1:
comment_notes.append(
"The composite primary key (%s) found, that is not "
"supported. The first column is selected."
% ", ".join(primary_key_columns)
)
elif column_name in unique_columns:
extra_params["unique"] = True

View File

@ -260,6 +260,10 @@ class BaseDatabaseFeatures:
create_test_procedure_without_params_sql = None
create_test_procedure_with_int_param_sql = None
# SQL to create a table with a composite primary key for use by the Django
# test suite.
create_test_table_with_composite_primary_key = None
# Does the backend support keyword parameters for cursor.callproc()?
supports_callproc_kwargs = False

View File

@ -177,9 +177,14 @@ class BaseDatabaseIntrospection:
"""
Return the name of the primary key column for the given table.
"""
columns = self.get_primary_key_columns(cursor, table_name)
return columns[0] if columns else None
def get_primary_key_columns(self, cursor, table_name):
"""Return a list of primary key columns for the given table."""
for constraint in self.get_constraints(cursor, table_name).values():
if constraint["primary_key"]:
return constraint["columns"][0]
return constraint["columns"]
return None
def get_constraints(self, cursor, table_name):

View File

@ -39,6 +39,13 @@ class DatabaseFeatures(BaseDatabaseFeatures):
SET V_I = P_I;
END;
"""
create_test_table_with_composite_primary_key = """
CREATE TABLE test_table_composite_pk (
column_1 INTEGER NOT NULL,
column_2 INTEGER NOT NULL,
PRIMARY KEY(column_1, column_2)
)
"""
# Neither MySQL nor MariaDB support partial indexes.
supports_partial_indexes = False
# COLLATE must be wrapped in parentheses because MySQL treats COLLATE as an

View File

@ -54,6 +54,13 @@ class DatabaseFeatures(BaseDatabaseFeatures):
V_I := P_I;
END;
"""
create_test_table_with_composite_primary_key = """
CREATE TABLE test_table_composite_pk (
column_1 NUMBER(11) NOT NULL,
column_2 NUMBER(11) NOT NULL,
PRIMARY KEY (column_1, column_2)
)
"""
supports_callproc_kwargs = True
supports_over_clause = True
supports_frame_range_fixed_distance = True

View File

@ -248,7 +248,7 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
for field_name, rel_table_name, rel_field_name in cursor.fetchall()
}
def get_primary_key_column(self, cursor, table_name):
def get_primary_key_columns(self, cursor, table_name):
cursor.execute(
"""
SELECT
@ -259,13 +259,13 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
WHERE
user_constraints.constraint_name = cols.constraint_name AND
user_constraints.constraint_type = 'P' AND
user_constraints.table_name = UPPER(%s) AND
cols.position = 1
user_constraints.table_name = UPPER(%s)
ORDER BY
cols.position
""",
[table_name],
)
row = cursor.fetchone()
return self.identifier_converter(row[0]) if row else None
return [self.identifier_converter(row[0]) for row in cursor.fetchall()]
def get_constraints(self, cursor, table_name):
"""

View File

@ -49,6 +49,13 @@ class DatabaseFeatures(BaseDatabaseFeatures):
V_I := P_I;
END;
$$ LANGUAGE plpgsql;"""
create_test_table_with_composite_primary_key = """
CREATE TABLE test_table_composite_pk (
column_1 INTEGER NOT NULL,
column_2 INTEGER NOT NULL,
PRIMARY KEY(column_1, column_2)
)
"""
requires_casted_case_in_updates = True
supports_over_clause = True
only_supports_unbounded_with_preceding_and_following = True

View File

@ -55,6 +55,13 @@ class DatabaseFeatures(BaseDatabaseFeatures):
# Date/DateTime fields and timedeltas.
"expressions.tests.FTimeDeltaTests.test_mixed_comparisons1",
}
create_test_table_with_composite_primary_key = """
CREATE TABLE test_table_composite_pk (
column_1 INTEGER NOT NULL,
column_2 INTEGER NOT NULL,
PRIMARY KEY(column_1, column_2)
)
"""
@cached_property
def django_test_skips(self):

View File

@ -156,15 +156,11 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
) in cursor.fetchall()
}
def get_primary_key_column(self, cursor, table_name):
"""Return the column name of the primary key for the given table."""
def get_primary_key_columns(self, cursor, table_name):
cursor.execute(
"PRAGMA table_info(%s)" % self.connection.ops.quote_name(table_name)
)
for _, name, *_, pk in cursor.fetchall():
if pk:
return name
return None
return [name for _, name, *_, pk in cursor.fetchall() if pk]
def _parse_column_or_constraint_definition(self, tokens, columns):
token = None
@ -372,14 +368,14 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
if orders is not None:
constraints[index]["orders"] = orders
# Get the PK
pk_column = self.get_primary_key_column(cursor, table_name)
if pk_column:
pk_columns = self.get_primary_key_columns(cursor, table_name)
if pk_columns:
# SQLite doesn't actually give a name to the PK constraint,
# so we invent one. This is fine, as the SQLite backend never
# deletes PK constraints by name, as you can't delete constraints
# in SQLite; we remake the table with a new PK instead.
constraints["__primary__"] = {
"columns": [pk_column],
"columns": pk_columns,
"primary_key": True,
"unique": False, # It's not actually a unique constraint.
"foreign_key": None,

View File

@ -585,3 +585,32 @@ class InspectDBTransactionalTests(TransactionTestCase):
)
cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
cursor.execute("DROP EXTENSION IF EXISTS file_fdw")
@skipUnlessDBFeature("create_test_table_with_composite_primary_key")
def test_composite_primary_key(self):
table_name = "test_table_composite_pk"
with connection.cursor() as cursor:
cursor.execute(
connection.features.create_test_table_with_composite_primary_key
)
out = StringIO()
if connection.vendor == "sqlite":
field_type = connection.features.introspected_field_types["AutoField"]
else:
field_type = connection.features.introspected_field_types["IntegerField"]
try:
call_command("inspectdb", table_name, stdout=out)
output = out.getvalue()
self.assertIn(
f"column_1 = models.{field_type}(primary_key=True) # The composite "
f"primary key (column_1, column_2) found, that is not supported. The "
f"first column is selected.",
output,
)
self.assertIn(
"column_2 = models.IntegerField()",
output,
)
finally:
with connection.cursor() as cursor:
cursor.execute("DROP TABLE %s" % table_name)