Add tests for the migrate command and fix a bug they exposed

This commit is contained in:
Andrew Godwin 2013-07-25 13:52:35 +01:00
parent 162f7b938f
commit 00276e0414
6 changed files with 84 additions and 37 deletions

View File

@ -73,7 +73,7 @@ class Command(BaseCommand):
if app_label not in executor.loader.migrated_apps: if app_label not in executor.loader.migrated_apps:
raise CommandError("App '%s' does not have migrations (you cannot selectively sync unmigrated apps)" % app_label) raise CommandError("App '%s' does not have migrations (you cannot selectively sync unmigrated apps)" % app_label)
if migration_name == "zero": if migration_name == "zero":
migration_name = None targets = [(app_label, None)]
else: else:
try: try:
migration = executor.loader.get_migration_by_prefix(app_label, migration_name) migration = executor.loader.get_migration_by_prefix(app_label, migration_name)
@ -81,7 +81,7 @@ class Command(BaseCommand):
raise CommandError("More than one migration matches '%s' in app '%s'. Please be more specific." % (app_label, migration_name)) raise CommandError("More than one migration matches '%s' in app '%s'. Please be more specific." % (app_label, migration_name))
except KeyError: except KeyError:
raise CommandError("Cannot find a migration matching '%s' from app '%s'. Is it in INSTALLED_APPS?" % (app_label, migration_name)) raise CommandError("Cannot find a migration matching '%s' from app '%s'. Is it in INSTALLED_APPS?" % (app_label, migration_name))
targets = [(app_label, migration.name)] targets = [(app_label, migration.name)]
target_app_labels_only = False target_app_labels_only = False
elif len(args) == 1: elif len(args) == 1:
app_label = args[0] app_label = args[0]
@ -110,7 +110,8 @@ class Command(BaseCommand):
# Run the syncdb phase. # Run the syncdb phase.
# If you ever manage to get rid of this, I owe you many, many drinks. # If you ever manage to get rid of this, I owe you many, many drinks.
if run_syncdb: if run_syncdb:
self.stdout.write(self.style.MIGRATE_HEADING("Synchronizing apps without migrations:")) if self.verbosity >= 1:
self.stdout.write(self.style.MIGRATE_HEADING("Synchronizing apps without migrations:"))
self.sync_apps(connection, executor.loader.unmigrated_apps) self.sync_apps(connection, executor.loader.unmigrated_apps)
# Migrate! # Migrate!

View File

@ -0,0 +1,39 @@
from django.test import TestCase
from django.db import connection
class MigrationTestBase(TestCase):
"""
Contains an extended set of asserts for testing migrations and schema operations.
"""
def assertTableExists(self, table):
self.assertIn(table, connection.introspection.get_table_list(connection.cursor()))
def assertTableNotExists(self, table):
self.assertNotIn(table, connection.introspection.get_table_list(connection.cursor()))
def assertColumnExists(self, table, column):
self.assertIn(column, [c.name for c in connection.introspection.get_table_description(connection.cursor(), table)])
def assertColumnNotExists(self, table, column):
self.assertNotIn(column, [c.name for c in connection.introspection.get_table_description(connection.cursor(), table)])
def assertColumnNull(self, table, column):
self.assertEqual([c.null_ok for c in connection.introspection.get_table_description(connection.cursor(), table) if c.name == column][0], True)
def assertColumnNotNull(self, table, column):
self.assertEqual([c.null_ok for c in connection.introspection.get_table_description(connection.cursor(), table) if c.name == column][0], False)
def assertIndexExists(self, table, columns, value=True):
self.assertEqual(
value,
any(
c["index"]
for c in connection.introspection.get_constraints(connection.cursor(), table).values()
if c['columns'] == list(columns)
),
)
def assertIndexNotExists(self, table, columns):
return self.assertIndexExists(table, columns, False)

View File

@ -0,0 +1,37 @@
from django.core.management import call_command
from django.test.utils import override_settings
from .test_base import MigrationTestBase
class CommandTests(MigrationTestBase):
"""
Tests running the commands (migrate, makemigrations).
"""
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
def test_migrate(self):
"""
Tests basic usage of the migrate command.
"""
# Make sure no tables are created
self.assertTableNotExists("migrations_author")
self.assertTableNotExists("migrations_tribble")
self.assertTableNotExists("migrations_book")
# Run the migrations to 0001 only
call_command("migrate", "migrations", "0001", verbosity=0)
# Make sure the right tables exist
self.assertTableExists("migrations_author")
self.assertTableExists("migrations_tribble")
self.assertTableNotExists("migrations_book")
# Run migrations all the way
call_command("migrate", verbosity=0)
# Make sure the right tables exist
self.assertTableExists("migrations_author")
self.assertTableNotExists("migrations_tribble")
self.assertTableExists("migrations_book")
# Unmigrate everything
call_command("migrate", "migrations", "zero", verbosity=0)
# Make sure it's all gone
self.assertTableNotExists("migrations_author")
self.assertTableNotExists("migrations_tribble")
self.assertTableNotExists("migrations_book")

View File

@ -20,6 +20,7 @@ class ExecutorTests(TransactionTestCase):
Tests running a simple set of migrations. Tests running a simple set of migrations.
""" """
executor = MigrationExecutor(connection) executor = MigrationExecutor(connection)
executor.recorder.flush()
# Let's look at the plan first and make sure it's up to scratch # Let's look at the plan first and make sure it's up to scratch
plan = executor.migration_plan([("migrations", "0002_second")]) plan = executor.migration_plan([("migrations", "0002_second")])
self.assertEqual( self.assertEqual(

View File

@ -12,7 +12,7 @@ class Migration(migrations.Migration):
("name", models.CharField(max_length=255)), ("name", models.CharField(max_length=255)),
("slug", models.SlugField(null=True)), ("slug", models.SlugField(null=True)),
("age", models.IntegerField(default=0)), ("age", models.IntegerField(default=0)),
("silly_field", models.BooleanField()), ("silly_field", models.BooleanField(default=False)),
], ],
), ),

View File

@ -1,48 +1,17 @@
from django.test import TestCase
from django.db import connection, models, migrations from django.db import connection, models, migrations
from django.db.transaction import atomic from django.db.transaction import atomic
from django.db.utils import IntegrityError from django.db.utils import IntegrityError
from django.db.migrations.state import ProjectState from django.db.migrations.state import ProjectState
from .test_base import MigrationTestBase
class OperationTests(TestCase): class OperationTests(MigrationTestBase):
""" """
Tests running the operations and making sure they do what they say they do. Tests running the operations and making sure they do what they say they do.
Each test looks at their state changing, and then their database operation - Each test looks at their state changing, and then their database operation -
both forwards and backwards. both forwards and backwards.
""" """
def assertTableExists(self, table):
self.assertIn(table, connection.introspection.get_table_list(connection.cursor()))
def assertTableNotExists(self, table):
self.assertNotIn(table, connection.introspection.get_table_list(connection.cursor()))
def assertColumnExists(self, table, column):
self.assertIn(column, [c.name for c in connection.introspection.get_table_description(connection.cursor(), table)])
def assertColumnNotExists(self, table, column):
self.assertNotIn(column, [c.name for c in connection.introspection.get_table_description(connection.cursor(), table)])
def assertColumnNull(self, table, column):
self.assertEqual([c.null_ok for c in connection.introspection.get_table_description(connection.cursor(), table) if c.name == column][0], True)
def assertColumnNotNull(self, table, column):
self.assertEqual([c.null_ok for c in connection.introspection.get_table_description(connection.cursor(), table) if c.name == column][0], False)
def assertIndexExists(self, table, columns, value=True):
self.assertEqual(
value,
any(
c["index"]
for c in connection.introspection.get_constraints(connection.cursor(), table).values()
if c['columns'] == list(columns)
),
)
def assertIndexNotExists(self, table, columns):
return self.assertIndexExists(table, columns, False)
def set_up_test_model(self, app_label): def set_up_test_model(self, app_label):
""" """
Creates a test model state and database table. Creates a test model state and database table.