From 331546f6ee7f50a92c01f919e1bb4bea6ed32625 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sat, 18 May 2013 11:48:46 +0200 Subject: [PATCH] More conversion to a ContextManager schema_editor --- django/db/backends/schema.py | 33 +-- django/db/migrations/state.py | 6 + tests/schema/tests.py | 488 ++++++++++++++-------------------- 3 files changed, 215 insertions(+), 312 deletions(-) diff --git a/django/db/backends/schema.py b/django/db/backends/schema.py index 78ea80022f..d282e0898b 100644 --- a/django/db/backends/schema.py +++ b/django/db/backends/schema.py @@ -61,38 +61,19 @@ class BaseDatabaseSchemaEditor(object): # State-managing methods - def start(self): - """ - Marks the start of a schema-altering run. - """ - self.deferred_sql = [] - atomic(self.connection.alias).__enter__() - - def commit(self): - """ - Finishes a schema-altering run. - """ - for sql in self.deferred_sql: - self.execute(sql) - atomic(self.connection.alias).__exit__(None, None, None) - - def rollback(self): - """ - Tries to roll back a schema-altering run. Call instead of commit(). - """ - if not self.connection.features.can_rollback_ddl: - raise RuntimeError("Cannot rollback schema changes on this backend") - atomic(self.connection.alias).__exit__(*sys.exc_info()) - def __enter__(self): - self.start() + self.deferred_sql = [] + atomic(self.connection.alias, self.connection.features.can_rollback_ddl).__enter__() return self def __exit__(self, exc_type, exc_value, traceback): if exc_type is None: - self.commit() + for sql in self.deferred_sql: + self.execute(sql) + atomic(self.connection.alias, self.connection.features.can_rollback_ddl).__exit__(None, None, None) else: - self.rollback() + # Continue propagating exception + return None # Core utility functions diff --git a/django/db/migrations/state.py b/django/db/migrations/state.py index 3dbbbe27f8..9678026c79 100644 --- a/django/db/migrations/state.py +++ b/django/db/migrations/state.py @@ -30,6 +30,12 @@ class ProjectState(object): model.render(self.app_cache) return self.app_cache + @classmethod + def from_app_cache(cls, app_cache): + "Takes in an AppCache and returns a ProjectState matching it" + for model in app_cache.get_models(): + print model + class ModelState(object): """ diff --git a/tests/schema/tests.py b/tests/schema/tests.py index 85e1dfc9ea..752f9a5d0b 100644 --- a/tests/schema/tests.py +++ b/tests/schema/tests.py @@ -5,6 +5,7 @@ from django.utils.unittest import skipUnless from django.db import connection, DatabaseError, IntegrityError from django.db.models.fields import IntegerField, TextField, CharField, SlugField from django.db.models.fields.related import ManyToManyField, ForeignKey +from django.db.transaction import atomic from .models import Author, AuthorWithM2M, Book, BookWithSlug, BookWithM2M, Tag, TagUniqueRename, UniqueTest @@ -22,17 +23,9 @@ class SchemaTests(TransactionTestCase): # Utility functions - def setUp(self): - # Make sure we're in manual transaction mode - connection.set_autocommit(False) - def tearDown(self): # Delete any tables made for our models - connection.rollback() self.delete_tables() - # Rollback anything that may have happened - connection.rollback() - connection.set_autocommit(True) def delete_tables(self): "Deletes all model tables for our models for a clean test environment" @@ -41,29 +34,27 @@ class SchemaTests(TransactionTestCase): for model in self.models: # Remove any M2M tables first for field in model._meta.local_many_to_many: + with atomic(): + try: + cursor.execute(connection.schema_editor().sql_delete_table % { + "table": connection.ops.quote_name(field.rel.through._meta.db_table), + }) + except DatabaseError as e: + if any([s in str(e).lower() for s in self.no_table_strings]): + pass + else: + raise + # Then remove the main tables + with atomic(): try: cursor.execute(connection.schema_editor().sql_delete_table % { - "table": connection.ops.quote_name(field.rel.through._meta.db_table), + "table": connection.ops.quote_name(model._meta.db_table), }) except DatabaseError as e: if any([s in str(e).lower() for s in self.no_table_strings]): - connection.rollback() + pass else: raise - else: - connection.commit() - # Then remove the main tables - try: - cursor.execute(connection.schema_editor().sql_delete_table % { - "table": connection.ops.quote_name(model._meta.db_table), - }) - except DatabaseError as e: - if any([s in str(e).lower() for s in self.no_table_strings]): - connection.rollback() - else: - raise - else: - connection.commit() connection.enable_constraint_checking() def column_classes(self, model): @@ -91,33 +82,27 @@ class SchemaTests(TransactionTestCase): Tries creating a model's table, and then deleting it. """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) # Check that it's there list(Author.objects.all()) # Clean up that table - editor.start() - editor.delete_model(Author) - editor.commit() + with connection.schema_editor() as editor: + editor.delete_model(Author) # Check that it's gone self.assertRaises( DatabaseError, lambda: list(Author.objects.all()), ) - connection.rollback() @skipUnless(connection.features.supports_foreign_keys, "No FK support") def test_fk(self): "Tests that creating tables out of FK order, then repointing, works" # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Book) - editor.create_model(Author) - editor.create_model(Tag) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Book) + editor.create_model(Author) + editor.create_model(Tag) # Check that initial tables are there list(Author.objects.all()) list(Book.objects.all()) @@ -128,19 +113,16 @@ class SchemaTests(TransactionTestCase): title = "Much Ado About Foreign Keys", pub_date = datetime.datetime.now(), ) - connection.commit() # Repoint the FK constraint new_field = ForeignKey(Tag) new_field.set_attributes_from_name("author") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Book, - Book._meta.get_field_by_name("author")[0], - new_field, - strict=True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Book, + Book._meta.get_field_by_name("author")[0], + new_field, + strict=True, + ) # Make sure the new FK constraint is present constraints = connection.introspection.get_constraints(connection.cursor(), Book._meta.db_table) for name, details in constraints.items(): @@ -155,23 +137,19 @@ class SchemaTests(TransactionTestCase): Tests adding fields to models """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) # Ensure there's no age field columns = self.column_classes(Author) self.assertNotIn("age", columns) # Alter the name field to a TextField new_field = IntegerField(null=True) new_field.set_attributes_from_name("age") - editor = connection.schema_editor() - editor.start() - editor.create_field( - Author, - new_field, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.create_field( + Author, + new_field, + ) # Ensure the field is right afterwards columns = self.column_classes(Author) self.assertEqual(columns['age'][0], "IntegerField") @@ -182,10 +160,8 @@ class SchemaTests(TransactionTestCase): Tests simple altering of fields """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) # Ensure the field is right to begin with columns = self.column_classes(Author) self.assertEqual(columns['name'][0], "CharField") @@ -193,15 +169,13 @@ class SchemaTests(TransactionTestCase): # Alter the name field to a TextField new_field = TextField(null=True) new_field.set_attributes_from_name("name") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Author, - Author._meta.get_field_by_name("name")[0], - new_field, - strict=True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Author, + Author._meta.get_field_by_name("name")[0], + new_field, + strict=True, + ) # Ensure the field is right afterwards columns = self.column_classes(Author) self.assertEqual(columns['name'][0], "TextField") @@ -209,15 +183,13 @@ class SchemaTests(TransactionTestCase): # Change nullability again new_field2 = TextField(null=False) new_field2.set_attributes_from_name("name") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Author, - new_field, - new_field2, - strict=True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Author, + new_field, + new_field2, + strict=True, + ) # Ensure the field is right afterwards columns = self.column_classes(Author) self.assertEqual(columns['name'][0], "TextField") @@ -228,10 +200,8 @@ class SchemaTests(TransactionTestCase): Tests simple altering of fields """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) # Ensure the field is right to begin with columns = self.column_classes(Author) self.assertEqual(columns['name'][0], "CharField") @@ -239,15 +209,13 @@ class SchemaTests(TransactionTestCase): # Alter the name field's name new_field = CharField(max_length=254) new_field.set_attributes_from_name("display_name") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Author, - Author._meta.get_field_by_name("name")[0], - new_field, - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Author, + Author._meta.get_field_by_name("name")[0], + new_field, + strict = True, + ) # Ensure the field is right afterwards columns = self.column_classes(Author) self.assertEqual(columns['display_name'][0], "CharField") @@ -258,12 +226,10 @@ class SchemaTests(TransactionTestCase): Tests M2M fields on models during creation """ # Create the tables - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.create_model(Tag) - editor.create_model(BookWithM2M) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) + editor.create_model(Tag) + editor.create_model(BookWithM2M) # Ensure there is now an m2m table there columns = self.column_classes(BookWithM2M._meta.get_field_by_name("tags")[0].rel.through) self.assertEqual(columns['tag_id'][0], "IntegerField") @@ -273,11 +239,9 @@ class SchemaTests(TransactionTestCase): Tests adding/removing M2M fields on models """ # Create the tables - editor = connection.schema_editor() - editor.start() - editor.create_model(AuthorWithM2M) - editor.create_model(Tag) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(AuthorWithM2M) + editor.create_model(Tag) # Create an M2M field new_field = ManyToManyField("schema.Tag", related_name="authors") new_field.contribute_to_class(AuthorWithM2M, "tags") @@ -286,24 +250,20 @@ class SchemaTests(TransactionTestCase): self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through) connection.rollback() # Add the field - editor = connection.schema_editor() - editor.start() - editor.create_field( - Author, - new_field, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.create_field( + Author, + new_field, + ) # Ensure there is now an m2m table there columns = self.column_classes(new_field.rel.through) self.assertEqual(columns['tag_id'][0], "IntegerField") # Remove the M2M table again - editor = connection.schema_editor() - editor.start() - editor.delete_field( - Author, - new_field, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.delete_field( + Author, + new_field, + ) # Ensure there's no m2m table there self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through) connection.rollback() @@ -317,13 +277,11 @@ class SchemaTests(TransactionTestCase): Tests repointing M2M fields """ # Create the tables - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.create_model(BookWithM2M) - editor.create_model(Tag) - editor.create_model(UniqueTest) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) + editor.create_model(BookWithM2M) + editor.create_model(Tag) + editor.create_model(UniqueTest) # Ensure the M2M exists and points to Tag constraints = connection.introspection.get_constraints(connection.cursor(), BookWithM2M._meta.get_field_by_name("tags")[0].rel.through._meta.db_table) if connection.features.supports_foreign_keys: @@ -337,14 +295,12 @@ class SchemaTests(TransactionTestCase): new_field = ManyToManyField(UniqueTest) new_field.contribute_to_class(BookWithM2M, "uniques") try: - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Author, - BookWithM2M._meta.get_field_by_name("tags")[0], - new_field, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Author, + BookWithM2M._meta.get_field_by_name("tags")[0], + new_field, + ) # Ensure old M2M is gone self.assertRaises(DatabaseError, self.column_classes, BookWithM2M._meta.get_field_by_name("tags")[0].rel.through) connection.rollback() @@ -368,10 +324,8 @@ class SchemaTests(TransactionTestCase): Tests creating/deleting CHECK constraints """ # Create the tables - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) # Ensure the constraint exists constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table) for name, details in constraints.items(): @@ -382,29 +336,25 @@ class SchemaTests(TransactionTestCase): # Alter the column to remove it new_field = IntegerField(null=True, blank=True) new_field.set_attributes_from_name("height") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Author, - Author._meta.get_field_by_name("height")[0], - new_field, - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Author, + Author._meta.get_field_by_name("height")[0], + new_field, + strict = True, + ) constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table) for name, details in constraints.items(): if details['columns'] == set(["height"]) and details['check']: self.fail("Check constraint for height found") # Alter the column to re-add it - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Author, - new_field, - Author._meta.get_field_by_name("height")[0], - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Author, + new_field, + Author._meta.get_field_by_name("height")[0], + strict = True, + ) constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table) for name, details in constraints.items(): if details['columns'] == set(["height"]) and details['check']: @@ -417,141 +367,121 @@ class SchemaTests(TransactionTestCase): Tests removing and adding unique constraints to a single column. """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Tag) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Tag) # Ensure the field is unique to begin with Tag.objects.create(title="foo", slug="foo") self.assertRaises(IntegrityError, Tag.objects.create, title="bar", slug="foo") - connection.rollback() + Tag.objects.all().delete() # Alter the slug field to be non-unique new_field = SlugField(unique=False) new_field.set_attributes_from_name("slug") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Tag, - Tag._meta.get_field_by_name("slug")[0], - new_field, - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Tag, + Tag._meta.get_field_by_name("slug")[0], + new_field, + strict = True, + ) # Ensure the field is no longer unique Tag.objects.create(title="foo", slug="foo") Tag.objects.create(title="bar", slug="foo") - connection.rollback() + Tag.objects.all().delete() # Alter the slug field to be unique new_new_field = SlugField(unique=True) new_new_field.set_attributes_from_name("slug") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Tag, - new_field, - new_new_field, - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Tag, + new_field, + new_new_field, + strict = True, + ) # Ensure the field is unique again Tag.objects.create(title="foo", slug="foo") self.assertRaises(IntegrityError, Tag.objects.create, title="bar", slug="foo") - connection.rollback() + Tag.objects.all().delete() # Rename the field new_field = SlugField(unique=False) new_field.set_attributes_from_name("slug2") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Tag, - Tag._meta.get_field_by_name("slug")[0], - TagUniqueRename._meta.get_field_by_name("slug2")[0], - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Tag, + Tag._meta.get_field_by_name("slug")[0], + TagUniqueRename._meta.get_field_by_name("slug2")[0], + strict = True, + ) # Ensure the field is still unique TagUniqueRename.objects.create(title="foo", slug2="foo") self.assertRaises(IntegrityError, TagUniqueRename.objects.create, title="bar", slug2="foo") - connection.rollback() + Tag.objects.all().delete() def test_unique_together(self): """ Tests removing and adding unique_together constraints on a model. """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(UniqueTest) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(UniqueTest) # Ensure the fields are unique to begin with UniqueTest.objects.create(year=2012, slug="foo") UniqueTest.objects.create(year=2011, slug="foo") UniqueTest.objects.create(year=2011, slug="bar") self.assertRaises(IntegrityError, UniqueTest.objects.create, year=2012, slug="foo") - connection.rollback() + UniqueTest.objects.all().delete() # Alter the model to it's non-unique-together companion - editor = connection.schema_editor() - editor.start() - editor.alter_unique_together( - UniqueTest, - UniqueTest._meta.unique_together, - [], - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_unique_together( + UniqueTest, + UniqueTest._meta.unique_together, + [], + ) # Ensure the fields are no longer unique UniqueTest.objects.create(year=2012, slug="foo") UniqueTest.objects.create(year=2012, slug="foo") - connection.rollback() + UniqueTest.objects.all().delete() # Alter it back new_new_field = SlugField(unique=True) new_new_field.set_attributes_from_name("slug") - editor = connection.schema_editor() - editor.start() - editor.alter_unique_together( - UniqueTest, - [], - UniqueTest._meta.unique_together, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_unique_together( + UniqueTest, + [], + UniqueTest._meta.unique_together, + ) # Ensure the fields are unique again UniqueTest.objects.create(year=2012, slug="foo") self.assertRaises(IntegrityError, UniqueTest.objects.create, year=2012, slug="foo") - connection.rollback() + UniqueTest.objects.all().delete() def test_db_table(self): """ Tests renaming of the table """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) # Ensure the table is there to begin with columns = self.column_classes(Author) self.assertEqual(columns['name'][0], "CharField") # Alter the table - editor = connection.schema_editor() - editor.start() - editor.alter_db_table( - Author, - "schema_author", - "schema_otherauthor", - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_db_table( + Author, + "schema_author", + "schema_otherauthor", + ) # Ensure the table is there afterwards Author._meta.db_table = "schema_otherauthor" columns = self.column_classes(Author) self.assertEqual(columns['name'][0], "CharField") # Alter the table again - editor = connection.schema_editor() - editor.start() - editor.alter_db_table( - Author, - "schema_otherauthor", - "schema_author", - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_db_table( + Author, + "schema_otherauthor", + "schema_author", + ) # Ensure the table is still there Author._meta.db_table = "schema_author" columns = self.column_classes(Author) @@ -562,11 +492,9 @@ class SchemaTests(TransactionTestCase): Tests creation/altering of indexes """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Author) - editor.create_model(Book) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Author) + editor.create_model(Book) # Ensure the table is there and has the right index self.assertIn( "title", @@ -575,43 +503,37 @@ class SchemaTests(TransactionTestCase): # Alter to remove the index new_field = CharField(max_length=100, db_index=False) new_field.set_attributes_from_name("title") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Book, - Book._meta.get_field_by_name("title")[0], - new_field, - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Book, + Book._meta.get_field_by_name("title")[0], + new_field, + strict = True, + ) # Ensure the table is there and has no index self.assertNotIn( "title", connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table), ) # Alter to re-add the index - editor = connection.schema_editor() - editor.start() - editor.alter_field( - Book, - new_field, - Book._meta.get_field_by_name("title")[0], - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + Book, + new_field, + Book._meta.get_field_by_name("title")[0], + strict = True, + ) # Ensure the table is there and has the index again self.assertIn( "title", connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table), ) # Add a unique column, verify that creates an implicit index - editor = connection.schema_editor() - editor.start() - editor.create_field( - Book, - BookWithSlug._meta.get_field_by_name("slug")[0], - ) - editor.commit() + with connection.schema_editor() as editor: + editor.create_field( + Book, + BookWithSlug._meta.get_field_by_name("slug")[0], + ) self.assertIn( "slug", connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table), @@ -619,15 +541,13 @@ class SchemaTests(TransactionTestCase): # Remove the unique, check the index goes with it new_field2 = CharField(max_length=20, unique=False) new_field2.set_attributes_from_name("slug") - editor = connection.schema_editor() - editor.start() - editor.alter_field( - BookWithSlug, - BookWithSlug._meta.get_field_by_name("slug")[0], - new_field2, - strict = True, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.alter_field( + BookWithSlug, + BookWithSlug._meta.get_field_by_name("slug")[0], + new_field2, + strict = True, + ) self.assertNotIn( "slug", connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table), @@ -638,10 +558,8 @@ class SchemaTests(TransactionTestCase): Tests altering of the primary key """ # Create the table - editor = connection.schema_editor() - editor.start() - editor.create_model(Tag) - editor.commit() + with connection.schema_editor() as editor: + editor.create_model(Tag) # Ensure the table is there and has the right PK self.assertTrue( connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['id']['primary_key'], @@ -649,15 +567,13 @@ class SchemaTests(TransactionTestCase): # Alter to change the PK new_field = SlugField(primary_key=True) new_field.set_attributes_from_name("slug") - editor = connection.schema_editor() - editor.start() - editor.delete_field(Tag, Tag._meta.get_field_by_name("id")[0]) - editor.alter_field( - Tag, - Tag._meta.get_field_by_name("slug")[0], - new_field, - ) - editor.commit() + with connection.schema_editor() as editor: + editor.delete_field(Tag, Tag._meta.get_field_by_name("id")[0]) + editor.alter_field( + Tag, + Tag._meta.get_field_by_name("slug")[0], + new_field, + ) # Ensure the PK changed self.assertNotIn( 'id',