Fixed up the introspection code and tests added for #9779 to run under Python 2.3, which has neither set nor rsplit.
git-svn-id: http://code.djangoproject.com/svn/django/trunk@10392 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
parent
bb31cf37ff
commit
48e01d2b3d
|
@ -56,54 +56,54 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
|
||||||
info['null_ok']) for info in self._table_info(cursor, table_name)]
|
info['null_ok']) for info in self._table_info(cursor, table_name)]
|
||||||
|
|
||||||
def get_relations(self, cursor, table_name):
|
def get_relations(self, cursor, table_name):
|
||||||
"""
|
"""
|
||||||
Returns a dictionary of {field_index: (field_index_other_table, other_table)}
|
Returns a dictionary of {field_index: (field_index_other_table, other_table)}
|
||||||
representing all relationships to the given table. Indexes are 0-based.
|
representing all relationships to the given table. Indexes are 0-based.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Dictionary of relations to return
|
# Dictionary of relations to return
|
||||||
relations = {}
|
relations = {}
|
||||||
|
|
||||||
# Schema for this table
|
# Schema for this table
|
||||||
cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table_name])
|
cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table_name])
|
||||||
results = cursor.fetchone()[0].strip()
|
results = cursor.fetchone()[0].strip()
|
||||||
results = results.split('(', 1)[1]
|
results = results[results.index('(')+1:results.rindex(')')]
|
||||||
results = results.rsplit(')', 1)[0]
|
|
||||||
|
|
||||||
# Walk through and look for references to other tables. SQLite doesn't
|
# Walk through and look for references to other tables. SQLite doesn't
|
||||||
# really have enforced references, but since it echoes out the SQL used
|
# really have enforced references, but since it echoes out the SQL used
|
||||||
# to create the table we can look for REFERENCES statements used there.
|
# to create the table we can look for REFERENCES statements used there.
|
||||||
for field_index, field_desc in enumerate(results.split(',')):
|
for field_index, field_desc in enumerate(results.split(',')):
|
||||||
field_desc = field_desc.strip()
|
field_desc = field_desc.strip()
|
||||||
if field_desc.startswith("UNIQUE"):
|
if field_desc.startswith("UNIQUE"):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
m = re.search('references (.*) \(["|](.*)["|]\)', field_desc, re.I)
|
m = re.search('references (.*) \(["|](.*)["|]\)', field_desc, re.I)
|
||||||
if not m:
|
if not m:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
table, column = [s.strip('"') for s in m.groups()]
|
table, column = [s.strip('"') for s in m.groups()]
|
||||||
|
|
||||||
cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table])
|
cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table])
|
||||||
result = cursor.fetchone()
|
result = cursor.fetchone()
|
||||||
if not result:
|
if not result:
|
||||||
continue
|
continue
|
||||||
other_table_results = result[0].strip()
|
other_table_results = result[0].strip()
|
||||||
other_table_results = other_table_results.split('(', 1)[1]
|
li, ri = other_table_results.index('('), other_table_results.rindex(')')
|
||||||
other_table_results = other_table_results.rsplit(')', 1)[0]
|
other_table_results = other_table_results[li+1:ri]
|
||||||
|
|
||||||
for other_index, other_desc in enumerate(other_table_results.split(',')):
|
|
||||||
other_desc = other_desc.strip()
|
for other_index, other_desc in enumerate(other_table_results.split(',')):
|
||||||
if other_desc.startswith('UNIQUE'):
|
other_desc = other_desc.strip()
|
||||||
continue
|
if other_desc.startswith('UNIQUE'):
|
||||||
|
continue
|
||||||
name = other_desc.split(' ', 1)[0].strip('"')
|
|
||||||
if name == column:
|
name = other_desc.split(' ', 1)[0].strip('"')
|
||||||
|
if name == column:
|
||||||
relations[field_index] = (other_index, table)
|
relations[field_index] = (other_index, table)
|
||||||
break
|
break
|
||||||
|
|
||||||
return relations
|
return relations
|
||||||
|
|
||||||
def get_indexes(self, cursor, table_name):
|
def get_indexes(self, cursor, table_name):
|
||||||
"""
|
"""
|
||||||
Returns a dictionary of fieldname -> infodict for the given table,
|
Returns a dictionary of fieldname -> infodict for the given table,
|
||||||
|
|
|
@ -5,13 +5,18 @@ from django.utils import functional
|
||||||
|
|
||||||
from models import Reporter, Article
|
from models import Reporter, Article
|
||||||
|
|
||||||
|
try:
|
||||||
|
set
|
||||||
|
except NameError:
|
||||||
|
from sets import Set as set # Python 2.3 fallback
|
||||||
|
|
||||||
#
|
#
|
||||||
# The introspection module is optional, so methods tested here might raise
|
# The introspection module is optional, so methods tested here might raise
|
||||||
# NotImplementedError. This is perfectly acceptable behavior for the backend
|
# NotImplementedError. This is perfectly acceptable behavior for the backend
|
||||||
# in question, but the tests need to handle this without failing. Ideally we'd
|
# in question, but the tests need to handle this without failing. Ideally we'd
|
||||||
# skip these tests, but until #4788 is done we'll just ignore them.
|
# skip these tests, but until #4788 is done we'll just ignore them.
|
||||||
#
|
#
|
||||||
# The easiest way to accomplish this is to decorate every test case with a
|
# The easiest way to accomplish this is to decorate every test case with a
|
||||||
# wrapper that ignores the exception.
|
# wrapper that ignores the exception.
|
||||||
#
|
#
|
||||||
# The metaclass is just for fun.
|
# The metaclass is just for fun.
|
||||||
|
@ -35,14 +40,14 @@ class IgnoreNotimplementedError(type):
|
||||||
|
|
||||||
class IntrospectionTests(TestCase):
|
class IntrospectionTests(TestCase):
|
||||||
__metaclass__ = IgnoreNotimplementedError
|
__metaclass__ = IgnoreNotimplementedError
|
||||||
|
|
||||||
def test_table_names(self):
|
def test_table_names(self):
|
||||||
tl = connection.introspection.table_names()
|
tl = connection.introspection.table_names()
|
||||||
self.assert_(Reporter._meta.db_table in tl,
|
self.assert_(Reporter._meta.db_table in tl,
|
||||||
"'%s' isn't in table_list()." % Reporter._meta.db_table)
|
"'%s' isn't in table_list()." % Reporter._meta.db_table)
|
||||||
self.assert_(Article._meta.db_table in tl,
|
self.assert_(Article._meta.db_table in tl,
|
||||||
"'%s' isn't in table_list()." % Article._meta.db_table)
|
"'%s' isn't in table_list()." % Article._meta.db_table)
|
||||||
|
|
||||||
def test_django_table_names(self):
|
def test_django_table_names(self):
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
cursor.execute('CREATE TABLE django_introspection_test_table (id INTEGER);');
|
cursor.execute('CREATE TABLE django_introspection_test_table (id INTEGER);');
|
||||||
|
@ -50,26 +55,26 @@ class IntrospectionTests(TestCase):
|
||||||
cursor.execute("DROP TABLE django_introspection_test_table;")
|
cursor.execute("DROP TABLE django_introspection_test_table;")
|
||||||
self.assert_('django_introspection_testcase_table' not in tl,
|
self.assert_('django_introspection_testcase_table' not in tl,
|
||||||
"django_table_names() returned a non-Django table")
|
"django_table_names() returned a non-Django table")
|
||||||
|
|
||||||
def test_installed_models(self):
|
def test_installed_models(self):
|
||||||
tables = [Article._meta.db_table, Reporter._meta.db_table]
|
tables = [Article._meta.db_table, Reporter._meta.db_table]
|
||||||
models = connection.introspection.installed_models(tables)
|
models = connection.introspection.installed_models(tables)
|
||||||
self.assertEqual(models, set([Article, Reporter]))
|
self.assertEqual(models, set([Article, Reporter]))
|
||||||
|
|
||||||
def test_sequence_list(self):
|
def test_sequence_list(self):
|
||||||
sequences = connection.introspection.sequence_list()
|
sequences = connection.introspection.sequence_list()
|
||||||
expected = {'table': Reporter._meta.db_table, 'column': 'id'}
|
expected = {'table': Reporter._meta.db_table, 'column': 'id'}
|
||||||
self.assert_(expected in sequences,
|
self.assert_(expected in sequences,
|
||||||
'Reporter sequence not found in sequence_list()')
|
'Reporter sequence not found in sequence_list()')
|
||||||
|
|
||||||
def test_get_table_description_names(self):
|
def test_get_table_description_names(self):
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
|
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
|
||||||
self.assertEqual([r[0] for r in desc],
|
self.assertEqual([r[0] for r in desc],
|
||||||
[f.column for f in Reporter._meta.fields])
|
[f.column for f in Reporter._meta.fields])
|
||||||
|
|
||||||
def test_get_table_description_types(self):
|
def test_get_table_description_types(self):
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
|
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
|
||||||
self.assertEqual([datatype(r[1]) for r in desc],
|
self.assertEqual([datatype(r[1]) for r in desc],
|
||||||
['IntegerField', 'CharField', 'CharField', 'CharField'])
|
['IntegerField', 'CharField', 'CharField', 'CharField'])
|
||||||
|
@ -78,7 +83,7 @@ class IntrospectionTests(TestCase):
|
||||||
if settings.DATABASE_ENGINE.startswith('postgresql'):
|
if settings.DATABASE_ENGINE.startswith('postgresql'):
|
||||||
def test_postgresql_real_type(self):
|
def test_postgresql_real_type(self):
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
cursor.execute("CREATE TABLE django_introspection_real_test_table (number REAL);")
|
cursor.execute("CREATE TABLE django_introspection_real_test_table (number REAL);")
|
||||||
desc = connection.introspection.get_table_description(cursor, 'django_introspection_real_test_table')
|
desc = connection.introspection.get_table_description(cursor, 'django_introspection_real_test_table')
|
||||||
cursor.execute('DROP TABLE django_introspection_real_test_table;')
|
cursor.execute('DROP TABLE django_introspection_real_test_table;')
|
||||||
self.assertEqual(datatype(desc[0][1]), 'FloatField')
|
self.assertEqual(datatype(desc[0][1]), 'FloatField')
|
||||||
|
@ -86,19 +91,19 @@ class IntrospectionTests(TestCase):
|
||||||
def test_get_relations(self):
|
def test_get_relations(self):
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
relations = connection.introspection.get_relations(cursor, Article._meta.db_table)
|
relations = connection.introspection.get_relations(cursor, Article._meta.db_table)
|
||||||
|
|
||||||
# Older versions of MySQL don't have the chops to report on this stuff,
|
# Older versions of MySQL don't have the chops to report on this stuff,
|
||||||
# so just skip it if no relations come back. If they do, though, we
|
# so just skip it if no relations come back. If they do, though, we
|
||||||
# should test that the response is correct.
|
# should test that the response is correct.
|
||||||
if relations:
|
if relations:
|
||||||
# That's {field_index: (field_index_other_table, other_table)}
|
# That's {field_index: (field_index_other_table, other_table)}
|
||||||
self.assertEqual(relations, {3: (0, Reporter._meta.db_table)})
|
self.assertEqual(relations, {3: (0, Reporter._meta.db_table)})
|
||||||
|
|
||||||
def test_get_indexes(self):
|
def test_get_indexes(self):
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
indexes = connection.introspection.get_indexes(cursor, Article._meta.db_table)
|
indexes = connection.introspection.get_indexes(cursor, Article._meta.db_table)
|
||||||
self.assertEqual(indexes['reporter_id'], {'unique': False, 'primary_key': False})
|
self.assertEqual(indexes['reporter_id'], {'unique': False, 'primary_key': False})
|
||||||
|
|
||||||
def datatype(dbtype):
|
def datatype(dbtype):
|
||||||
"""Helper to convert a data type into a string."""
|
"""Helper to convert a data type into a string."""
|
||||||
dt = connection.introspection.data_types_reverse[dbtype]
|
dt = connection.introspection.data_types_reverse[dbtype]
|
||||||
|
|
Loading…
Reference in New Issue