Refs #29928 -- Implemented fast constraint checking on SQLite 3.20+.

This commit is contained in:
Simon Charette 2018-12-11 02:44:42 -05:00 committed by Carlton Gibson
parent ae2897aaf8
commit a939d630a4
1 changed files with 59 additions and 27 deletions

View File

@ -8,6 +8,7 @@ import math
import operator
import re
import warnings
from itertools import chain
from sqlite3 import dbapi2 as Database
import pytz
@ -266,37 +267,68 @@ class DatabaseWrapper(BaseDatabaseWrapper):
determine if rows with invalid references were entered while constraint
checks were off.
"""
with self.cursor() as cursor:
if table_names is None:
table_names = self.introspection.table_names(cursor)
for table_name in table_names:
primary_key_column_name = self.introspection.get_primary_key_column(cursor, table_name)
if not primary_key_column_name:
continue
key_columns = self.introspection.get_key_columns(cursor, table_name)
for column_name, referenced_table_name, referenced_column_name in key_columns:
cursor.execute(
"""
SELECT REFERRING.`%s`, REFERRING.`%s` FROM `%s` as REFERRING
LEFT JOIN `%s` as REFERRED
ON (REFERRING.`%s` = REFERRED.`%s`)
WHERE REFERRING.`%s` IS NOT NULL AND REFERRED.`%s` IS NULL
"""
% (
primary_key_column_name, column_name, table_name,
referenced_table_name, column_name, referenced_column_name,
column_name, referenced_column_name,
if Database.sqlite_version_info >= (3, 20, 0):
with self.cursor() as cursor:
if table_names is None:
violations = self.cursor().execute('PRAGMA foreign_key_check').fetchall()
else:
violations = chain.from_iterable(
cursor.execute('PRAGMA foreign_key_check(%s)' % table_name).fetchall()
for table_name in table_names
)
# See https://www.sqlite.org/pragma.html#pragma_foreign_key_check
for table_name, rowid, referenced_table_name, foreign_key_index in violations:
foreign_key = cursor.execute(
'PRAGMA foreign_key_list(%s)' % table_name
).fetchall()[foreign_key_index]
column_name, referenced_column_name = foreign_key[3:5]
primary_key_column_name = self.introspection.get_primary_key_column(cursor, table_name)
primary_key_value, bad_value = cursor.execute(
'SELECT %s, %s FROM %s WHERE rowid = %%s' % (
primary_key_column_name, column_name, table_name
),
(rowid,),
).fetchone()
raise utils.IntegrityError(
"The row in table '%s' with primary key '%s' has an "
"invalid foreign key: %s.%s contains a value '%s' that "
"does not have a corresponding value in %s.%s." % (
table_name, primary_key_value, table_name, column_name,
bad_value, referenced_table_name, referenced_column_name
)
)
for bad_row in cursor.fetchall():
raise utils.IntegrityError(
"The row in table '%s' with primary key '%s' has an "
"invalid foreign key: %s.%s contains a value '%s' that "
"does not have a corresponding value in %s.%s." % (
table_name, bad_row[0], table_name, column_name,
bad_row[1], referenced_table_name, referenced_column_name,
else:
with self.cursor() as cursor:
if table_names is None:
table_names = self.introspection.table_names(cursor)
for table_name in table_names:
primary_key_column_name = self.introspection.get_primary_key_column(cursor, table_name)
if not primary_key_column_name:
continue
key_columns = self.introspection.get_key_columns(cursor, table_name)
for column_name, referenced_table_name, referenced_column_name in key_columns:
cursor.execute(
"""
SELECT REFERRING.`%s`, REFERRING.`%s` FROM `%s` as REFERRING
LEFT JOIN `%s` as REFERRED
ON (REFERRING.`%s` = REFERRED.`%s`)
WHERE REFERRING.`%s` IS NOT NULL AND REFERRED.`%s` IS NULL
"""
% (
primary_key_column_name, column_name, table_name,
referenced_table_name, column_name, referenced_column_name,
column_name, referenced_column_name,
)
)
for bad_row in cursor.fetchall():
raise utils.IntegrityError(
"The row in table '%s' with primary key '%s' has an "
"invalid foreign key: %s.%s contains a value '%s' that "
"does not have a corresponding value in %s.%s." % (
table_name, bad_row[0], table_name, column_name,
bad_row[1], referenced_table_name, referenced_column_name,
)
)
def is_usable(self):
return True