Removed obsolete SQL generation methods.

This commit is contained in:
Tim Graham 2014-12-30 11:39:30 -05:00
parent 7e8cf74dc7
commit 2b039d966f
12 changed files with 2 additions and 573 deletions

View File

@ -1,6 +1,5 @@
from django.db.backends.mysql.base import DatabaseWrapper as MySQLDatabaseWrapper from django.db.backends.mysql.base import DatabaseWrapper as MySQLDatabaseWrapper
from .creation import MySQLCreation
from .features import DatabaseFeatures from .features import DatabaseFeatures
from .introspection import MySQLIntrospection from .introspection import MySQLIntrospection
from .operations import MySQLOperations from .operations import MySQLOperations
@ -13,6 +12,5 @@ class DatabaseWrapper(MySQLDatabaseWrapper):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(DatabaseWrapper, self).__init__(*args, **kwargs) super(DatabaseWrapper, self).__init__(*args, **kwargs)
self.features = DatabaseFeatures(self) self.features = DatabaseFeatures(self)
self.creation = MySQLCreation(self)
self.ops = MySQLOperations(self) self.ops = MySQLOperations(self)
self.introspection = MySQLIntrospection(self) self.introspection = MySQLIntrospection(self)

View File

@ -1,18 +0,0 @@
from django.db.backends.mysql.creation import DatabaseCreation
class MySQLCreation(DatabaseCreation):
def sql_indexes_for_field(self, model, f, style):
from django.contrib.gis.db.models.fields import GeometryField
output = super(MySQLCreation, self).sql_indexes_for_field(model, f, style)
if isinstance(f, GeometryField) and f.spatial_index:
qn = self.connection.ops.quote_name
db_table = model._meta.db_table
idx_name = '%s_%s_id' % (db_table, f.column)
output.append(style.SQL_KEYWORD('CREATE SPATIAL INDEX ') +
style.SQL_TABLE(qn(idx_name)) +
style.SQL_KEYWORD(' ON ') +
style.SQL_TABLE(qn(db_table)) + '(' +
style.SQL_FIELD(qn(f.column)) + ');')
return output

View File

@ -1,6 +1,5 @@
from django.db.backends.oracle.base import DatabaseWrapper as OracleDatabaseWrapper from django.db.backends.oracle.base import DatabaseWrapper as OracleDatabaseWrapper
from .creation import OracleCreation
from .features import DatabaseFeatures from .features import DatabaseFeatures
from .introspection import OracleIntrospection from .introspection import OracleIntrospection
from .operations import OracleOperations from .operations import OracleOperations
@ -14,5 +13,4 @@ class DatabaseWrapper(OracleDatabaseWrapper):
super(DatabaseWrapper, self).__init__(*args, **kwargs) super(DatabaseWrapper, self).__init__(*args, **kwargs)
self.features = DatabaseFeatures(self) self.features = DatabaseFeatures(self)
self.ops = OracleOperations(self) self.ops = OracleOperations(self)
self.creation = OracleCreation(self)
self.introspection = OracleIntrospection(self) self.introspection = OracleIntrospection(self)

View File

@ -1,43 +0,0 @@
from django.db.backends.oracle.creation import DatabaseCreation
from django.db.backends.utils import truncate_name
class OracleCreation(DatabaseCreation):
def sql_indexes_for_field(self, model, f, style):
"Return any spatial index creation SQL for the field."
from django.contrib.gis.db.models.fields import GeometryField
output = super(OracleCreation, self).sql_indexes_for_field(model, f, style)
if isinstance(f, GeometryField):
gqn = self.connection.ops.geo_quote_name
qn = self.connection.ops.quote_name
db_table = model._meta.db_table
output.append(style.SQL_KEYWORD('INSERT INTO ') +
style.SQL_TABLE('USER_SDO_GEOM_METADATA') +
' (%s, %s, %s, %s)\n ' % tuple(map(qn, ['TABLE_NAME', 'COLUMN_NAME', 'DIMINFO', 'SRID'])) +
style.SQL_KEYWORD(' VALUES ') + '(\n ' +
style.SQL_TABLE(gqn(db_table)) + ',\n ' +
style.SQL_FIELD(gqn(f.column)) + ',\n ' +
style.SQL_KEYWORD("MDSYS.SDO_DIM_ARRAY") + '(\n ' +
style.SQL_KEYWORD("MDSYS.SDO_DIM_ELEMENT") +
("('LONG', %s, %s, %s),\n " % (f._extent[0], f._extent[2], f._tolerance)) +
style.SQL_KEYWORD("MDSYS.SDO_DIM_ELEMENT") +
("('LAT', %s, %s, %s)\n ),\n" % (f._extent[1], f._extent[3], f._tolerance)) +
' %s\n );' % f.srid)
if f.spatial_index:
# Getting the index name, Oracle doesn't allow object
# names > 30 characters.
idx_name = truncate_name('%s_%s_id' % (db_table, f.column), 30)
output.append(style.SQL_KEYWORD('CREATE INDEX ') +
style.SQL_TABLE(qn(idx_name)) +
style.SQL_KEYWORD(' ON ') +
style.SQL_TABLE(qn(db_table)) + '(' +
style.SQL_FIELD(qn(f.column)) + ') ' +
style.SQL_KEYWORD('INDEXTYPE IS ') +
style.SQL_TABLE('MDSYS.SPATIAL_INDEX') + ';')
return output

View File

@ -2,68 +2,6 @@ from django.db.backends.postgresql_psycopg2.creation import DatabaseCreation
class PostGISCreation(DatabaseCreation): class PostGISCreation(DatabaseCreation):
geom_index_type = 'GIST'
geom_index_ops = 'GIST_GEOMETRY_OPS'
geom_index_ops_nd = 'GIST_GEOMETRY_OPS_ND'
def sql_indexes_for_field(self, model, f, style):
"Return any spatial index creation SQL for the field."
from django.contrib.gis.db.models.fields import GeometryField
output = super(PostGISCreation, self).sql_indexes_for_field(model, f, style)
if isinstance(f, GeometryField):
gqn = self.connection.ops.geo_quote_name
qn = self.connection.ops.quote_name
db_table = model._meta.db_table
if f.geography or self.connection.ops.geometry:
# Geography and Geometry (PostGIS 2.0+) columns are
# created normally.
pass
else:
# Geometry columns are created by `AddGeometryColumn`
# stored procedure.
output.append(style.SQL_KEYWORD('SELECT ') +
style.SQL_TABLE('AddGeometryColumn') + '(' +
style.SQL_TABLE(gqn(db_table)) + ', ' +
style.SQL_FIELD(gqn(f.column)) + ', ' +
style.SQL_FIELD(str(f.srid)) + ', ' +
style.SQL_COLTYPE(gqn(f.geom_type)) + ', ' +
style.SQL_KEYWORD(str(f.dim)) + ');')
if not f.null:
# Add a NOT NULL constraint to the field
output.append(style.SQL_KEYWORD('ALTER TABLE ') +
style.SQL_TABLE(qn(db_table)) +
style.SQL_KEYWORD(' ALTER ') +
style.SQL_FIELD(qn(f.column)) +
style.SQL_KEYWORD(' SET NOT NULL') + ';')
if f.spatial_index:
# Spatial indexes created the same way for both Geometry and
# Geography columns.
# PostGIS 2.0 does not support GIST_GEOMETRY_OPS. So, on 1.5
# we use GIST_GEOMETRY_OPS, on 2.0 we use either "nd" ops
# which are fast on multidimensional cases, or just plain
# gist index for the 2d case.
if f.geography:
index_ops = ''
elif self.connection.ops.geometry:
if f.dim > 2:
index_ops = ' ' + style.SQL_KEYWORD(self.geom_index_ops_nd)
else:
index_ops = ''
else:
index_ops = ' ' + style.SQL_KEYWORD(self.geom_index_ops)
output.append(style.SQL_KEYWORD('CREATE INDEX ') +
style.SQL_TABLE(qn('%s_%s_id' % (db_table, f.column))) +
style.SQL_KEYWORD(' ON ') +
style.SQL_TABLE(qn(db_table)) +
style.SQL_KEYWORD(' USING ') +
style.SQL_COLTYPE(self.geom_index_type) + ' ( ' +
style.SQL_FIELD(qn(f.column)) + index_ops + ' );')
return output
def sql_table_creation_suffix(self): def sql_table_creation_suffix(self):
if self.connection.template_postgis is not None: if self.connection.template_postgis is not None:

View File

@ -9,7 +9,6 @@ from django.db.backends.sqlite3.base import (
from django.utils import six from django.utils import six
from .client import SpatiaLiteClient from .client import SpatiaLiteClient
from .creation import SpatiaLiteCreation
from .features import DatabaseFeatures from .features import DatabaseFeatures
from .introspection import SpatiaLiteIntrospection from .introspection import SpatiaLiteIntrospection
from .operations import SpatiaLiteOperations from .operations import SpatiaLiteOperations
@ -41,7 +40,6 @@ class DatabaseWrapper(SQLiteDatabaseWrapper):
self.features = DatabaseFeatures(self) self.features = DatabaseFeatures(self)
self.ops = SpatiaLiteOperations(self) self.ops = SpatiaLiteOperations(self)
self.client = SpatiaLiteClient(self) self.client = SpatiaLiteClient(self)
self.creation = SpatiaLiteCreation(self)
self.introspection = SpatiaLiteIntrospection(self) self.introspection = SpatiaLiteIntrospection(self)
def get_new_connection(self, conn_params): def get_new_connection(self, conn_params):

View File

@ -1,32 +0,0 @@
from django.db.backends.sqlite3.creation import DatabaseCreation
class SpatiaLiteCreation(DatabaseCreation):
def sql_indexes_for_field(self, model, f, style):
"Return any spatial index creation SQL for the field."
from django.contrib.gis.db.models.fields import GeometryField
output = super(SpatiaLiteCreation, self).sql_indexes_for_field(model, f, style)
if isinstance(f, GeometryField):
gqn = self.connection.ops.geo_quote_name
db_table = model._meta.db_table
output.append(style.SQL_KEYWORD('SELECT ') +
style.SQL_TABLE('AddGeometryColumn') + '(' +
style.SQL_TABLE(gqn(db_table)) + ', ' +
style.SQL_FIELD(gqn(f.column)) + ', ' +
style.SQL_FIELD(str(f.srid)) + ', ' +
style.SQL_COLTYPE(gqn(f.geom_type)) + ', ' +
style.SQL_KEYWORD(str(f.dim)) + ', ' +
style.SQL_KEYWORD(str(int(not f.null))) +
');')
if f.spatial_index:
output.append(style.SQL_KEYWORD('SELECT ') +
style.SQL_TABLE('CreateSpatialIndex') + '(' +
style.SQL_TABLE(gqn(db_table)) + ', ' +
style.SQL_FIELD(gqn(f.column)) + ');')
return output

View File

@ -1,15 +1,10 @@
import hashlib
import sys import sys
import time import time
import warnings
from django.apps import apps from django.apps import apps
from django.conf import settings from django.conf import settings
from django.core import serializers from django.core import serializers
from django.db import router from django.db import router
from django.db.backends.utils import truncate_name
from django.utils.deprecation import RemovedInDjango20Warning
from django.utils.encoding import force_bytes
from django.utils.six import StringIO from django.utils.six import StringIO
from django.utils.six.moves import input from django.utils.six.moves import input
@ -22,9 +17,7 @@ TEST_DATABASE_PREFIX = 'test_'
class BaseDatabaseCreation(object): class BaseDatabaseCreation(object):
""" """
This class encapsulates all backend-specific differences that pertain to This class encapsulates all backend-specific differences that pertain to
database *creation*, such as the column types to use for particular Django creation and destruction of the test database.
Fields, the SQL used to create and destroy tables, and the creation and
destruction of test databases.
""" """
def __init__(self, connection): def __init__(self, connection):
self.connection = connection self.connection = connection
@ -36,296 +29,6 @@ class BaseDatabaseCreation(object):
""" """
return self.connection._nodb_connection return self.connection._nodb_connection
@classmethod
def _digest(cls, *args):
"""
Generates a 32-bit digest of a set of arguments that can be used to
shorten identifying names.
"""
h = hashlib.md5()
for arg in args:
h.update(force_bytes(arg))
return h.hexdigest()[:8]
def sql_create_model(self, model, style, known_models=set()):
"""
Returns the SQL required to create a single model, as a tuple of:
(list_of_sql, pending_references_dict)
"""
opts = model._meta
if not opts.managed or opts.proxy or opts.swapped:
return [], {}
final_output = []
table_output = []
pending_references = {}
qn = self.connection.ops.quote_name
for f in opts.local_fields:
db_params = f.db_parameters(connection=self.connection)
col_type = db_params['type']
if db_params['check']:
col_type = '%s CHECK (%s)' % (col_type, db_params['check'])
col_type_suffix = f.db_type_suffix(connection=self.connection)
tablespace = f.db_tablespace or opts.db_tablespace
if col_type is None:
# Skip ManyToManyFields, because they're not represented as
# database columns in this table.
continue
# Make the definition (e.g. 'foo VARCHAR(30)') for this field.
field_output = [style.SQL_FIELD(qn(f.column)),
style.SQL_COLTYPE(col_type)]
# Oracle treats the empty string ('') as null, so coerce the null
# option whenever '' is a possible value.
null = f.null
if (f.empty_strings_allowed and not f.primary_key and
self.connection.features.interprets_empty_strings_as_nulls):
null = True
if not null:
field_output.append(style.SQL_KEYWORD('NOT NULL'))
if f.primary_key:
field_output.append(style.SQL_KEYWORD('PRIMARY KEY'))
elif f.unique:
field_output.append(style.SQL_KEYWORD('UNIQUE'))
if tablespace and f.unique:
# We must specify the index tablespace inline, because we
# won't be generating a CREATE INDEX statement for this field.
tablespace_sql = self.connection.ops.tablespace_sql(
tablespace, inline=True)
if tablespace_sql:
field_output.append(tablespace_sql)
if f.rel and f.db_constraint:
ref_output, pending = self.sql_for_inline_foreign_key_references(
model, f, known_models, style)
if pending:
pending_references.setdefault(f.rel.to, []).append(
(model, f))
else:
field_output.extend(ref_output)
if col_type_suffix:
field_output.append(style.SQL_KEYWORD(col_type_suffix))
table_output.append(' '.join(field_output))
for field_constraints in opts.unique_together:
table_output.append(style.SQL_KEYWORD('UNIQUE') + ' (%s)' %
", ".join(
[style.SQL_FIELD(qn(opts.get_field(f).column))
for f in field_constraints]))
full_statement = [style.SQL_KEYWORD('CREATE TABLE') + ' ' +
style.SQL_TABLE(qn(opts.db_table)) + ' (']
for i, line in enumerate(table_output): # Combine and add commas.
full_statement.append(
' %s%s' % (line, ',' if i < len(table_output) - 1 else ''))
full_statement.append(')')
if opts.db_tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(
opts.db_tablespace)
if tablespace_sql:
full_statement.append(tablespace_sql)
full_statement.append(';')
final_output.append('\n'.join(full_statement))
if opts.has_auto_field:
# Add any extra SQL needed to support auto-incrementing primary
# keys.
auto_column = opts.auto_field.db_column or opts.auto_field.name
autoinc_sql = self.connection.ops.autoinc_sql(opts.db_table,
auto_column)
if autoinc_sql:
for stmt in autoinc_sql:
final_output.append(stmt)
return final_output, pending_references
def sql_for_inline_foreign_key_references(self, model, field, known_models, style):
"""
Return the SQL snippet defining the foreign key reference for a field.
"""
qn = self.connection.ops.quote_name
rel_to = field.rel.to
if rel_to in known_models or rel_to == model:
output = [style.SQL_KEYWORD('REFERENCES') + ' ' +
style.SQL_TABLE(qn(rel_to._meta.db_table)) + ' (' +
style.SQL_FIELD(qn(rel_to._meta.get_field(
field.rel.field_name).column)) + ')' +
self.connection.ops.deferrable_sql()
]
pending = False
else:
# We haven't yet created the table to which this field
# is related, so save it for later.
output = []
pending = True
return output, pending
def sql_for_pending_references(self, model, style, pending_references):
"""
Returns any ALTER TABLE statements to add constraints after the fact.
"""
opts = model._meta
if not opts.managed or opts.swapped:
return []
qn = self.connection.ops.quote_name
final_output = []
if model in pending_references:
for rel_class, f in pending_references[model]:
rel_opts = rel_class._meta
r_table = rel_opts.db_table
r_col = f.column
table = opts.db_table
col = opts.get_field(f.rel.field_name).column
# For MySQL, r_name must be unique in the first 64 characters.
# So we are careful with character usage here.
r_name = '%s_refs_%s_%s' % (
r_col, col, self._digest(r_table, table))
final_output.append(style.SQL_KEYWORD('ALTER TABLE') +
' %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s)%s;' %
(qn(r_table), qn(truncate_name(
r_name, self.connection.ops.max_name_length())),
qn(r_col), qn(table), qn(col),
self.connection.ops.deferrable_sql()))
del pending_references[model]
return final_output
def sql_indexes_for_model(self, model, style):
"""
Returns the CREATE INDEX SQL statements for a single model.
"""
warnings.warn("DatabaseCreation.sql_indexes_for_model is deprecated, "
"use the equivalent method of the schema editor instead.",
RemovedInDjango20Warning)
if not model._meta.managed or model._meta.proxy or model._meta.swapped:
return []
output = []
for f in model._meta.local_fields:
output.extend(self.sql_indexes_for_field(model, f, style))
for fs in model._meta.index_together:
fields = [model._meta.get_field(f) for f in fs]
output.extend(self.sql_indexes_for_fields(model, fields, style))
return output
def sql_indexes_for_field(self, model, f, style):
"""
Return the CREATE INDEX SQL statements for a single model field.
"""
if f.db_index and not f.unique:
return self.sql_indexes_for_fields(model, [f], style)
else:
return []
def sql_indexes_for_fields(self, model, fields, style):
if len(fields) == 1 and fields[0].db_tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(fields[0].db_tablespace)
elif model._meta.db_tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace)
else:
tablespace_sql = ""
if tablespace_sql:
tablespace_sql = " " + tablespace_sql
field_names = []
qn = self.connection.ops.quote_name
for f in fields:
field_names.append(style.SQL_FIELD(qn(f.column)))
index_name = "%s_%s" % (model._meta.db_table, self._digest([f.name for f in fields]))
return [
style.SQL_KEYWORD("CREATE INDEX") + " " +
style.SQL_TABLE(qn(truncate_name(index_name, self.connection.ops.max_name_length()))) + " " +
style.SQL_KEYWORD("ON") + " " +
style.SQL_TABLE(qn(model._meta.db_table)) + " " +
"(%s)" % style.SQL_FIELD(", ".join(field_names)) +
"%s;" % tablespace_sql,
]
def sql_destroy_model(self, model, references_to_delete, style):
"""
Return the DROP TABLE and restraint dropping statements for a single
model.
"""
if not model._meta.managed or model._meta.proxy or model._meta.swapped:
return []
# Drop the table now
qn = self.connection.ops.quote_name
output = ['%s %s;' % (style.SQL_KEYWORD('DROP TABLE'),
style.SQL_TABLE(qn(model._meta.db_table)))]
if model in references_to_delete:
output.extend(self.sql_remove_table_constraints(
model, references_to_delete, style))
if model._meta.has_auto_field:
ds = self.connection.ops.drop_sequence_sql(model._meta.db_table)
if ds:
output.append(ds)
return output
def sql_remove_table_constraints(self, model, references_to_delete, style):
if not model._meta.managed or model._meta.proxy or model._meta.swapped:
return []
output = []
qn = self.connection.ops.quote_name
for rel_class, f in references_to_delete[model]:
table = rel_class._meta.db_table
col = f.column
r_table = model._meta.db_table
r_col = model._meta.get_field(f.rel.field_name).column
r_name = '%s_refs_%s_%s' % (
col, r_col, self._digest(table, r_table))
output.append('%s %s %s %s;' % (
style.SQL_KEYWORD('ALTER TABLE'),
style.SQL_TABLE(qn(table)),
style.SQL_KEYWORD(self.connection.ops.drop_foreignkey_sql()),
style.SQL_FIELD(qn(truncate_name(
r_name, self.connection.ops.max_name_length())))
))
del references_to_delete[model]
return output
def sql_destroy_indexes_for_model(self, model, style):
"""
Returns the DROP INDEX SQL statements for a single model.
"""
if not model._meta.managed or model._meta.proxy or model._meta.swapped:
return []
output = []
for f in model._meta.local_fields:
output.extend(self.sql_destroy_indexes_for_field(model, f, style))
for fs in model._meta.index_together:
fields = [model._meta.get_field(f) for f in fs]
output.extend(self.sql_destroy_indexes_for_fields(model, fields, style))
return output
def sql_destroy_indexes_for_field(self, model, f, style):
"""
Return the DROP INDEX SQL statements for a single model field.
"""
if f.db_index and not f.unique:
return self.sql_destroy_indexes_for_fields(model, [f], style)
else:
return []
def sql_destroy_indexes_for_fields(self, model, fields, style):
if len(fields) == 1 and fields[0].db_tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(fields[0].db_tablespace)
elif model._meta.db_tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace)
else:
tablespace_sql = ""
if tablespace_sql:
tablespace_sql = " " + tablespace_sql
field_names = []
qn = self.connection.ops.quote_name
for f in fields:
field_names.append(style.SQL_FIELD(qn(f.column)))
index_name = "%s_%s" % (model._meta.db_table, self._digest([f.name for f in fields]))
return [
style.SQL_KEYWORD("DROP INDEX") + " " +
style.SQL_TABLE(qn(truncate_name(index_name, self.connection.ops.max_name_length()))) + " " +
";",
]
def create_test_db(self, verbosity=1, autoclobber=False, serialize=True, keepdb=False): def create_test_db(self, verbosity=1, autoclobber=False, serialize=True, keepdb=False):
""" """
Creates a test database, prompting the user for confirmation if the Creates a test database, prompting the user for confirmation if the

View File

@ -11,33 +11,3 @@ class DatabaseCreation(BaseDatabaseCreation):
if test_settings['COLLATION']: if test_settings['COLLATION']:
suffix.append('COLLATE %s' % test_settings['COLLATION']) suffix.append('COLLATE %s' % test_settings['COLLATION'])
return ' '.join(suffix) return ' '.join(suffix)
def sql_for_inline_foreign_key_references(self, model, field, known_models, style):
"All inline references are pending under MySQL"
return [], True
def sql_destroy_indexes_for_fields(self, model, fields, style):
if len(fields) == 1 and fields[0].db_tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(fields[0].db_tablespace)
elif model._meta.db_tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace)
else:
tablespace_sql = ""
if tablespace_sql:
tablespace_sql = " " + tablespace_sql
field_names = []
qn = self.connection.ops.quote_name
for f in fields:
field_names.append(style.SQL_FIELD(qn(f.column)))
index_name = "%s_%s" % (model._meta.db_table, self._digest([f.name for f in fields]))
from ..utils import truncate_name
return [
style.SQL_KEYWORD("DROP INDEX") + " " +
style.SQL_TABLE(qn(truncate_name(index_name, self.connection.ops.max_name_length()))) + " " +
style.SQL_KEYWORD("ON") + " " +
style.SQL_TABLE(qn(model._meta.db_table)) + ";",
]

View File

@ -1,5 +1,4 @@
from django.db.backends.base.creation import BaseDatabaseCreation from django.db.backends.base.creation import BaseDatabaseCreation
from django.db.backends.utils import truncate_name
class DatabaseCreation(BaseDatabaseCreation): class DatabaseCreation(BaseDatabaseCreation):
@ -12,40 +11,3 @@ class DatabaseCreation(BaseDatabaseCreation):
if test_settings['CHARSET']: if test_settings['CHARSET']:
return "WITH ENCODING '%s'" % test_settings['CHARSET'] return "WITH ENCODING '%s'" % test_settings['CHARSET']
return '' return ''
def sql_indexes_for_field(self, model, f, style):
output = []
db_type = f.db_type(connection=self.connection)
if db_type is not None and (f.db_index or f.unique):
qn = self.connection.ops.quote_name
db_table = model._meta.db_table
tablespace = f.db_tablespace or model._meta.db_tablespace
if tablespace:
tablespace_sql = self.connection.ops.tablespace_sql(tablespace)
if tablespace_sql:
tablespace_sql = ' ' + tablespace_sql
else:
tablespace_sql = ''
def get_index_sql(index_name, opclass=''):
return (style.SQL_KEYWORD('CREATE INDEX') + ' ' +
style.SQL_TABLE(qn(truncate_name(index_name, self.connection.ops.max_name_length()))) + ' ' +
style.SQL_KEYWORD('ON') + ' ' +
style.SQL_TABLE(qn(db_table)) + ' ' +
"(%s%s)" % (style.SQL_FIELD(qn(f.column)), opclass) +
"%s;" % tablespace_sql)
if not f.unique:
output = [get_index_sql('%s_%s' % (db_table, f.column))]
# Fields with database column types of `varchar` and `text` need
# a second index that specifies their operator class, which is
# needed when performing correct LIKE queries outside the
# C locale. See #12234.
if db_type.startswith('varchar'):
output.append(get_index_sql('%s_%s_like' % (db_table, f.column),
' varchar_pattern_ops'))
elif db_type.startswith('text'):
output.append(get_index_sql('%s_%s_like' % (db_table, f.column),
' text_pattern_ops'))
return output

View File

@ -8,14 +8,6 @@ from django.utils.six.moves import input
class DatabaseCreation(BaseDatabaseCreation): class DatabaseCreation(BaseDatabaseCreation):
def sql_for_pending_references(self, model, style, pending_references):
"SQLite3 doesn't support constraints"
return []
def sql_remove_table_constraints(self, model, references_to_delete, style):
"SQLite3 doesn't support constraints"
return []
def _get_test_db_name(self): def _get_test_db_name(self):
test_database_name = self.connection.settings_dict['TEST']['NAME'] test_database_name = self.connection.settings_dict['TEST']['NAME']
if test_database_name and test_database_name != ':memory:': if test_database_name and test_database_name != ':memory:':

View File

@ -1,48 +1,11 @@
from unittest import skipUnless from unittest import skipUnless
from django.core.management.color import no_style
from django.db import connection from django.db import connection
from django.test import TestCase, ignore_warnings from django.test import TestCase
from django.utils.deprecation import RemovedInDjango20Warning
from .models import Article, ArticleTranslation, IndexTogetherSingleList from .models import Article, ArticleTranslation, IndexTogetherSingleList
@ignore_warnings(category=RemovedInDjango20Warning)
class CreationIndexesTests(TestCase):
"""
Test index handling by the to-be-deprecated connection.creation interface.
"""
def test_index_together(self):
index_sql = connection.creation.sql_indexes_for_model(Article, no_style())
self.assertEqual(len(index_sql), 1)
def test_index_together_single_list(self):
# Test for using index_together with a single list (#22172)
index_sql = connection.creation.sql_indexes_for_model(IndexTogetherSingleList, no_style())
self.assertEqual(len(index_sql), 1)
@skipUnless(connection.vendor == 'postgresql',
"This is a postgresql-specific issue")
def test_postgresql_text_indexes(self):
"""Test creation of PostgreSQL-specific text indexes (#12234)"""
from .models import IndexedArticle
index_sql = connection.creation.sql_indexes_for_model(IndexedArticle, no_style())
self.assertEqual(len(index_sql), 5)
self.assertIn('("headline" varchar_pattern_ops)', index_sql[1])
self.assertIn('("body" text_pattern_ops)', index_sql[3])
# unique=True and db_index=True should only create the varchar-specific
# index (#19441).
self.assertIn('("slug" varchar_pattern_ops)', index_sql[4])
@skipUnless(connection.vendor == 'postgresql',
"This is a postgresql-specific issue")
def test_postgresql_virtual_relation_indexes(self):
"""Test indexes are not created for related objects"""
index_sql = connection.creation.sql_indexes_for_model(Article, no_style())
self.assertEqual(len(index_sql), 1)
class SchemaIndexesTests(TestCase): class SchemaIndexesTests(TestCase):
""" """
Test index handling by the db.backends.schema infrastructure. Test index handling by the db.backends.schema infrastructure.