More schema test fixing
This commit is contained in:
parent
9313dea700
commit
d0b3536964
|
@ -71,9 +71,11 @@ class Command(NoArgsCommand):
|
||||||
def model_installed(model):
|
def model_installed(model):
|
||||||
opts = model._meta
|
opts = model._meta
|
||||||
converter = connection.introspection.table_name_converter
|
converter = connection.introspection.table_name_converter
|
||||||
return not ((converter(opts.db_table) in tables) or
|
# Note that if a model is unmanaged we short-circuit and never try to install it
|
||||||
|
return opts.managed and not ((converter(opts.db_table) in tables) or
|
||||||
(opts.auto_created and converter(opts.auto_created._meta.db_table) in tables))
|
(opts.auto_created and converter(opts.auto_created._meta.db_table) in tables))
|
||||||
|
|
||||||
|
|
||||||
manifest = SortedDict(
|
manifest = SortedDict(
|
||||||
(app_name, list(filter(model_installed, model_list)))
|
(app_name, list(filter(model_installed, model_list)))
|
||||||
for app_name, model_list in all_models
|
for app_name, model_list in all_models
|
||||||
|
|
|
@ -252,13 +252,14 @@ class AppCache(object):
|
||||||
return {
|
return {
|
||||||
"app_store": SortedDict(self.app_store.items()),
|
"app_store": SortedDict(self.app_store.items()),
|
||||||
"app_labels": dict(self.app_labels.items()),
|
"app_labels": dict(self.app_labels.items()),
|
||||||
"app_models": SortedDict(self.app_models.items()),
|
"app_models": SortedDict((k, SortedDict(v.items())) for k, v in self.app_models.items()),
|
||||||
"app_errors": dict(self.app_errors.items()),
|
"app_errors": dict(self.app_errors.items()),
|
||||||
}
|
}
|
||||||
|
|
||||||
def restore_state(self, state):
|
def restore_state(self, state):
|
||||||
"""
|
"""
|
||||||
Restores the AppCache to a previous state from save_state.
|
Restores the AppCache to a previous state from save_state.
|
||||||
|
Note that the state is used by reference, not copied in.
|
||||||
"""
|
"""
|
||||||
self.app_store = state['app_store']
|
self.app_store = state['app_store']
|
||||||
self.app_labels = state['app_labels']
|
self.app_labels = state['app_labels']
|
||||||
|
|
|
@ -31,20 +31,18 @@ class SchemaTests(TestCase):
|
||||||
# The unmanaged models need to be removed after the test in order to
|
# The unmanaged models need to be removed after the test in order to
|
||||||
# prevent bad interactions with the flush operation in other tests.
|
# prevent bad interactions with the flush operation in other tests.
|
||||||
self.cache_state = cache.save_state()
|
self.cache_state = cache.save_state()
|
||||||
cache.load_app("modeltests.schema")
|
|
||||||
for model in self.models:
|
for model in self.models:
|
||||||
model._meta.managed = True
|
model._meta.managed = True
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
# Rollback anything that may have happened
|
|
||||||
connection.rollback()
|
|
||||||
# Delete any tables made for our models
|
# Delete any tables made for our models
|
||||||
self.delete_tables()
|
self.delete_tables()
|
||||||
|
# Rollback anything that may have happened
|
||||||
|
connection.rollback()
|
||||||
|
connection.leave_transaction_management()
|
||||||
# Unhook our models
|
# Unhook our models
|
||||||
for model in self.models:
|
for model in self.models:
|
||||||
model._meta.managed = False
|
model._meta.managed = False
|
||||||
if "schema" in self.cache_state['app_labels']:
|
|
||||||
del self.cache_state['app_labels']['schema']
|
|
||||||
cache.restore_state(self.cache_state)
|
cache.restore_state(self.cache_state)
|
||||||
|
|
||||||
def delete_tables(self):
|
def delete_tables(self):
|
||||||
|
@ -280,31 +278,36 @@ class SchemaTests(TestCase):
|
||||||
# Create an M2M field
|
# Create an M2M field
|
||||||
new_field = ManyToManyField("schema.Tag", related_name="authors")
|
new_field = ManyToManyField("schema.Tag", related_name="authors")
|
||||||
new_field.contribute_to_class(AuthorWithM2M, "tags")
|
new_field.contribute_to_class(AuthorWithM2M, "tags")
|
||||||
# Ensure there's no m2m table there
|
try:
|
||||||
self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through)
|
# Ensure there's no m2m table there
|
||||||
connection.rollback()
|
self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through)
|
||||||
# Add the field
|
connection.rollback()
|
||||||
editor = connection.schema_editor()
|
# Add the field
|
||||||
editor.start()
|
editor = connection.schema_editor()
|
||||||
editor.create_field(
|
editor.start()
|
||||||
Author,
|
editor.create_field(
|
||||||
new_field,
|
Author,
|
||||||
)
|
new_field,
|
||||||
editor.commit()
|
)
|
||||||
# Ensure there is now an m2m table there
|
editor.commit()
|
||||||
columns = self.column_classes(new_field.rel.through)
|
# Ensure there is now an m2m table there
|
||||||
self.assertEqual(columns['tag_id'][0], "IntegerField")
|
columns = self.column_classes(new_field.rel.through)
|
||||||
# Remove the M2M table again
|
self.assertEqual(columns['tag_id'][0], "IntegerField")
|
||||||
editor = connection.schema_editor()
|
# Remove the M2M table again
|
||||||
editor.start()
|
editor = connection.schema_editor()
|
||||||
editor.delete_field(
|
editor.start()
|
||||||
Author,
|
editor.delete_field(
|
||||||
new_field,
|
Author,
|
||||||
)
|
new_field,
|
||||||
editor.commit()
|
)
|
||||||
# Ensure there's no m2m table there
|
editor.commit()
|
||||||
self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through)
|
# Ensure there's no m2m table there
|
||||||
connection.rollback()
|
self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through)
|
||||||
|
connection.rollback()
|
||||||
|
finally:
|
||||||
|
# Cleanup model states
|
||||||
|
AuthorWithM2M._meta.local_many_to_many.remove(new_field)
|
||||||
|
del AuthorWithM2M._meta._m2m_cache
|
||||||
|
|
||||||
def test_m2m_repoint(self):
|
def test_m2m_repoint(self):
|
||||||
"""
|
"""
|
||||||
|
@ -330,26 +333,31 @@ class SchemaTests(TestCase):
|
||||||
# Repoint the M2M
|
# Repoint the M2M
|
||||||
new_field = ManyToManyField(UniqueTest)
|
new_field = ManyToManyField(UniqueTest)
|
||||||
new_field.contribute_to_class(BookWithM2M, "uniques")
|
new_field.contribute_to_class(BookWithM2M, "uniques")
|
||||||
editor = connection.schema_editor()
|
try:
|
||||||
editor.start()
|
editor = connection.schema_editor()
|
||||||
editor.alter_field(
|
editor.start()
|
||||||
Author,
|
editor.alter_field(
|
||||||
BookWithM2M._meta.get_field_by_name("tags")[0],
|
Author,
|
||||||
new_field,
|
BookWithM2M._meta.get_field_by_name("tags")[0],
|
||||||
)
|
new_field,
|
||||||
editor.commit()
|
)
|
||||||
# Ensure old M2M is gone
|
editor.commit()
|
||||||
self.assertRaises(DatabaseError, self.column_classes, BookWithM2M._meta.get_field_by_name("tags")[0].rel.through)
|
# Ensure old M2M is gone
|
||||||
connection.rollback()
|
self.assertRaises(DatabaseError, self.column_classes, BookWithM2M._meta.get_field_by_name("tags")[0].rel.through)
|
||||||
# Ensure the new M2M exists and points to UniqueTest
|
connection.rollback()
|
||||||
constraints = connection.introspection.get_constraints(connection.cursor(), new_field.rel.through._meta.db_table)
|
# Ensure the new M2M exists and points to UniqueTest
|
||||||
if connection.features.supports_foreign_keys:
|
constraints = connection.introspection.get_constraints(connection.cursor(), new_field.rel.through._meta.db_table)
|
||||||
for name, details in constraints.items():
|
if connection.features.supports_foreign_keys:
|
||||||
if details['columns'] == set(["uniquetest_id"]) and details['foreign_key']:
|
for name, details in constraints.items():
|
||||||
self.assertEqual(details['foreign_key'], ('schema_uniquetest', 'id'))
|
if details['columns'] == set(["uniquetest_id"]) and details['foreign_key']:
|
||||||
break
|
self.assertEqual(details['foreign_key'], ('schema_uniquetest', 'id'))
|
||||||
else:
|
break
|
||||||
self.fail("No FK constraint for tag_id found")
|
else:
|
||||||
|
self.fail("No FK constraint for tag_id found")
|
||||||
|
finally:
|
||||||
|
# Cleanup model states
|
||||||
|
BookWithM2M._meta.local_many_to_many.remove(new_field)
|
||||||
|
del BookWithM2M._meta._m2m_cache
|
||||||
|
|
||||||
@skipUnless(connection.features.supports_check_constraints, "No check constraints")
|
@skipUnless(connection.features.supports_check_constraints, "No check constraints")
|
||||||
def test_check_constraints(self):
|
def test_check_constraints(self):
|
||||||
|
|
Loading…
Reference in New Issue