Made get_indexes() consistent across backends.

Fixed #15933, #18082 -- the get_indexes() method introspection was
done inconsistently depending on the backend. For example, SQLite
included all the columns in the table in the returned dictionary,
while MySQL also introspected multicolumn indexes.

All backends now consistently return only single-column indexes.

Thanks to andi for the MySQL report, and ikelly for comments on
Oracle's get_indexes() changes.
Anssi Kääriäinen 2012-04-30 14:05:30 +03:00
parent eba4197c71
commit a18e43c5bb
7 changed files with 66 additions and 56 deletions


@@ -997,6 +997,17 @@ class BaseDatabaseIntrospection(object):
"""
raise NotImplementedError
def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of indexed fieldname -> infodict for the given
table, where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
Only single-column indexes are introspected.
"""
raise NotImplementedError
class BaseDatabaseClient(object):
"""
This class encapsulates all backend-specific methods for opening a
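To make the new base-class contract concrete, here is a minimal sketch of the
dictionary a conforming backend is now expected to return; the column names are
illustrative, not taken from the patch:

    # Illustrative only: the shape get_indexes() now produces on every backend.
    expected = {
        'reporter_id': {'primary_key': False, 'unique': False},  # plain single-column index
        'email':       {'primary_key': False, 'unique': True},   # single-column unique index
        # Columns covered only by multicolumn indexes no longer appear at all.
    }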


@@ -85,15 +85,19 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
return None
def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
cursor.execute("SHOW INDEX FROM %s" % self.connection.ops.quote_name(table_name))
# Do a two-pass search for indexes: on first pass check which indexes
# are multicolumn, on second pass check which single-column indexes
# are present.
rows = list(cursor.fetchall())
multicol_indexes = set()
for row in rows:
if row[3] > 1:
multicol_indexes.add(row[2])
indexes = {}
for row in cursor.fetchall():
for row in rows:
if row[2] in multicol_indexes:
continue
indexes[row[4]] = {'primary_key': (row[2] == 'PRIMARY'), 'unique': not bool(row[1])}
return indexes
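The two-pass idea is easiest to see in isolation from the database: the first
pass collects the names of indexes that span more than one column (SHOW INDEX
reports Key_name in row[2] and Seq_in_index in row[3]), the second pass keeps
only rows that belong to the remaining single-column indexes. A standalone
sketch over already-fetched rows, with made-up tuples shaped like SHOW INDEX
output:

    def single_column_indexes(rows):
        # Rows follow MySQL's SHOW INDEX column order:
        # (Table, Non_unique, Key_name, Seq_in_index, Column_name, ...)
        multicol = set(row[2] for row in rows if row[3] > 1)
        indexes = {}
        for row in rows:
            if row[2] in multicol:
                continue
            indexes[row[4]] = {'primary_key': row[2] == 'PRIMARY',
                               'unique': not bool(row[1])}
        return indexes

    # Hypothetical sample: PRIMARY on id, a two-column unique index on
    # (first_name, last_name), and a non-unique index on reporter_id.
    rows = [
        ('reporter', 0, 'PRIMARY', 1, 'id'),
        ('reporter', 0, 'name_uniq', 1, 'first_name'),
        ('reporter', 0, 'name_uniq', 2, 'last_name'),
        ('reporter', 1, 'reporter_idx', 1, 'reporter_id'),
    ]
    print(single_column_indexes(rows))  # first_name/last_name are dropped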


@@ -72,14 +72,14 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
FROM user_constraints, USER_CONS_COLUMNS ca, USER_CONS_COLUMNS cb,
user_tab_cols ta, user_tab_cols tb
WHERE user_constraints.table_name = %s AND
ta.table_name = %s AND
ta.table_name = user_constraints.table_name AND
ta.column_name = ca.column_name AND
ca.table_name = %s AND
ca.table_name = ta.table_name AND
user_constraints.constraint_name = ca.constraint_name AND
user_constraints.r_constraint_name = cb.constraint_name AND
cb.table_name = tb.table_name AND
cb.column_name = tb.column_name AND
ca.position = cb.position""", [table_name, table_name, table_name])
ca.position = cb.position""", [table_name])
relations = {}
for row in cursor.fetchall():
@@ -87,36 +87,31 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
return relations
def get_indexes(self, cursor, table_name):
sql = """
SELECT LOWER(uic1.column_name) AS column_name,
CASE user_constraints.constraint_type
WHEN 'P' THEN 1 ELSE 0
END AS is_primary_key,
CASE user_indexes.uniqueness
WHEN 'UNIQUE' THEN 1 ELSE 0
END AS is_unique
FROM user_constraints, user_indexes, user_ind_columns uic1
WHERE user_constraints.constraint_type (+) = 'P'
AND user_constraints.index_name (+) = uic1.index_name
AND user_indexes.uniqueness (+) = 'UNIQUE'
AND user_indexes.index_name (+) = uic1.index_name
AND uic1.table_name = UPPER(%s)
AND uic1.column_position = 1
AND NOT EXISTS (
SELECT 1
FROM user_ind_columns uic2
WHERE uic2.index_name = uic1.index_name
AND uic2.column_position = 2
)
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
# This query retrieves each index on the given table, including the
# first associated field name
# "We were in the nick of time; you were in great peril!"
sql = """\
SELECT LOWER(all_tab_cols.column_name) AS column_name,
CASE user_constraints.constraint_type
WHEN 'P' THEN 1 ELSE 0
END AS is_primary_key,
CASE user_indexes.uniqueness
WHEN 'UNIQUE' THEN 1 ELSE 0
END AS is_unique
FROM all_tab_cols, user_cons_columns, user_constraints, user_ind_columns, user_indexes
WHERE all_tab_cols.column_name = user_cons_columns.column_name (+)
AND all_tab_cols.table_name = user_cons_columns.table_name (+)
AND user_cons_columns.constraint_name = user_constraints.constraint_name (+)
AND user_constraints.constraint_type (+) = 'P'
AND user_ind_columns.column_name (+) = all_tab_cols.column_name
AND user_ind_columns.table_name (+) = all_tab_cols.table_name
AND user_indexes.uniqueness (+) = 'UNIQUE'
AND user_indexes.index_name (+) = user_ind_columns.index_name
AND all_tab_cols.table_name = UPPER(%s)
"""
cursor.execute(sql, [table_name])
indexes = {}
for row in cursor.fetchall():
indexes[row[0]] = {'primary_key': row[1], 'unique': row[2]}
indexes[row[0]] = {'primary_key': bool(row[1]),
'unique': bool(row[2])}
return indexes
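Besides restricting the query to single-column indexes, the Oracle change also
wraps the CASE results in bool(), since the driver hands the 0/1 flags back as
numbers rather than booleans. A small sketch of that post-processing step on
its own (the row tuples are invented):

    # Hypothetical rows in the shape the Oracle query produces:
    # (column_name, is_primary_key, is_unique) with 0/1 integer flags.
    rows = [('id', 1, 1), ('reporter_id', 0, 0)]
    indexes = {}
    for name, is_pk, is_unique in rows:
        indexes[name] = {'primary_key': bool(is_pk), 'unique': bool(is_unique)}
    assert indexes['id'] == {'primary_key': True, 'unique': True}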


@@ -65,12 +65,6 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
return relations
def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
# This query retrieves each index on the given table, including the
# first associated field name
cursor.execute("""


@@ -133,28 +133,22 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
return key_columns
def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
indexes = {}
for info in self._table_info(cursor, table_name):
indexes[info['name']] = {'primary_key': info['pk'] != 0,
'unique': False}
if info['pk'] != 0:
indexes[info['name']] = {'primary_key': True,
'unique': False}
cursor.execute('PRAGMA index_list(%s)' % self.connection.ops.quote_name(table_name))
# seq, name, unique
for index, unique in [(field[1], field[2]) for field in cursor.fetchall()]:
if not unique:
continue
cursor.execute('PRAGMA index_info(%s)' % self.connection.ops.quote_name(index))
info = cursor.fetchall()
# Skip indexes across multiple fields
if len(info) != 1:
continue
name = info[0][2] # seqno, cid, name
indexes[name]['unique'] = True
indexes[name] = {'primary_key': False,
'unique': unique}
return indexes
def get_primary_key_column(self, cursor, table_name):
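The same PRAGMA-based filtering can be tried directly with the standard
library's sqlite3 module; a self-contained sketch (the table, columns, and
indexes are illustrative, not the test schema) that keeps only single-column
unique indexes:

    import sqlite3

    conn = sqlite3.connect(':memory:')
    cur = conn.cursor()
    cur.execute('CREATE TABLE reporter ('
                'id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, '
                'email TEXT UNIQUE, UNIQUE (first_name, last_name))')

    indexes = {}
    cur.execute('PRAGMA index_list(reporter)')
    for row in cur.fetchall():
        name, unique = row[1], row[2]      # (seq, name, unique, ...)
        if not unique:
            continue
        cur.execute('PRAGMA index_info("%s")' % name)
        info = cur.fetchall()
        if len(info) != 1:                 # skip multicolumn indexes
            continue
        indexes[info[0][2]] = {'primary_key': False, 'unique': True}

    print(indexes)  # only 'email' remains; the two-column unique index is skipped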


@@ -7,6 +7,9 @@ class Reporter(models.Model):
email = models.EmailField()
facebook_user_id = models.BigIntegerField(null=True)
class Meta:
unique_together = ('first_name', 'last_name')
def __unicode__(self):
return u"%s %s" % (self.first_name, self.last_name)
@@ -19,4 +22,4 @@ class Article(models.Model):
return self.headline
class Meta:
ordering = ('headline',)
ordering = ('headline',)


@@ -137,6 +137,15 @@ class IntrospectionTests(TestCase):
indexes = connection.introspection.get_indexes(cursor, Article._meta.db_table)
self.assertEqual(indexes['reporter_id'], {'unique': False, 'primary_key': False})
def test_get_indexes_multicol(self):
"""
Test that multicolumn indexes are not included in the introspection
results.
"""
cursor = connection.cursor()
indexes = connection.introspection.get_indexes(cursor, Reporter._meta.db_table)
self.assertNotIn('first_name', indexes)
self.assertIn('id', indexes)
def datatype(dbtype, description):
"""Helper to convert a data type into a string."""