import sys
import time

try:
    set
except NameError:
    # Python 2.3 compat
    from sets import Set as set

from django.conf import settings
from django.core.management import call_command

# The prefix to put on the default database name when creating
# the test database.
TEST_DATABASE_PREFIX = 'test_'

class BaseDatabaseCreation(object):
    """
    This class encapsulates all backend-specific differences that pertain to
    database *creation*, such as the column types to use for particular Django
    Fields, the SQL used to create and destroy tables, and the creation and
    destruction of test databases.
    """
    data_types = {}

    def __init__(self, connection):
        self.connection = connection

    def _digest(self, *args):
        """
        Generates a 32-bit digest of a set of arguments that can be used to
        shorten identifying names.
        """
        return '%x' % (abs(hash(args)) % 4294967296L) # 2**32

    def sql_create_model(self, model, style, known_models=set()):
        """
        Returns the SQL required to create a single model, as a tuple of:
            (list_of_sql, pending_references_dict)
        """
        from django.db import models

        opts = model._meta
        if not opts.managed or opts.proxy:
            return [], {}
        final_output = []
        table_output = []
        pending_references = {}
        qn = self.connection.ops.quote_name
        for f in opts.local_fields:
            col_type = f.db_type()
            tablespace = f.db_tablespace or opts.db_tablespace
            if col_type is None:
                # Skip ManyToManyFields, because they're not represented as
                # database columns in this table.
                continue
            # Make the definition (e.g. 'foo VARCHAR(30)') for this field.
            field_output = [style.SQL_FIELD(qn(f.column)),
                style.SQL_COLTYPE(col_type)]
            if not f.null:
                field_output.append(style.SQL_KEYWORD('NOT NULL'))
            if f.primary_key:
                field_output.append(style.SQL_KEYWORD('PRIMARY KEY'))
            elif f.unique:
                field_output.append(style.SQL_KEYWORD('UNIQUE'))
            if tablespace and f.unique:
                # We must specify the index tablespace inline, because we
                # won't be generating a CREATE INDEX statement for this field.
                field_output.append(self.connection.ops.tablespace_sql(tablespace, inline=True))
            if f.rel:
                ref_output, pending = self.sql_for_inline_foreign_key_references(f, known_models, style)
                if pending:
                    pr = pending_references.setdefault(f.rel.to, []).append((model, f))
                else:
                    field_output.extend(ref_output)
            table_output.append(' '.join(field_output))
        if opts.order_with_respect_to:
            table_output.append(style.SQL_FIELD(qn('_order')) + ' ' + \
                style.SQL_COLTYPE(models.IntegerField().db_type()))
        for field_constraints in opts.unique_together:
            table_output.append(style.SQL_KEYWORD('UNIQUE') + ' (%s)' % \
                ", ".join([style.SQL_FIELD(qn(opts.get_field(f).column)) for f in field_constraints]))

        full_statement = [style.SQL_KEYWORD('CREATE TABLE') + ' ' + style.SQL_TABLE(qn(opts.db_table)) + ' (']
        for i, line in enumerate(table_output): # Combine and add commas.
            full_statement.append('    %s%s' % (line, i < len(table_output)-1 and ',' or ''))
        full_statement.append(')')
        if opts.db_tablespace:
            full_statement.append(self.connection.ops.tablespace_sql(opts.db_tablespace))
        full_statement.append(';')
        final_output.append('\n'.join(full_statement))

        if opts.has_auto_field:
            # Add any extra SQL needed to support auto-incrementing primary keys.
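            # (Note: ops.autoinc_sql() returns None on backends whose column type
            # auto-increments natively; others, e.g. Oracle, use this hook to
            # return extra statements such as sequence/trigger creation SQL.)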
            auto_column = opts.auto_field.db_column or opts.auto_field.name
            autoinc_sql = self.connection.ops.autoinc_sql(opts.db_table, auto_column)
            if autoinc_sql:
                for stmt in autoinc_sql:
                    final_output.append(stmt)

        return final_output, pending_references

    def sql_for_inline_foreign_key_references(self, field, known_models, style):
        "Return the SQL snippet defining the foreign key reference for a field"
        qn = self.connection.ops.quote_name
        if field.rel.to in known_models:
            output = [style.SQL_KEYWORD('REFERENCES') + ' ' + \
                style.SQL_TABLE(qn(field.rel.to._meta.db_table)) + ' (' + \
                style.SQL_FIELD(qn(field.rel.to._meta.get_field(field.rel.field_name).column)) + ')' +
                self.connection.ops.deferrable_sql()
            ]
            pending = False
        else:
            # We haven't yet created the table to which this field
            # is related, so save it for later.
            output = []
            pending = True

        return output, pending

    def sql_for_pending_references(self, model, style, pending_references):
        "Returns any ALTER TABLE statements to add constraints after the fact."
        from django.db.backends.util import truncate_name

        if not model._meta.managed or model._meta.proxy:
            return []
        qn = self.connection.ops.quote_name
        final_output = []
        opts = model._meta
        if model in pending_references:
            for rel_class, f in pending_references[model]:
                rel_opts = rel_class._meta
                r_table = rel_opts.db_table
                r_col = f.column
                table = opts.db_table
                col = opts.get_field(f.rel.field_name).column
                # For MySQL, r_name must be unique in the first 64 characters.
                # So we are careful with character usage here.
                r_name = '%s_refs_%s_%s' % (r_col, col, self._digest(r_table, table))
                final_output.append(style.SQL_KEYWORD('ALTER TABLE') + ' %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s)%s;' % \
                    (qn(r_table), qn(truncate_name(r_name, self.connection.ops.max_name_length())),
                    qn(r_col), qn(table), qn(col),
                    self.connection.ops.deferrable_sql()))
            del pending_references[model]
        return final_output

    def sql_for_many_to_many(self, model, style):
        "Return the CREATE TABLE statements for all the many-to-many tables defined on a model"
        output = []
        for f in model._meta.local_many_to_many:
            if model._meta.managed or f.rel.to._meta.managed:
                output.extend(self.sql_for_many_to_many_field(model, f, style))
        return output

    def sql_for_many_to_many_field(self, model, f, style):
        "Return the CREATE TABLE statements for a single m2m field"
        from django.db import models
        from django.db.backends.util import truncate_name

        output = []
        if f.creates_table:
            opts = model._meta
            qn = self.connection.ops.quote_name
            tablespace = f.db_tablespace or opts.db_tablespace
            if tablespace:
                sql = self.connection.ops.tablespace_sql(tablespace, inline=True)
                if sql:
                    tablespace_sql = ' ' + sql
                else:
                    tablespace_sql = ''
            else:
                tablespace_sql = ''
            table_output = [style.SQL_KEYWORD('CREATE TABLE') + ' ' + \
                style.SQL_TABLE(qn(f.m2m_db_table())) + ' (']
            table_output.append('    %s %s %s%s,' %
                (style.SQL_FIELD(qn('id')),
                style.SQL_COLTYPE(models.AutoField(primary_key=True).db_type()),
                style.SQL_KEYWORD('NOT NULL PRIMARY KEY'),
                tablespace_sql))

            deferred = []
            inline_output, deferred = self.sql_for_inline_many_to_many_references(model, f, style)
            table_output.extend(inline_output)

            table_output.append('    %s (%s, %s)%s' %
                (style.SQL_KEYWORD('UNIQUE'),
                style.SQL_FIELD(qn(f.m2m_column_name())),
                style.SQL_FIELD(qn(f.m2m_reverse_name())),
                tablespace_sql))
            table_output.append(')')
            if opts.db_tablespace:
                # f.db_tablespace is only for indices, so ignore its value here.
                table_output.append(self.connection.ops.tablespace_sql(opts.db_tablespace))
            table_output.append(';')
            output.append('\n'.join(table_output))

            for r_table, r_col, table, col in deferred:
                r_name = '%s_refs_%s_%s' % (r_col, col, self._digest(r_table, table))
                output.append(style.SQL_KEYWORD('ALTER TABLE') + ' %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s)%s;' %
                    (qn(r_table),
                    qn(truncate_name(r_name, self.connection.ops.max_name_length())),
                    qn(r_col), qn(table), qn(col),
                    self.connection.ops.deferrable_sql()))

            # Add any extra SQL needed to support auto-incrementing PKs
            autoinc_sql = self.connection.ops.autoinc_sql(f.m2m_db_table(), 'id')
            if autoinc_sql:
                for stmt in autoinc_sql:
                    output.append(stmt)
        return output

    def sql_for_inline_many_to_many_references(self, model, field, style):
        "Create the references to other tables required by a many-to-many table"
        from django.db import models
        opts = model._meta
        qn = self.connection.ops.quote_name
        table_output = [
            '    %s %s %s %s (%s)%s,' %
                (style.SQL_FIELD(qn(field.m2m_column_name())),
                style.SQL_COLTYPE(models.ForeignKey(model).db_type()),
                style.SQL_KEYWORD('NOT NULL REFERENCES'),
                style.SQL_TABLE(qn(opts.db_table)),
                style.SQL_FIELD(qn(opts.pk.column)),
                self.connection.ops.deferrable_sql()),
            '    %s %s %s %s (%s)%s,' %
                (style.SQL_FIELD(qn(field.m2m_reverse_name())),
                style.SQL_COLTYPE(models.ForeignKey(field.rel.to).db_type()),
                style.SQL_KEYWORD('NOT NULL REFERENCES'),
                style.SQL_TABLE(qn(field.rel.to._meta.db_table)),
                style.SQL_FIELD(qn(field.rel.to._meta.pk.column)),
                self.connection.ops.deferrable_sql())
        ]
        deferred = []

        return table_output, deferred

    def sql_indexes_for_model(self, model, style):
        "Returns the CREATE INDEX SQL statements for a single model"
        if not model._meta.managed or model._meta.proxy:
            return []
        output = []
        for f in model._meta.local_fields:
            output.extend(self.sql_indexes_for_field(model, f, style))
        return output

    def sql_indexes_for_field(self, model, f, style):
        "Return the CREATE INDEX SQL statements for a single model field"
        if f.db_index and not f.unique:
            qn = self.connection.ops.quote_name
            tablespace = f.db_tablespace or model._meta.db_tablespace
            if tablespace:
                sql = self.connection.ops.tablespace_sql(tablespace)
                if sql:
                    tablespace_sql = ' ' + sql
                else:
                    tablespace_sql = ''
            else:
                tablespace_sql = ''
            output = [style.SQL_KEYWORD('CREATE INDEX') + ' ' +
                style.SQL_TABLE(qn('%s_%s' % (model._meta.db_table, f.column))) + ' ' +
                style.SQL_KEYWORD('ON') + ' ' +
                style.SQL_TABLE(qn(model._meta.db_table)) + ' ' +
                "(%s)" % style.SQL_FIELD(qn(f.column)) +
                "%s;" % tablespace_sql]
        else:
            output = []
        return output

    def sql_destroy_model(self, model, references_to_delete, style):
        "Return the DROP TABLE and constraint dropping statements for a single model"
        if not model._meta.managed or model._meta.proxy:
            return []
        # Drop the table now
        qn = self.connection.ops.quote_name
        output = ['%s %s;' % (style.SQL_KEYWORD('DROP TABLE'),
                              style.SQL_TABLE(qn(model._meta.db_table)))]
        if model in references_to_delete:
            output.extend(self.sql_remove_table_constraints(model, references_to_delete, style))

        if model._meta.has_auto_field:
            ds = self.connection.ops.drop_sequence_sql(model._meta.db_table)
            if ds:
                output.append(ds)
        return output

    def sql_remove_table_constraints(self, model, references_to_delete, style):
        from django.db.backends.util import truncate_name
        if not model._meta.managed or model._meta.proxy:
            return []
        output = []
        qn = self.connection.ops.quote_name
        for rel_class, f in references_to_delete[model]:
            table = rel_class._meta.db_table
            col = f.column
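            # Rebuild the same constraint name that sql_for_pending_references()
            # generated when the foreign key was added, so the ALTER TABLE below
            # drops the right constraint.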
            r_table = model._meta.db_table
            r_col = model._meta.get_field(f.rel.field_name).column
            r_name = '%s_refs_%s_%s' % (col, r_col, self._digest(table, r_table))
            output.append('%s %s %s %s;' % \
                (style.SQL_KEYWORD('ALTER TABLE'),
                style.SQL_TABLE(qn(table)),
                style.SQL_KEYWORD(self.connection.ops.drop_foreignkey_sql()),
                style.SQL_FIELD(truncate_name(r_name, self.connection.ops.max_name_length()))))
        del references_to_delete[model]
        return output

    def sql_destroy_many_to_many(self, model, f, style):
        "Returns the DROP TABLE statements for a single m2m field"
        qn = self.connection.ops.quote_name
        output = []
        if f.creates_table:
            output.append("%s %s;" % (style.SQL_KEYWORD('DROP TABLE'),
                style.SQL_TABLE(qn(f.m2m_db_table()))))
            ds = self.connection.ops.drop_sequence_sql("%s_%s" % (model._meta.db_table, f.column))
            if ds:
                output.append(ds)
        return output

    def create_test_db(self, verbosity=1, autoclobber=False):
        """
        Creates a test database, prompting the user for confirmation if the
        database already exists. Returns the name of the test database created.
        """
        if verbosity >= 1:
            print "Creating test database..."

        test_database_name = self._create_test_db(verbosity, autoclobber)

        self.connection.close()
        settings.DATABASE_NAME = test_database_name
        self.connection.settings_dict["DATABASE_NAME"] = test_database_name
        can_rollback = self._rollback_works()
        settings.DATABASE_SUPPORTS_TRANSACTIONS = can_rollback
        self.connection.settings_dict["DATABASE_SUPPORTS_TRANSACTIONS"] = can_rollback

        call_command('syncdb', verbosity=verbosity, interactive=False)

        if settings.CACHE_BACKEND.startswith('db://'):
            from django.core.cache import parse_backend_uri
            _, cache_name, _ = parse_backend_uri(settings.CACHE_BACKEND)
            call_command('createcachetable', cache_name)

        # Get a cursor (even though we don't need one yet). This has
        # the side effect of initializing the test database.
        cursor = self.connection.cursor()

        return test_database_name

    def _create_test_db(self, verbosity, autoclobber):
        "Internal implementation - creates the test db."
        suffix = self.sql_table_creation_suffix()
        if settings.TEST_DATABASE_NAME:
            test_database_name = settings.TEST_DATABASE_NAME
        else:
            test_database_name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME

        qn = self.connection.ops.quote_name

        # Create the test database and connect to it. We need to autocommit
        # if the database supports it because PostgreSQL doesn't allow
        # CREATE/DROP DATABASE statements within transactions.
        cursor = self.connection.cursor()
        self.set_autocommit()
        try:
            cursor.execute("CREATE DATABASE %s %s" % (qn(test_database_name), suffix))
        except Exception, e:
            sys.stderr.write("Got an error creating the test database: %s\n" % e)
            if not autoclobber:
                confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % test_database_name)
            if autoclobber or confirm == 'yes':
                try:
                    if verbosity >= 1:
                        print "Destroying old test database..."
                    cursor.execute("DROP DATABASE %s" % qn(test_database_name))
                    if verbosity >= 1:
                        print "Creating test database..."
                    cursor.execute("CREATE DATABASE %s %s" % (qn(test_database_name), suffix))
                except Exception, e:
                    sys.stderr.write("Got an error recreating the test database: %s\n" % e)
                    sys.exit(2)
            else:
                print "Tests cancelled."
                sys.exit(1)

        return test_database_name

    def _rollback_works(self):
        cursor = self.connection.cursor()
        cursor.execute('CREATE TABLE ROLLBACK_TEST (X INT)')
        self.connection._commit()
        cursor.execute('INSERT INTO ROLLBACK_TEST (X) VALUES (8)')
        self.connection._rollback()
        cursor.execute('SELECT COUNT(X) FROM ROLLBACK_TEST')
        count, = cursor.fetchone()
        cursor.execute('DROP TABLE ROLLBACK_TEST')
        self.connection._commit()
        return count == 0

    def destroy_test_db(self, old_database_name, verbosity=1):
        """
        Destroy a test database, prompting the user for confirmation if the
        database already exists.
        """
        if verbosity >= 1:
            print "Destroying test database..."
        self.connection.close()
        test_database_name = settings.DATABASE_NAME
        settings.DATABASE_NAME = old_database_name
        self.connection.settings_dict["DATABASE_NAME"] = old_database_name

        self._destroy_test_db(test_database_name, verbosity)

    def _destroy_test_db(self, test_database_name, verbosity):
        "Internal implementation - remove the test db."
        # Remove the test database to clean up after
        # ourselves. Connect to the previous database (not the test database)
        # to do so, because it's not allowed to delete a database while being
        # connected to it.
        cursor = self.connection.cursor()
        self.set_autocommit()
        time.sleep(1) # To avoid "database is being accessed by other users" errors.
        cursor.execute("DROP DATABASE %s" % self.connection.ops.quote_name(test_database_name))
        self.connection.close()

    def set_autocommit(self):
        "Make sure a connection is in autocommit mode."
        if hasattr(self.connection.connection, "autocommit"):
            if callable(self.connection.connection.autocommit):
                self.connection.connection.autocommit(True)
            else:
                self.connection.connection.autocommit = True
        elif hasattr(self.connection.connection, "set_isolation_level"):
            self.connection.connection.set_isolation_level(0)

    def sql_table_creation_suffix(self):
        "SQL to append to the end of the test database creation statement"
        return ''
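# ----------------------------------------------------------------------------
# Usage sketch (illustration only; not part of this module's API). A concrete
# backend is expected to subclass BaseDatabaseCreation, provide a ``data_types``
# mapping from Django field internal types to column types, and override the
# hooks it needs. ``ExampleDatabaseCreation`` and the column types below are
# assumptions made up for this sketch; only BaseDatabaseCreation and the
# methods it defines come from this module.
#
#     from django.db.backends.creation import BaseDatabaseCreation
#
#     class ExampleDatabaseCreation(BaseDatabaseCreation):
#         data_types = {
#             'AutoField':    'integer',
#             'CharField':    'varchar(%(max_length)s)',
#             'IntegerField': 'integer',
#             'TextField':    'text',
#         }
#
#         def sql_table_creation_suffix(self):
#             # e.g. force an encoding on the test database
#             # (the exact syntax depends on the backend).
#             return "WITH ENCODING 'UTF8'"
#
# The test framework drives the test-database lifecycle roughly like this,
# where ``connection.creation`` is the backend's creation object:
#
#     old_name = settings.DATABASE_NAME
#     test_name = connection.creation.create_test_db(verbosity=1, autoclobber=False)
#     try:
#         pass  # run the test suite against the test database
#     finally:
#         connection.creation.destroy_test_db(old_name, verbosity=1)
# ----------------------------------------------------------------------------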