import os from django.db import connection from django.db.migrations.recorder import MigrationRecorder from django.test import TransactionTestCase from django.utils._os import upath class MigrationTestBase(TransactionTestCase): """ Contains an extended set of asserts for testing migrations and schema operations. """ available_apps = ["migrations"] test_dir = os.path.abspath(os.path.dirname(upath(__file__))) def tearDown(self): # Reset applied-migrations state. recorder = MigrationRecorder(connection) recorder.migration_qs.filter(app='migrations').delete() def get_table_description(self, table): with connection.cursor() as cursor: return connection.introspection.get_table_description(cursor, table) def assertTableExists(self, table): with connection.cursor() as cursor: self.assertIn(table, connection.introspection.table_names(cursor)) def assertTableNotExists(self, table): with connection.cursor() as cursor: self.assertNotIn(table, connection.introspection.table_names(cursor)) def assertColumnExists(self, table, column): self.assertIn(column, [c.name for c in self.get_table_description(table)]) def assertColumnNotExists(self, table, column): self.assertNotIn(column, [c.name for c in self.get_table_description(table)]) def assertColumnNull(self, table, column): self.assertEqual([c.null_ok for c in self.get_table_description(table) if c.name == column][0], True) def assertColumnNotNull(self, table, column): self.assertEqual([c.null_ok for c in self.get_table_description(table) if c.name == column][0], False) def assertIndexExists(self, table, columns, value=True): with connection.cursor() as cursor: self.assertEqual( value, any( c["index"] for c in connection.introspection.get_constraints(cursor, table).values() if c['columns'] == list(columns) ), ) def assertIndexNotExists(self, table, columns): return self.assertIndexExists(table, columns, False) def assertFKExists(self, table, columns, to, value=True): with connection.cursor() as cursor: self.assertEqual( value, any( c["foreign_key"] == to for c in connection.introspection.get_constraints(cursor, table).values() if c['columns'] == list(columns) ), ) def assertFKNotExists(self, table, columns, to, value=True): return self.assertFKExists(table, columns, to, False)