diff --git a/django/db/backends/sqlite3/introspection.py b/django/db/backends/sqlite3/introspection.py index f109b5d5ee..ce097dbd72 100644 --- a/django/db/backends/sqlite3/introspection.py +++ b/django/db/backends/sqlite3/introspection.py @@ -201,25 +201,12 @@ class DatabaseIntrospection(BaseDatabaseIntrospection): def get_primary_key_column(self, cursor, table_name): """Return the column name of the primary key for the given table.""" - # Don't use PRAGMA because that causes issues with some transactions cursor.execute( - "SELECT sql, type FROM sqlite_master " - "WHERE tbl_name = %s AND type IN ('table', 'view')", - [table_name] + 'PRAGMA table_info(%s)' % self.connection.ops.quote_name(table_name) ) - row = cursor.fetchone() - if row is None: - raise ValueError("Table %s does not exist" % table_name) - create_sql, table_type = row - if table_type == 'view': - # Views don't have a primary key. - return None - fields_sql = create_sql[create_sql.index('(') + 1:create_sql.rindex(')')] - for field_desc in fields_sql.split(','): - field_desc = field_desc.strip() - m = re.match(r'(?:(?:["`\[])(.*)(?:["`\]])|(\w+)).*PRIMARY KEY.*', field_desc) - if m: - return m[1] if m[1] else m[2] + for _, name, *_, pk in cursor.fetchall(): + if pk: + return name return None def _get_foreign_key_constraints(self, cursor, table_name): diff --git a/tests/backends/sqlite/test_introspection.py b/tests/backends/sqlite/test_introspection.py index e378e0ee56..9331b5bb1a 100644 --- a/tests/backends/sqlite/test_introspection.py +++ b/tests/backends/sqlite/test_introspection.py @@ -28,6 +28,25 @@ class IntrospectionTests(TestCase): finally: cursor.execute('DROP TABLE test_primary') + def test_get_primary_key_column_pk_constraint(self): + sql = """ + CREATE TABLE test_primary( + id INTEGER NOT NULL, + created DATE, + PRIMARY KEY(id) + ) + """ + with connection.cursor() as cursor: + try: + cursor.execute(sql) + field = connection.introspection.get_primary_key_column( + cursor, + 'test_primary', + ) + self.assertEqual(field, 'id') + finally: + cursor.execute('DROP TABLE test_primary') + @unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests') class ParsingTests(TestCase):