from __future__ import unicode_literals import unittest try: import sqlparse except ImportError: sqlparse = None from django import test from django.db import connection, migrations, models, router from django.db.migrations.migration import Migration from django.db.migrations.state import ProjectState from django.db.models.fields import NOT_PROVIDED from django.db.transaction import atomic from django.db.utils import IntegrityError, DatabaseError from .test_base import MigrationTestBase class OperationTests(MigrationTestBase): """ Tests running the operations and making sure they do what they say they do. Each test looks at their state changing, and then their database operation - both forwards and backwards. """ def apply_operations(self, app_label, project_state, operations): migration = Migration('name', app_label) migration.operations = operations with connection.schema_editor() as editor: return migration.apply(project_state, editor) def unapply_operations(self, app_label, project_state, operations): migration = Migration('name', app_label) migration.operations = operations with connection.schema_editor() as editor: return migration.unapply(project_state, editor) def set_up_test_model(self, app_label, second_model=False, third_model=False, related_model=False, mti_model=False): """ Creates a test model state and database table. """ # Delete the tables if they already exist with connection.cursor() as cursor: # Start with ManyToMany tables try: cursor.execute("DROP TABLE %s_pony_stables" % app_label) except DatabaseError: pass try: cursor.execute("DROP TABLE %s_pony_vans" % app_label) except DatabaseError: pass # Then standard model tables try: cursor.execute("DROP TABLE %s_pony" % app_label) except DatabaseError: pass try: cursor.execute("DROP TABLE %s_stable" % app_label) except DatabaseError: pass try: cursor.execute("DROP TABLE %s_van" % app_label) except DatabaseError: pass # Make the "current" state operations = [migrations.CreateModel( "Pony", [ ("id", models.AutoField(primary_key=True)), ("pink", models.IntegerField(default=3)), ("weight", models.FloatField()), ], )] if second_model: operations.append(migrations.CreateModel( "Stable", [ ("id", models.AutoField(primary_key=True)), ] )) if third_model: operations.append(migrations.CreateModel( "Van", [ ("id", models.AutoField(primary_key=True)), ] )) if related_model: operations.append(migrations.CreateModel( "Rider", [ ("id", models.AutoField(primary_key=True)), ("pony", models.ForeignKey("Pony")), ], )) if mti_model: operations.append(migrations.CreateModel( "ShetlandPony", fields=[ ('pony_ptr', models.OneToOneField( auto_created=True, primary_key=True, to_field='id', serialize=False, to='Pony', )), ("cuteness", models.IntegerField(default=1)), ], bases=['%s.Pony' % app_label], )) return self.apply_operations(app_label, ProjectState(), operations) def test_create_model(self): """ Tests the CreateModel operation. Most other tests use this operation as part of setup, so check failures here first. """ operation = migrations.CreateModel( "Pony", [ ("id", models.AutoField(primary_key=True)), ("pink", models.IntegerField(default=1)), ], ) # Test the state alteration project_state = ProjectState() new_state = project_state.clone() operation.state_forwards("test_crmo", new_state) self.assertEqual(new_state.models["test_crmo", "pony"].name, "Pony") self.assertEqual(len(new_state.models["test_crmo", "pony"].fields), 2) # Test the database alteration self.assertTableNotExists("test_crmo_pony") with connection.schema_editor() as editor: operation.database_forwards("test_crmo", editor, project_state, new_state) self.assertTableExists("test_crmo_pony") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_crmo", editor, new_state, project_state) self.assertTableNotExists("test_crmo_pony") # And deconstruction definition = operation.deconstruct() self.assertEqual(definition[0], "CreateModel") self.assertEqual(len(definition[1]), 2) self.assertEqual(len(definition[2]), 0) self.assertEqual(definition[1][0], "Pony") def test_create_model_m2m(self): """ Test the creation of a model with a ManyToMany field and the auto-created "through" model. """ project_state = self.set_up_test_model("test_crmomm") operation = migrations.CreateModel( "Stable", [ ("id", models.AutoField(primary_key=True)), ("ponies", models.ManyToManyField("Pony", related_name="stables")) ] ) # Test the state alteration new_state = project_state.clone() operation.state_forwards("test_crmomm", new_state) # Test the database alteration self.assertTableNotExists("test_crmomm_stable_ponies") with connection.schema_editor() as editor: operation.database_forwards("test_crmomm", editor, project_state, new_state) self.assertTableExists("test_crmomm_stable") self.assertTableExists("test_crmomm_stable_ponies") self.assertColumnNotExists("test_crmomm_stable", "ponies") # Make sure the M2M field actually works with atomic(): new_apps = new_state.render() Pony = new_apps.get_model("test_crmomm", "Pony") Stable = new_apps.get_model("test_crmomm", "Stable") stable = Stable.objects.create() p1 = Pony.objects.create(pink=False, weight=4.55) p2 = Pony.objects.create(pink=True, weight=5.43) stable.ponies.add(p1, p2) self.assertEqual(stable.ponies.count(), 2) stable.ponies.all().delete() # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_crmomm", editor, new_state, project_state) self.assertTableNotExists("test_crmomm_stable") self.assertTableNotExists("test_crmomm_stable_ponies") def test_create_model_inheritance(self): """ Tests the CreateModel operation on a multi-table inheritance setup. """ project_state = self.set_up_test_model("test_crmoih") # Test the state alteration operation = migrations.CreateModel( "ShetlandPony", [ ('pony_ptr', models.OneToOneField( auto_created=True, primary_key=True, to_field='id', serialize=False, to='test_crmoih.Pony', )), ("cuteness", models.IntegerField(default=1)), ], ) new_state = project_state.clone() operation.state_forwards("test_crmoih", new_state) self.assertIn(("test_crmoih", "shetlandpony"), new_state.models) # Test the database alteration self.assertTableNotExists("test_crmoih_shetlandpony") with connection.schema_editor() as editor: operation.database_forwards("test_crmoih", editor, project_state, new_state) self.assertTableExists("test_crmoih_shetlandpony") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_crmoih", editor, new_state, project_state) self.assertTableNotExists("test_crmoih_shetlandpony") def test_delete_model(self): """ Tests the DeleteModel operation. """ project_state = self.set_up_test_model("test_dlmo") # Test the state alteration operation = migrations.DeleteModel("Pony") new_state = project_state.clone() operation.state_forwards("test_dlmo", new_state) self.assertNotIn(("test_dlmo", "pony"), new_state.models) # Test the database alteration self.assertTableExists("test_dlmo_pony") with connection.schema_editor() as editor: operation.database_forwards("test_dlmo", editor, project_state, new_state) self.assertTableNotExists("test_dlmo_pony") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_dlmo", editor, new_state, project_state) self.assertTableExists("test_dlmo_pony") def test_rename_model(self): """ Tests the RenameModel operation. """ project_state = self.set_up_test_model("test_rnmo", related_model=True) # Test the state alteration operation = migrations.RenameModel("Pony", "Horse") new_state = project_state.clone() operation.state_forwards("test_rnmo", new_state) self.assertNotIn(("test_rnmo", "pony"), new_state.models) self.assertIn(("test_rnmo", "horse"), new_state.models) # Remember, RenameModel also repoints all incoming FKs and M2Ms self.assertEqual("test_rnmo.Horse", new_state.models["test_rnmo", "rider"].fields[1][1].rel.to) # Test the database alteration self.assertTableExists("test_rnmo_pony") self.assertTableNotExists("test_rnmo_horse") if connection.features.supports_foreign_keys: self.assertFKExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id")) self.assertFKNotExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id")) with connection.schema_editor() as editor: operation.database_forwards("test_rnmo", editor, project_state, new_state) self.assertTableNotExists("test_rnmo_pony") self.assertTableExists("test_rnmo_horse") if connection.features.supports_foreign_keys: self.assertFKNotExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id")) self.assertFKExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id")) # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_rnmo", editor, new_state, project_state) self.assertTableExists("test_rnmo_pony") self.assertTableNotExists("test_rnmo_horse") if connection.features.supports_foreign_keys: self.assertFKExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id")) self.assertFKNotExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id")) def test_add_field(self): """ Tests the AddField operation. """ project_state = self.set_up_test_model("test_adfl") # Test the state alteration operation = migrations.AddField( "Pony", "height", models.FloatField(null=True, default=5), ) new_state = project_state.clone() operation.state_forwards("test_adfl", new_state) self.assertEqual(len(new_state.models["test_adfl", "pony"].fields), 4) field = [ f for n, f in new_state.models["test_adfl", "pony"].fields if n == "height" ][0] self.assertEqual(field.default, 5) # Test the database alteration self.assertColumnNotExists("test_adfl_pony", "height") with connection.schema_editor() as editor: operation.database_forwards("test_adfl", editor, project_state, new_state) self.assertColumnExists("test_adfl_pony", "height") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_adfl", editor, new_state, project_state) self.assertColumnNotExists("test_adfl_pony", "height") def test_add_charfield(self): """ Tests the AddField operation on TextField. """ project_state = self.set_up_test_model("test_adchfl") new_apps = project_state.render() Pony = new_apps.get_model("test_adchfl", "Pony") pony = Pony.objects.create(weight=42) new_state = self.apply_operations("test_adchfl", project_state, [ migrations.AddField( "Pony", "text", models.CharField(max_length=10, default="some text"), ), migrations.AddField( "Pony", "empty", models.CharField(max_length=10, default=""), ), # If not properly quoted digits would be interpreted as an int. migrations.AddField( "Pony", "digits", models.CharField(max_length=10, default="42"), ), # Manual quoting is fragile and could trip on quotes. Refs #xyz. migrations.AddField( "Pony", "quotes", models.CharField(max_length=10, default='"\'"'), ), ]) new_apps = new_state.render() Pony = new_apps.get_model("test_adchfl", "Pony") pony = Pony.objects.get(pk=pony.pk) self.assertEqual(pony.text, "some text") self.assertEqual(pony.empty, "") self.assertEqual(pony.digits, "42") self.assertEqual(pony.quotes, '"\'"') def test_add_textfield(self): """ Tests the AddField operation on TextField. """ project_state = self.set_up_test_model("test_adtxtfl") new_apps = project_state.render() Pony = new_apps.get_model("test_adtxtfl", "Pony") pony = Pony.objects.create(weight=42) new_state = self.apply_operations("test_adtxtfl", project_state, [ migrations.AddField( "Pony", "text", models.TextField(default="some text"), ), migrations.AddField( "Pony", "empty", models.TextField(default=""), ), # If not properly quoted digits would be interpreted as an int. migrations.AddField( "Pony", "digits", models.TextField(default="42"), ), # Manual quoting is fragile and could trip on quotes. Refs #xyz. migrations.AddField( "Pony", "quotes", models.TextField(default='"\'"'), ), ]) new_apps = new_state.render() Pony = new_apps.get_model("test_adtxtfl", "Pony") pony = Pony.objects.get(pk=pony.pk) self.assertEqual(pony.text, "some text") self.assertEqual(pony.empty, "") self.assertEqual(pony.digits, "42") self.assertEqual(pony.quotes, '"\'"') @test.skipUnlessDBFeature('supports_binary_field') def test_add_binaryfield(self): """ Tests the AddField operation on TextField/BinaryField. """ project_state = self.set_up_test_model("test_adbinfl") new_apps = project_state.render() Pony = new_apps.get_model("test_adbinfl", "Pony") pony = Pony.objects.create(weight=42) new_state = self.apply_operations("test_adbinfl", project_state, [ migrations.AddField( "Pony", "blob", models.BinaryField(default=b"some text"), ), migrations.AddField( "Pony", "empty", models.BinaryField(default=b""), ), # If not properly quoted digits would be interpreted as an int. migrations.AddField( "Pony", "digits", models.BinaryField(default=b"42"), ), # Manual quoting is fragile and could trip on quotes. Refs #xyz. migrations.AddField( "Pony", "quotes", models.BinaryField(default=b'"\'"'), ), ]) new_apps = new_state.render() Pony = new_apps.get_model("test_adbinfl", "Pony") pony = Pony.objects.get(pk=pony.pk) # SQLite returns buffer/memoryview, cast to bytes for checking. self.assertEqual(bytes(pony.blob), b"some text") self.assertEqual(bytes(pony.empty), b"") self.assertEqual(bytes(pony.digits), b"42") self.assertEqual(bytes(pony.quotes), b'"\'"') def test_column_name_quoting(self): """ Column names that are SQL keywords shouldn't cause problems when used in migrations (#22168). """ project_state = self.set_up_test_model("test_regr22168") operation = migrations.AddField( "Pony", "order", models.IntegerField(default=0), ) new_state = project_state.clone() operation.state_forwards("test_regr22168", new_state) with connection.schema_editor() as editor: operation.database_forwards("test_regr22168", editor, project_state, new_state) self.assertColumnExists("test_regr22168_pony", "order") def test_add_field_preserve_default(self): """ Tests the AddField operation's state alteration when preserve_default = False. """ project_state = self.set_up_test_model("test_adflpd") # Test the state alteration operation = migrations.AddField( "Pony", "height", models.FloatField(null=True, default=4), preserve_default=False, ) new_state = project_state.clone() operation.state_forwards("test_adflpd", new_state) self.assertEqual(len(new_state.models["test_adflpd", "pony"].fields), 4) field = [ f for n, f in new_state.models["test_adflpd", "pony"].fields if n == "height" ][0] self.assertEqual(field.default, NOT_PROVIDED) # Test the database alteration project_state.render().get_model("test_adflpd", "pony").objects.create( weight=4, ) self.assertColumnNotExists("test_adflpd_pony", "height") with connection.schema_editor() as editor: operation.database_forwards("test_adflpd", editor, project_state, new_state) self.assertColumnExists("test_adflpd_pony", "height") def test_add_field_m2m(self): """ Tests the AddField operation with a ManyToManyField. """ project_state = self.set_up_test_model("test_adflmm", second_model=True) # Test the state alteration operation = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies")) new_state = project_state.clone() operation.state_forwards("test_adflmm", new_state) self.assertEqual(len(new_state.models["test_adflmm", "pony"].fields), 4) # Test the database alteration self.assertTableNotExists("test_adflmm_pony_stables") with connection.schema_editor() as editor: operation.database_forwards("test_adflmm", editor, project_state, new_state) self.assertTableExists("test_adflmm_pony_stables") self.assertColumnNotExists("test_adflmm_pony", "stables") # Make sure the M2M field actually works with atomic(): new_apps = new_state.render() Pony = new_apps.get_model("test_adflmm", "Pony") p = Pony.objects.create(pink=False, weight=4.55) p.stables.create() self.assertEqual(p.stables.count(), 1) p.stables.all().delete() # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_adflmm", editor, new_state, project_state) self.assertTableNotExists("test_adflmm_pony_stables") def test_alter_field_m2m(self): project_state = self.set_up_test_model("test_alflmm", second_model=True) project_state = self.apply_operations("test_alflmm", project_state, operations=[ migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies")) ]) new_apps = project_state.render() Pony = new_apps.get_model("test_alflmm", "Pony") self.assertFalse(Pony._meta.get_field('stables').blank) project_state = self.apply_operations("test_alflmm", project_state, operations=[ migrations.AlterField("Pony", "stables", models.ManyToManyField(to="Stable", related_name="ponies", blank=True)) ]) new_apps = project_state.render() Pony = new_apps.get_model("test_alflmm", "Pony") self.assertTrue(Pony._meta.get_field('stables').blank) def test_repoint_field_m2m(self): project_state = self.set_up_test_model("test_alflmm", second_model=True, third_model=True) project_state = self.apply_operations("test_alflmm", project_state, operations=[ migrations.AddField("Pony", "places", models.ManyToManyField("Stable", related_name="ponies")) ]) new_apps = project_state.render() Pony = new_apps.get_model("test_alflmm", "Pony") project_state = self.apply_operations("test_alflmm", project_state, operations=[ migrations.AlterField("Pony", "places", models.ManyToManyField(to="Van", related_name="ponies")) ]) # Ensure the new field actually works new_apps = project_state.render() Pony = new_apps.get_model("test_alflmm", "Pony") p = Pony.objects.create(pink=False, weight=4.55) p.places.create() self.assertEqual(p.places.count(), 1) p.places.all().delete() def test_remove_field_m2m(self): project_state = self.set_up_test_model("test_rmflmm", second_model=True) project_state = self.apply_operations("test_rmflmm", project_state, operations=[ migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies")) ]) self.assertTableExists("test_rmflmm_pony_stables") operations = [migrations.RemoveField("Pony", "stables")] self.apply_operations("test_rmflmm", project_state, operations=operations) self.assertTableNotExists("test_rmflmm_pony_stables") # And test reversal self.unapply_operations("test_rmflmm", project_state, operations=operations) self.assertTableExists("test_rmflmm_pony_stables") def test_remove_field_m2m_with_through(self): project_state = self.set_up_test_model("test_rmflmmwt", second_model=True) self.assertTableNotExists("test_rmflmmwt_ponystables") project_state = self.apply_operations("test_rmflmmwt", project_state, operations=[ migrations.CreateModel("PonyStables", fields=[ ("pony", models.ForeignKey('test_rmflmmwt.Pony')), ("stable", models.ForeignKey('test_rmflmmwt.Stable')), ]), migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies", through='test_rmflmmwt.PonyStables')) ]) self.assertTableExists("test_rmflmmwt_ponystables") operations = [migrations.RemoveField("Pony", "stables")] self.apply_operations("test_rmflmmwt", project_state, operations=operations) def test_remove_field(self): """ Tests the RemoveField operation. """ project_state = self.set_up_test_model("test_rmfl") # Test the state alteration operation = migrations.RemoveField("Pony", "pink") new_state = project_state.clone() operation.state_forwards("test_rmfl", new_state) self.assertEqual(len(new_state.models["test_rmfl", "pony"].fields), 2) # Test the database alteration self.assertColumnExists("test_rmfl_pony", "pink") with connection.schema_editor() as editor: operation.database_forwards("test_rmfl", editor, project_state, new_state) self.assertColumnNotExists("test_rmfl_pony", "pink") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_rmfl", editor, new_state, project_state) self.assertColumnExists("test_rmfl_pony", "pink") def test_remove_fk(self): """ Tests the RemoveField operation on a foreign key. """ project_state = self.set_up_test_model("test_rfk", related_model=True) self.assertColumnExists("test_rfk_rider", "pony_id") operation = migrations.RemoveField("Rider", "pony") new_state = project_state.clone() operation.state_forwards("test_rfk", new_state) with connection.schema_editor() as editor: operation.database_forwards("test_rfk", editor, project_state, new_state) self.assertColumnNotExists("test_rfk_rider", "pony_id") with connection.schema_editor() as editor: operation.database_backwards("test_rfk", editor, new_state, project_state) self.assertColumnExists("test_rfk_rider", "pony_id") def test_alter_model_table(self): """ Tests the AlterModelTable operation. """ project_state = self.set_up_test_model("test_almota") # Test the state alteration operation = migrations.AlterModelTable("Pony", "test_almota_pony_2") new_state = project_state.clone() operation.state_forwards("test_almota", new_state) self.assertEqual(new_state.models["test_almota", "pony"].options["db_table"], "test_almota_pony_2") # Test the database alteration self.assertTableExists("test_almota_pony") self.assertTableNotExists("test_almota_pony_2") with connection.schema_editor() as editor: operation.database_forwards("test_almota", editor, project_state, new_state) self.assertTableNotExists("test_almota_pony") self.assertTableExists("test_almota_pony_2") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_almota", editor, new_state, project_state) self.assertTableExists("test_almota_pony") self.assertTableNotExists("test_almota_pony_2") def test_alter_field(self): """ Tests the AlterField operation. """ project_state = self.set_up_test_model("test_alfl") # Test the state alteration operation = migrations.AlterField("Pony", "pink", models.IntegerField(null=True)) new_state = project_state.clone() operation.state_forwards("test_alfl", new_state) self.assertEqual(project_state.models["test_alfl", "pony"].get_field_by_name("pink").null, False) self.assertEqual(new_state.models["test_alfl", "pony"].get_field_by_name("pink").null, True) # Test the database alteration self.assertColumnNotNull("test_alfl_pony", "pink") with connection.schema_editor() as editor: operation.database_forwards("test_alfl", editor, project_state, new_state) self.assertColumnNull("test_alfl_pony", "pink") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_alfl", editor, new_state, project_state) self.assertColumnNotNull("test_alfl_pony", "pink") def test_alter_field_pk(self): """ Tests the AlterField operation on primary keys (for things like PostgreSQL's SERIAL weirdness) """ project_state = self.set_up_test_model("test_alflpk") # Test the state alteration operation = migrations.AlterField("Pony", "id", models.IntegerField(primary_key=True)) new_state = project_state.clone() operation.state_forwards("test_alflpk", new_state) self.assertIsInstance(project_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.AutoField) self.assertIsInstance(new_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.IntegerField) # Test the database alteration with connection.schema_editor() as editor: operation.database_forwards("test_alflpk", editor, project_state, new_state) # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_alflpk", editor, new_state, project_state) @unittest.skipUnless(connection.features.supports_foreign_keys, "No FK support") def test_alter_field_pk_fk(self): """ Tests the AlterField operation on primary keys changes any FKs pointing to it. """ project_state = self.set_up_test_model("test_alflpkfk", related_model=True) # Test the state alteration operation = migrations.AlterField("Pony", "id", models.FloatField(primary_key=True)) new_state = project_state.clone() operation.state_forwards("test_alflpkfk", new_state) self.assertIsInstance(project_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.AutoField) self.assertIsInstance(new_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.FloatField) def assertIdTypeEqualsFkType(): with connection.cursor() as cursor: id_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_pony") if c.name == "id"][0] fk_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_rider") if c.name == "pony_id"][0] self.assertEqual(id_type, fk_type) assertIdTypeEqualsFkType() # Test the database alteration with connection.schema_editor() as editor: operation.database_forwards("test_alflpkfk", editor, project_state, new_state) assertIdTypeEqualsFkType() # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_alflpkfk", editor, new_state, project_state) assertIdTypeEqualsFkType() def test_rename_field(self): """ Tests the RenameField operation. """ project_state = self.set_up_test_model("test_rnfl") # Test the state alteration operation = migrations.RenameField("Pony", "pink", "blue") new_state = project_state.clone() operation.state_forwards("test_rnfl", new_state) self.assertIn("blue", [n for n, f in new_state.models["test_rnfl", "pony"].fields]) self.assertNotIn("pink", [n for n, f in new_state.models["test_rnfl", "pony"].fields]) # Test the database alteration self.assertColumnExists("test_rnfl_pony", "pink") self.assertColumnNotExists("test_rnfl_pony", "blue") with connection.schema_editor() as editor: operation.database_forwards("test_rnfl", editor, project_state, new_state) self.assertColumnExists("test_rnfl_pony", "blue") self.assertColumnNotExists("test_rnfl_pony", "pink") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_rnfl", editor, new_state, project_state) self.assertColumnExists("test_rnfl_pony", "pink") self.assertColumnNotExists("test_rnfl_pony", "blue") def test_alter_unique_together(self): """ Tests the AlterUniqueTogether operation. """ project_state = self.set_up_test_model("test_alunto") # Test the state alteration operation = migrations.AlterUniqueTogether("Pony", [("pink", "weight")]) new_state = project_state.clone() operation.state_forwards("test_alunto", new_state) self.assertEqual(len(project_state.models["test_alunto", "pony"].options.get("unique_together", set())), 0) self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1) # Make sure we can insert duplicate rows with connection.cursor() as cursor: cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)") cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)") cursor.execute("DELETE FROM test_alunto_pony") # Test the database alteration with connection.schema_editor() as editor: operation.database_forwards("test_alunto", editor, project_state, new_state) cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)") with self.assertRaises(IntegrityError): with atomic(): cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)") cursor.execute("DELETE FROM test_alunto_pony") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_alunto", editor, new_state, project_state) cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)") cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)") cursor.execute("DELETE FROM test_alunto_pony") # Test flat unique_together operation = migrations.AlterUniqueTogether("Pony", ("pink", "weight")) operation.state_forwards("test_alunto", new_state) self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1) def test_alter_index_together(self): """ Tests the AlterIndexTogether operation. """ project_state = self.set_up_test_model("test_alinto") # Test the state alteration operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")]) new_state = project_state.clone() operation.state_forwards("test_alinto", new_state) self.assertEqual(len(project_state.models["test_alinto", "pony"].options.get("index_together", set())), 0) self.assertEqual(len(new_state.models["test_alinto", "pony"].options.get("index_together", set())), 1) # Make sure there's no matching index self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"]) # Test the database alteration with connection.schema_editor() as editor: operation.database_forwards("test_alinto", editor, project_state, new_state) self.assertIndexExists("test_alinto_pony", ["pink", "weight"]) # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_alinto", editor, new_state, project_state) self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"]) @unittest.skipIf(sqlparse is None and connection.features.requires_sqlparse_for_splitting, "Missing sqlparse") def test_run_sql(self): """ Tests the RunSQL operation. """ project_state = self.set_up_test_model("test_runsql") # Create the operation operation = migrations.RunSQL( # Use a multi-line string with a commment to test splitting on SQLite and MySQL respectively "CREATE TABLE i_love_ponies (id int, special_thing int);\n" "INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 42); -- this is magic!\n" "INSERT INTO i_love_ponies (id, special_thing) VALUES (2, 51);\n", "DROP TABLE i_love_ponies", state_operations=[migrations.CreateModel("SomethingElse", [("id", models.AutoField(primary_key=True))])], ) # Test the state alteration new_state = project_state.clone() operation.state_forwards("test_runsql", new_state) self.assertEqual(len(new_state.models["test_runsql", "somethingelse"].fields), 1) # Make sure there's no table self.assertTableNotExists("i_love_ponies") # Test the database alteration with connection.schema_editor() as editor: operation.database_forwards("test_runsql", editor, project_state, new_state) self.assertTableExists("i_love_ponies") # Make sure all the SQL was processed with connection.cursor() as cursor: cursor.execute("SELECT COUNT(*) FROM i_love_ponies") self.assertEqual(cursor.fetchall()[0][0], 2) # And test reversal self.assertTrue(operation.reversible) with connection.schema_editor() as editor: operation.database_backwards("test_runsql", editor, new_state, project_state) self.assertTableNotExists("i_love_ponies") def test_run_python(self): """ Tests the RunPython operation """ project_state = self.set_up_test_model("test_runpython", mti_model=True) # Create the operation def inner_method(models, schema_editor): Pony = models.get_model("test_runpython", "Pony") Pony.objects.create(pink=1, weight=3.55) Pony.objects.create(weight=5) def inner_method_reverse(models, schema_editor): Pony = models.get_model("test_runpython", "Pony") Pony.objects.filter(pink=1, weight=3.55).delete() Pony.objects.filter(weight=5).delete() operation = migrations.RunPython(inner_method, reverse_code=inner_method_reverse) # Test the state alteration does nothing new_state = project_state.clone() operation.state_forwards("test_runpython", new_state) self.assertEqual(new_state, project_state) # Test the database alteration self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 0) with connection.schema_editor() as editor: operation.database_forwards("test_runpython", editor, project_state, new_state) self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 2) # Now test reversal self.assertTrue(operation.reversible) with connection.schema_editor() as editor: operation.database_backwards("test_runpython", editor, project_state, new_state) self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 0) # Now test we can't use a string with self.assertRaises(ValueError): operation = migrations.RunPython("print 'ahahaha'") # Also test reversal fails, with an operation identical to above but without reverse_code set no_reverse_operation = migrations.RunPython(inner_method) self.assertFalse(no_reverse_operation.reversible) with connection.schema_editor() as editor: no_reverse_operation.database_forwards("test_runpython", editor, project_state, new_state) with self.assertRaises(NotImplementedError): no_reverse_operation.database_backwards("test_runpython", editor, new_state, project_state) self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 2) def create_ponies(models, schema_editor): Pony = models.get_model("test_runpython", "Pony") pony1 = Pony.objects.create(pink=1, weight=3.55) self.assertIsNot(pony1.pk, None) pony2 = Pony.objects.create(weight=5) self.assertIsNot(pony2.pk, None) self.assertNotEqual(pony1.pk, pony2.pk) operation = migrations.RunPython(create_ponies) with connection.schema_editor() as editor: operation.database_forwards("test_runpython", editor, project_state, new_state) self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 4) def create_shetlandponies(models, schema_editor): ShetlandPony = models.get_model("test_runpython", "ShetlandPony") pony1 = ShetlandPony.objects.create(weight=4.0) self.assertIsNot(pony1.pk, None) pony2 = ShetlandPony.objects.create(weight=5.0) self.assertIsNot(pony2.pk, None) self.assertNotEqual(pony1.pk, pony2.pk) operation = migrations.RunPython(create_shetlandponies) with connection.schema_editor() as editor: operation.database_forwards("test_runpython", editor, project_state, new_state) self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 6) self.assertEqual(project_state.render().get_model("test_runpython", "ShetlandPony").objects.count(), 2) def test_run_python_atomic(self): """ Tests the RunPython operation correctly handles the "atomic" keyword """ project_state = self.set_up_test_model("test_runpythonatomic", mti_model=True) def inner_method(models, schema_editor): Pony = models.get_model("test_runpythonatomic", "Pony") Pony.objects.create(pink=1, weight=3.55) raise ValueError("Adrian hates ponies.") atomic_migration = Migration("test", "test_runpythonatomic") atomic_migration.operations = [migrations.RunPython(inner_method)] non_atomic_migration = Migration("test", "test_runpythonatomic") non_atomic_migration.operations = [migrations.RunPython(inner_method, atomic=False)] # If we're a fully-transactional database, both versions should rollback if connection.features.can_rollback_ddl: self.assertEqual(project_state.render().get_model("test_runpythonatomic", "Pony").objects.count(), 0) with self.assertRaises(ValueError): with connection.schema_editor() as editor: atomic_migration.apply(project_state, editor) self.assertEqual(project_state.render().get_model("test_runpythonatomic", "Pony").objects.count(), 0) with self.assertRaises(ValueError): with connection.schema_editor() as editor: non_atomic_migration.apply(project_state, editor) self.assertEqual(project_state.render().get_model("test_runpythonatomic", "Pony").objects.count(), 0) # Otherwise, the non-atomic operation should leave a row there else: self.assertEqual(project_state.render().get_model("test_runpythonatomic", "Pony").objects.count(), 0) with self.assertRaises(ValueError): with connection.schema_editor() as editor: atomic_migration.apply(project_state, editor) self.assertEqual(project_state.render().get_model("test_runpythonatomic", "Pony").objects.count(), 0) with self.assertRaises(ValueError): with connection.schema_editor() as editor: non_atomic_migration.apply(project_state, editor) self.assertEqual(project_state.render().get_model("test_runpythonatomic", "Pony").objects.count(), 1) class MigrateNothingRouter(object): """ A router that sends all writes to the other database. """ def allow_migrate(self, db, model): return False class MultiDBOperationTests(MigrationTestBase): multi_db = True def setUp(self): # Make the 'other' database appear to be a replica of the 'default' self.old_routers = router.routers router.routers = [MigrateNothingRouter()] def tearDown(self): # Restore the 'other' database as an independent database router.routers = self.old_routers def test_create_model(self): """ Tests that CreateModel honours multi-db settings. """ operation = migrations.CreateModel( "Pony", [ ("id", models.AutoField(primary_key=True)), ("pink", models.IntegerField(default=1)), ], ) # Test the state alteration project_state = ProjectState() new_state = project_state.clone() operation.state_forwards("test_crmo", new_state) # Test the database alteration self.assertTableNotExists("test_crmo_pony") with connection.schema_editor() as editor: operation.database_forwards("test_crmo", editor, project_state, new_state) self.assertTableNotExists("test_crmo_pony") # And test reversal with connection.schema_editor() as editor: operation.database_backwards("test_crmo", editor, new_state, project_state) self.assertTableNotExists("test_crmo_pony")