[2.2.x] Refs #30183 -- Moved SQLite table constraint parsing to a method.

Backport of 4492be348a from master.
This commit is contained in:
Paveł Tyślacki 2019-02-28 00:44:45 +03:00 committed by Tim Graham
parent aaf45d5422
commit d8252025bc
1 changed files with 47 additions and 41 deletions

View File

@ -234,6 +234,52 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
}
return constraints
def _parse_table_constraints(self, sql):
# Check constraint parsing is based of SQLite syntax diagram.
# https://www.sqlite.org/syntaxdiagrams.html#table-constraint
def next_ttype(ttype):
for token in tokens:
if token.ttype == ttype:
return token
statement = sqlparse.parse(sql)[0]
constraints = {}
tokens = statement.flatten()
for token in tokens:
name = None
if token.match(sqlparse.tokens.Keyword, 'CONSTRAINT'):
# Table constraint
name_token = next_ttype(sqlparse.tokens.Literal.String.Symbol)
name = name_token.value[1:-1]
token = next_ttype(sqlparse.tokens.Keyword)
if token.match(sqlparse.tokens.Keyword, 'UNIQUE'):
constraints[name] = {
'unique': True,
'columns': [],
'primary_key': False,
'foreign_key': None,
'check': False,
'index': False,
}
if token.match(sqlparse.tokens.Keyword, 'CHECK'):
# Column check constraint
if name is None:
column_token = next_ttype(sqlparse.tokens.Literal.String.Symbol)
column = column_token.value[1:-1]
name = '__check__%s' % column
columns = [column]
else:
columns = []
constraints[name] = {
'check': True,
'columns': columns,
'primary_key': False,
'unique': False,
'foreign_key': None,
'index': False,
}
return constraints
def get_constraints(self, cursor, table_name):
"""
Retrieve any constraints or keys (unique, pk, fk, check, index) across
@ -251,48 +297,8 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
# table_name is a view.
pass
else:
# Check constraint parsing is based of SQLite syntax diagram.
# https://www.sqlite.org/syntaxdiagrams.html#table-constraint
def next_ttype(ttype):
for token in tokens:
if token.ttype == ttype:
return token
constraints.update(self._parse_table_constraints(table_schema))
statement = sqlparse.parse(table_schema)[0]
tokens = statement.flatten()
for token in tokens:
name = None
if token.match(sqlparse.tokens.Keyword, 'CONSTRAINT'):
# Table constraint
name_token = next_ttype(sqlparse.tokens.Literal.String.Symbol)
name = name_token.value[1:-1]
token = next_ttype(sqlparse.tokens.Keyword)
if token.match(sqlparse.tokens.Keyword, 'UNIQUE'):
constraints[name] = {
'unique': True,
'columns': [],
'primary_key': False,
'foreign_key': None,
'check': False,
'index': False,
}
if token.match(sqlparse.tokens.Keyword, 'CHECK'):
# Column check constraint
if name is None:
column_token = next_ttype(sqlparse.tokens.Literal.String.Symbol)
column = column_token.value[1:-1]
name = '__check__%s' % column
columns = [column]
else:
columns = []
constraints[name] = {
'check': True,
'columns': columns,
'primary_key': False,
'unique': False,
'foreign_key': None,
'index': False,
}
# Get the index info
cursor.execute("PRAGMA index_list(%s)" % self.connection.ops.quote_name(table_name))
for row in cursor.fetchall():