import sys import time from django.conf import settings # The prefix to put on the default database name when creating # the test database. TEST_DATABASE_PREFIX = 'test_' class BaseDatabaseCreation(object): """ This class encapsulates all backend-specific differences that pertain to database *creation*, such as the column types to use for particular Django Fields, the SQL used to create and destroy tables, and the creation and destruction of test databases. """ data_types = {} def __init__(self, connection): self.connection = connection def _digest(self, *args): """ Generates a 32-bit digest of a set of arguments that can be used to shorten identifying names. """ return '%x' % (abs(hash(args)) % 4294967296L) # 2**32 def sql_create_model(self, model, style, known_models=set()): """ Returns the SQL required to create a single model, as a tuple of: (list_of_sql, pending_references_dict) """ opts = model._meta if not opts.managed or opts.proxy: return [], {} final_output = [] table_output = [] pending_references = {} qn = self.connection.ops.quote_name for f in opts.local_fields: col_type = f.db_type(connection=self.connection) tablespace = f.db_tablespace or opts.db_tablespace if col_type is None: # Skip ManyToManyFields, because they're not represented as # database columns in this table. continue # Make the definition (e.g. 'foo VARCHAR(30)') for this field. field_output = [style.SQL_FIELD(qn(f.column)), style.SQL_COLTYPE(col_type)] if not f.null: field_output.append(style.SQL_KEYWORD('NOT NULL')) if f.primary_key: field_output.append(style.SQL_KEYWORD('PRIMARY KEY')) elif f.unique: field_output.append(style.SQL_KEYWORD('UNIQUE')) if tablespace and f.unique: # We must specify the index tablespace inline, because we # won't be generating a CREATE INDEX statement for this field. field_output.append(self.connection.ops.tablespace_sql(tablespace, inline=True)) if f.rel: ref_output, pending = self.sql_for_inline_foreign_key_references(f, known_models, style) if pending: pr = pending_references.setdefault(f.rel.to, []).append((model, f)) else: field_output.extend(ref_output) table_output.append(' '.join(field_output)) for field_constraints in opts.unique_together: table_output.append(style.SQL_KEYWORD('UNIQUE') + ' (%s)' % \ ", ".join([style.SQL_FIELD(qn(opts.get_field(f).column)) for f in field_constraints])) full_statement = [style.SQL_KEYWORD('CREATE TABLE') + ' ' + style.SQL_TABLE(qn(opts.db_table)) + ' ('] for i, line in enumerate(table_output): # Combine and add commas. full_statement.append(' %s%s' % (line, i < len(table_output)-1 and ',' or '')) full_statement.append(')') if opts.db_tablespace: full_statement.append(self.connection.ops.tablespace_sql(opts.db_tablespace)) full_statement.append(';') final_output.append('\n'.join(full_statement)) if opts.has_auto_field: # Add any extra SQL needed to support auto-incrementing primary keys. auto_column = opts.auto_field.db_column or opts.auto_field.name autoinc_sql = self.connection.ops.autoinc_sql(opts.db_table, auto_column) if autoinc_sql: for stmt in autoinc_sql: final_output.append(stmt) return final_output, pending_references def sql_for_inline_foreign_key_references(self, field, known_models, style): "Return the SQL snippet defining the foreign key reference for a field" qn = self.connection.ops.quote_name if field.rel.to in known_models: output = [style.SQL_KEYWORD('REFERENCES') + ' ' + \ style.SQL_TABLE(qn(field.rel.to._meta.db_table)) + ' (' + \ style.SQL_FIELD(qn(field.rel.to._meta.get_field(field.rel.field_name).column)) + ')' + self.connection.ops.deferrable_sql() ] pending = False else: # We haven't yet created the table to which this field # is related, so save it for later. output = [] pending = True return output, pending def sql_for_pending_references(self, model, style, pending_references): "Returns any ALTER TABLE statements to add constraints after the fact." from django.db.backends.util import truncate_name if not model._meta.managed or model._meta.proxy: return [] qn = self.connection.ops.quote_name final_output = [] opts = model._meta if model in pending_references: for rel_class, f in pending_references[model]: rel_opts = rel_class._meta r_table = rel_opts.db_table r_col = f.column table = opts.db_table col = opts.get_field(f.rel.field_name).column # For MySQL, r_name must be unique in the first 64 characters. # So we are careful with character usage here. r_name = '%s_refs_%s_%s' % (r_col, col, self._digest(r_table, table)) final_output.append(style.SQL_KEYWORD('ALTER TABLE') + ' %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s)%s;' % \ (qn(r_table), qn(truncate_name(r_name, self.connection.ops.max_name_length())), qn(r_col), qn(table), qn(col), self.connection.ops.deferrable_sql())) del pending_references[model] return final_output def sql_indexes_for_model(self, model, style): "Returns the CREATE INDEX SQL statements for a single model" if not model._meta.managed or model._meta.proxy: return [] output = [] for f in model._meta.local_fields: output.extend(self.sql_indexes_for_field(model, f, style)) return output def sql_indexes_for_field(self, model, f, style): "Return the CREATE INDEX SQL statements for a single model field" from django.db.backends.util import truncate_name if f.db_index and not f.unique: qn = self.connection.ops.quote_name tablespace = f.db_tablespace or model._meta.db_tablespace if tablespace: sql = self.connection.ops.tablespace_sql(tablespace) if sql: tablespace_sql = ' ' + sql else: tablespace_sql = '' else: tablespace_sql = '' i_name = '%s_%s' % (model._meta.db_table, self._digest(f.column)) output = [style.SQL_KEYWORD('CREATE INDEX') + ' ' + style.SQL_TABLE(qn(truncate_name(i_name, self.connection.ops.max_name_length()))) + ' ' + style.SQL_KEYWORD('ON') + ' ' + style.SQL_TABLE(qn(model._meta.db_table)) + ' ' + "(%s)" % style.SQL_FIELD(qn(f.column)) + "%s;" % tablespace_sql] else: output = [] return output def sql_destroy_model(self, model, references_to_delete, style): "Return the DROP TABLE and restraint dropping statements for a single model" if not model._meta.managed or model._meta.proxy: return [] # Drop the table now qn = self.connection.ops.quote_name output = ['%s %s;' % (style.SQL_KEYWORD('DROP TABLE'), style.SQL_TABLE(qn(model._meta.db_table)))] if model in references_to_delete: output.extend(self.sql_remove_table_constraints(model, references_to_delete, style)) if model._meta.has_auto_field: ds = self.connection.ops.drop_sequence_sql(model._meta.db_table) if ds: output.append(ds) return output def sql_remove_table_constraints(self, model, references_to_delete, style): from django.db.backends.util import truncate_name if not model._meta.managed or model._meta.proxy: return [] output = [] qn = self.connection.ops.quote_name for rel_class, f in references_to_delete[model]: table = rel_class._meta.db_table col = f.column r_table = model._meta.db_table r_col = model._meta.get_field(f.rel.field_name).column r_name = '%s_refs_%s_%s' % (col, r_col, self._digest(table, r_table)) output.append('%s %s %s %s;' % \ (style.SQL_KEYWORD('ALTER TABLE'), style.SQL_TABLE(qn(table)), style.SQL_KEYWORD(self.connection.ops.drop_foreignkey_sql()), style.SQL_FIELD(qn(truncate_name(r_name, self.connection.ops.max_name_length()))))) del references_to_delete[model] return output def create_test_db(self, verbosity=1, autoclobber=False): """ Creates a test database, prompting the user for confirmation if the database already exists. Returns the name of the test database created. """ # Don't import django.core.management if it isn't needed. from django.core.management import call_command test_database_name = self._get_test_db_name() if verbosity >= 1: test_db_repr = '' if verbosity >= 2: test_db_repr = " ('%s')" % test_database_name print "Creating test database for alias '%s'%s..." % (self.connection.alias, test_db_repr) self._create_test_db(verbosity, autoclobber) self.connection.close() self.connection.settings_dict["NAME"] = test_database_name # Confirm the feature set of the test database self.connection.features.confirm() # Report syncdb messages at one level lower than that requested. # This ensures we don't get flooded with messages during testing # (unless you really ask to be flooded) call_command('syncdb', verbosity=max(verbosity - 1, 0), interactive=False, database=self.connection.alias, load_initial_data=False) # We need to then do a flush to ensure that any data installed by # custom SQL has been removed. The only test data should come from # test fixtures, or autogenerated from post_syncdb triggers. # This has the side effect of loading initial data (which was # intentionally skipped in the syncdb). call_command('flush', verbosity=max(verbosity - 1, 0), interactive=False, database=self.connection.alias) # One effect of calling syncdb followed by flush is that the id of the # default site may or may not be 1, depending on how the sequence was # reset. If the sites app is loaded, then we coerce it. from django.db.models import get_model Site = get_model('sites', 'Site') if Site is not None and Site.objects.using(self.connection.alias).count() == 1: Site.objects.using(self.connection.alias).update(id=settings.SITE_ID) from django.core.cache import get_cache from django.core.cache.backends.db import BaseDatabaseCache for cache_alias in settings.CACHES: cache = get_cache(cache_alias) if isinstance(cache, BaseDatabaseCache): from django.db import router if router.allow_syncdb(self.connection.alias, cache.cache_model_class): call_command('createcachetable', cache._table, database=self.connection.alias) # Get a cursor (even though we don't need one yet). This has # the side effect of initializing the test database. cursor = self.connection.cursor() return test_database_name def _get_test_db_name(self): """ Internal implementation - returns the name of the test DB that will be created. Only useful when called from create_test_db() and _create_test_db() and when no external munging is done with the 'NAME' or 'TEST_NAME' settings. """ if self.connection.settings_dict['TEST_NAME']: return self.connection.settings_dict['TEST_NAME'] return TEST_DATABASE_PREFIX + self.connection.settings_dict['NAME'] def _create_test_db(self, verbosity, autoclobber): "Internal implementation - creates the test db tables." suffix = self.sql_table_creation_suffix() test_database_name = self._get_test_db_name() qn = self.connection.ops.quote_name # Create the test database and connect to it. We need to autocommit # if the database supports it because PostgreSQL doesn't allow # CREATE/DROP DATABASE statements within transactions. cursor = self.connection.cursor() self.set_autocommit() try: cursor.execute("CREATE DATABASE %s %s" % (qn(test_database_name), suffix)) except Exception, e: sys.stderr.write("Got an error creating the test database: %s\n" % e) if not autoclobber: confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % test_database_name) if autoclobber or confirm == 'yes': try: if verbosity >= 1: print "Destroying old test database '%s'..." % self.connection.alias cursor.execute("DROP DATABASE %s" % qn(test_database_name)) cursor.execute("CREATE DATABASE %s %s" % (qn(test_database_name), suffix)) except Exception, e: sys.stderr.write("Got an error recreating the test database: %s\n" % e) sys.exit(2) else: print "Tests cancelled." sys.exit(1) return test_database_name def destroy_test_db(self, old_database_name, verbosity=1): """ Destroy a test database, prompting the user for confirmation if the database already exists. Returns the name of the test database created. """ self.connection.close() test_database_name = self.connection.settings_dict['NAME'] if verbosity >= 1: test_db_repr = '' if verbosity >= 2: test_db_repr = " ('%s')" % test_database_name print "Destroying test database for alias '%s'%s..." % (self.connection.alias, test_db_repr) self.connection.settings_dict['NAME'] = old_database_name self._destroy_test_db(test_database_name, verbosity) def _destroy_test_db(self, test_database_name, verbosity): "Internal implementation - remove the test db tables." # Remove the test database to clean up after # ourselves. Connect to the previous database (not the test database) # to do so, because it's not allowed to delete a database while being # connected to it. cursor = self.connection.cursor() self.set_autocommit() time.sleep(1) # To avoid "database is being accessed by other users" errors. cursor.execute("DROP DATABASE %s" % self.connection.ops.quote_name(test_database_name)) self.connection.close() def set_autocommit(self): "Make sure a connection is in autocommit mode." if hasattr(self.connection.connection, "autocommit"): if callable(self.connection.connection.autocommit): self.connection.connection.autocommit(True) else: self.connection.connection.autocommit = True elif hasattr(self.connection.connection, "set_isolation_level"): self.connection.connection.set_isolation_level(0) def sql_table_creation_suffix(self): "SQL to append to the end of the test table creation statements" return '' def test_db_signature(self): """ Returns a tuple with elements of self.connection.settings_dict (a DATABASES setting value) that uniquely identify a database accordingly to the RDBMS particularities. """ settings_dict = self.connection.settings_dict return ( settings_dict['HOST'], settings_dict['PORT'], settings_dict['ENGINE'], settings_dict['NAME'] )