"""
MySQL database backend for Django.

Requires MySQLdb: http://sourceforge.net/projects/mysql-python
"""

from django.db.backends import BaseDatabaseWrapper, BaseDatabaseOperations, util
from django.utils.encoding import force_unicode
# Import the driver under the neutral name ``Database``; if it is missing,
# report the problem as ImproperlyConfigured instead of a bare ImportError.
try:
    import MySQLdb as Database
except ImportError, e:
    from django.core.exceptions import ImproperlyConfigured
    raise ImproperlyConfigured, "Error loading MySQLdb module: %s" % e
from MySQLdb.converters import conversions
from MySQLdb.constants import FIELD_TYPE
import types
import re

# Module-level aliases for the driver's exception classes.
DatabaseError = Database.DatabaseError
IntegrityError = Database.IntegrityError

# Start from a copy of MySQLdb's default conversion table and override the
# entries below with Django's own typecast helpers. The resulting table is
# handed to Database.connect() through the 'conv' option in
# DatabaseWrapper._cursor().
django_conversions = conversions.copy()
django_conversions.update({
    types.BooleanType: util.rev_typecast_boolean,
    FIELD_TYPE.DATETIME: util.typecast_timestamp,
    FIELD_TYPE.DATE: util.typecast_date,
    FIELD_TYPE.TIME: util.typecast_time,
    FIELD_TYPE.DECIMAL: util.typecast_decimal,
    FIELD_TYPE.STRING: force_unicode,
    FIELD_TYPE.VAR_STRING: force_unicode,
    # Note: We don't add a converter for BLOB here. It doesn't seem to be
    # required.
})

# This should match the numerical portion of the version numbers (we can treat
# versions like 5.0.24 and 5.0.24a as the same). Based on the list of versions
# at http://dev.mysql.com/doc/refman/4.1/en/news.html and
# http://dev.mysql.com/doc/refman/5.0/en/news.html .
# Each of the three captured components is one or two digits long.
server_version_re = re.compile(r'(\d{1,2})\.(\d{1,2})\.(\d{1,2})')

# This is an extra debug layer over MySQL queries, to display warnings.
# It's only used when DEBUG=True.
class MysqlDebugWrapper: def __init__(self, cursor): self.cursor = cursor def execute(self, sql, params=()): try: return self.cursor.execute(sql, params) except Database.Warning, w: self.cursor.execute("SHOW WARNINGS") raise Database.Warning, "%s: %s" % (w, self.cursor.fetchall()) def executemany(self, sql, param_list): try: return self.cursor.executemany(sql, param_list) except Database.Warning, w: self.cursor.execute("SHOW WARNINGS") raise Database.Warning, "%s: %s" % (w, self.cursor.fetchall()) def __getattr__(self, attr): if attr in self.__dict__: return self.__dict__[attr] else: return getattr(self.cursor, attr) class DatabaseOperations(BaseDatabaseOperations): def date_extract_sql(self, lookup_type, field_name): # http://dev.mysql.com/doc/mysql/en/date-and-time-functions.html return "EXTRACT(%s FROM %s)" % (lookup_type.upper(), field_name) def date_trunc_sql(self, lookup_type, field_name): fields = ['year', 'month', 'day', 'hour', 'minute', 'second'] format = ('%%Y-', '%%m', '-%%d', ' %%H:', '%%i', ':%%s') # Use double percents to escape. format_def = ('0000-', '01', '-01', ' 00:', '00', ':00') try: i = fields.index(lookup_type) + 1 except ValueError: sql = field_name else: format_str = ''.join([f for f in format[:i]] + [f for f in format_def[i:]]) sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str) return sql def drop_foreignkey_sql(self): return "DROP FOREIGN KEY" class DatabaseWrapper(BaseDatabaseWrapper): ops = DatabaseOperations() def __init__(self, **kwargs): super(DatabaseWrapper, self).__init__(**kwargs) self.server_version = None def _valid_connection(self): if self.connection is not None: try: self.connection.ping() return True except DatabaseError: self.connection.close() self.connection = None return False def _cursor(self, settings): if not self._valid_connection(): kwargs = { # Note: use_unicode intentonally not set to work around some # backwards-compat issues. We do it manually. 
'user': settings.DATABASE_USER, 'db': settings.DATABASE_NAME, 'passwd': settings.DATABASE_PASSWORD, 'conv': django_conversions, } if settings.DATABASE_HOST.startswith('/'): kwargs['unix_socket'] = settings.DATABASE_HOST else: kwargs['host'] = settings.DATABASE_HOST if settings.DATABASE_PORT: kwargs['port'] = int(settings.DATABASE_PORT) kwargs.update(self.options) self.connection = Database.connect(**kwargs) cursor = self.connection.cursor() if self.connection.get_server_info() >= '4.1' and not self.connection.character_set_name().startswith('utf8'): if hasattr(self.connection, 'charset'): # MySQLdb < 1.2.1 backwards-compat hacks. conn = self.connection cursor.execute("SET NAMES 'utf8'") cursor.execute("SET CHARACTER SET 'utf8'") to_str = lambda u, dummy=None, c=conn: c.literal(u.encode('utf-8')) conn.converter[unicode] = to_str else: self.connection.set_character_set('utf8') else: cursor = self.connection.cursor() return cursor def make_debug_cursor(self, cursor): return BaseDatabaseWrapper.make_debug_cursor(self, MysqlDebugWrapper(cursor)) def _rollback(self): try: BaseDatabaseWrapper._rollback(self) except Database.NotSupportedError: pass def get_server_version(self): if not self.server_version: if not self._valid_connection(): self.cursor() m = server_version_re.match(self.connection.get_server_info()) if not m: raise Exception('Unable to determine MySQL version from version string %r' % self.connection.get_server_info()) self.server_version = tuple([int(x) for x in m.groups()]) return self.server_version allows_group_by_ordinal = True allows_unique_and_pk = True autoindexes_primary_keys = False needs_datetime_string_cast = True # MySQLdb requires a typecast for dates needs_upper_for_iops = False supports_constraints = True supports_tablespaces = False uses_case_insensitive_names = False def quote_name(name): if name.startswith("`") and name.endswith("`"): return name # Quoting once is enough. 
return "`%s`" % name dictfetchone = util.dictfetchone dictfetchmany = util.dictfetchmany dictfetchall = util.dictfetchall def get_last_insert_id(cursor, table_name, pk_name): return cursor.lastrowid def get_limit_offset_sql(limit, offset=None): sql = "LIMIT " if offset and offset != 0: sql += "%s," % offset return sql + str(limit) def get_random_function_sql(): return "RAND()" def get_fulltext_search_sql(field_name): return 'MATCH (%s) AGAINST (%%s IN BOOLEAN MODE)' % field_name def get_pk_default_value(): return "DEFAULT" def get_max_name_length(): return None; def get_start_transaction_sql(): return "BEGIN;" def get_sql_flush(style, tables, sequences): """Return a list of SQL statements required to remove all data from all tables in the database (without actually removing the tables themselves) and put the database in an empty 'initial' state """ # NB: The generated SQL below is specific to MySQL # 'TRUNCATE x;', 'TRUNCATE y;', 'TRUNCATE z;'... style SQL statements # to clear all tables of all data if tables: sql = ['SET FOREIGN_KEY_CHECKS = 0;'] + \ ['%s %s;' % \ (style.SQL_KEYWORD('TRUNCATE'), style.SQL_FIELD(quote_name(table)) ) for table in tables] + \ ['SET FOREIGN_KEY_CHECKS = 1;'] # 'ALTER TABLE table AUTO_INCREMENT = 1;'... style SQL statements # to reset sequence indices sql.extend(["%s %s %s %s %s;" % \ (style.SQL_KEYWORD('ALTER'), style.SQL_KEYWORD('TABLE'), style.SQL_TABLE(quote_name(sequence['table'])), style.SQL_KEYWORD('AUTO_INCREMENT'), style.SQL_FIELD('= 1'), ) for sequence in sequences]) return sql else: return [] def get_sql_sequence_reset(style, model_list): "Returns a list of the SQL statements to reset sequences for the given models." 
# No sequence reset required return [] OPERATOR_MAPPING = { 'exact': '= %s', 'iexact': 'LIKE %s', 'contains': 'LIKE BINARY %s', 'icontains': 'LIKE %s', 'regex': 'REGEXP BINARY %s', 'iregex': 'REGEXP %s', 'gt': '> %s', 'gte': '>= %s', 'lt': '< %s', 'lte': '<= %s', 'startswith': 'LIKE BINARY %s', 'endswith': 'LIKE BINARY %s', 'istartswith': 'LIKE %s', 'iendswith': 'LIKE %s', }