More conversion to a ContextManager schema_editor

This commit is contained in:
Andrew Godwin 2013-05-18 11:48:46 +02:00
parent ce5bd42259
commit 331546f6ee
3 changed files with 215 additions and 312 deletions

View File

@ -61,38 +61,19 @@ class BaseDatabaseSchemaEditor(object):
# State-managing methods
def start(self):
"""
Marks the start of a schema-altering run.
"""
self.deferred_sql = []
atomic(self.connection.alias).__enter__()
def commit(self):
"""
Finishes a schema-altering run.
"""
for sql in self.deferred_sql:
self.execute(sql)
atomic(self.connection.alias).__exit__(None, None, None)
def rollback(self):
"""
Tries to roll back a schema-altering run. Call instead of commit().
"""
if not self.connection.features.can_rollback_ddl:
raise RuntimeError("Cannot rollback schema changes on this backend")
atomic(self.connection.alias).__exit__(*sys.exc_info())
def __enter__(self):
self.start()
self.deferred_sql = []
atomic(self.connection.alias, self.connection.features.can_rollback_ddl).__enter__()
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
self.commit()
for sql in self.deferred_sql:
self.execute(sql)
atomic(self.connection.alias, self.connection.features.can_rollback_ddl).__exit__(None, None, None)
else:
self.rollback()
# Continue propagating exception
return None
# Core utility functions

View File

@ -30,6 +30,12 @@ class ProjectState(object):
model.render(self.app_cache)
return self.app_cache
@classmethod
def from_app_cache(cls, app_cache):
"Takes in an AppCache and returns a ProjectState matching it"
for model in app_cache.get_models():
print model
class ModelState(object):
"""

View File

@ -5,6 +5,7 @@ from django.utils.unittest import skipUnless
from django.db import connection, DatabaseError, IntegrityError
from django.db.models.fields import IntegerField, TextField, CharField, SlugField
from django.db.models.fields.related import ManyToManyField, ForeignKey
from django.db.transaction import atomic
from .models import Author, AuthorWithM2M, Book, BookWithSlug, BookWithM2M, Tag, TagUniqueRename, UniqueTest
@ -22,17 +23,9 @@ class SchemaTests(TransactionTestCase):
# Utility functions
def setUp(self):
# Make sure we're in manual transaction mode
connection.set_autocommit(False)
def tearDown(self):
# Delete any tables made for our models
connection.rollback()
self.delete_tables()
# Rollback anything that may have happened
connection.rollback()
connection.set_autocommit(True)
def delete_tables(self):
"Deletes all model tables for our models for a clean test environment"
@ -41,29 +34,27 @@ class SchemaTests(TransactionTestCase):
for model in self.models:
# Remove any M2M tables first
for field in model._meta.local_many_to_many:
with atomic():
try:
cursor.execute(connection.schema_editor().sql_delete_table % {
"table": connection.ops.quote_name(field.rel.through._meta.db_table),
})
except DatabaseError as e:
if any([s in str(e).lower() for s in self.no_table_strings]):
connection.rollback()
pass
else:
raise
else:
connection.commit()
# Then remove the main tables
with atomic():
try:
cursor.execute(connection.schema_editor().sql_delete_table % {
"table": connection.ops.quote_name(model._meta.db_table),
})
except DatabaseError as e:
if any([s in str(e).lower() for s in self.no_table_strings]):
connection.rollback()
pass
else:
raise
else:
connection.commit()
connection.enable_constraint_checking()
def column_classes(self, model):
@ -91,33 +82,27 @@ class SchemaTests(TransactionTestCase):
Tries creating a model's table, and then deleting it.
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.commit()
# Check that it's there
list(Author.objects.all())
# Clean up that table
editor.start()
with connection.schema_editor() as editor:
editor.delete_model(Author)
editor.commit()
# Check that it's gone
self.assertRaises(
DatabaseError,
lambda: list(Author.objects.all()),
)
connection.rollback()
@skipUnless(connection.features.supports_foreign_keys, "No FK support")
def test_fk(self):
"Tests that creating tables out of FK order, then repointing, works"
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Book)
editor.create_model(Author)
editor.create_model(Tag)
editor.commit()
# Check that initial tables are there
list(Author.objects.all())
list(Book.objects.all())
@ -128,19 +113,16 @@ class SchemaTests(TransactionTestCase):
title = "Much Ado About Foreign Keys",
pub_date = datetime.datetime.now(),
)
connection.commit()
# Repoint the FK constraint
new_field = ForeignKey(Tag)
new_field.set_attributes_from_name("author")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Book,
Book._meta.get_field_by_name("author")[0],
new_field,
strict=True,
)
editor.commit()
# Make sure the new FK constraint is present
constraints = connection.introspection.get_constraints(connection.cursor(), Book._meta.db_table)
for name, details in constraints.items():
@ -155,23 +137,19 @@ class SchemaTests(TransactionTestCase):
Tests adding fields to models
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.commit()
# Ensure there's no age field
columns = self.column_classes(Author)
self.assertNotIn("age", columns)
# Alter the name field to a TextField
new_field = IntegerField(null=True)
new_field.set_attributes_from_name("age")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_field(
Author,
new_field,
)
editor.commit()
# Ensure the field is right afterwards
columns = self.column_classes(Author)
self.assertEqual(columns['age'][0], "IntegerField")
@ -182,10 +160,8 @@ class SchemaTests(TransactionTestCase):
Tests simple altering of fields
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.commit()
# Ensure the field is right to begin with
columns = self.column_classes(Author)
self.assertEqual(columns['name'][0], "CharField")
@ -193,15 +169,13 @@ class SchemaTests(TransactionTestCase):
# Alter the name field to a TextField
new_field = TextField(null=True)
new_field.set_attributes_from_name("name")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Author,
Author._meta.get_field_by_name("name")[0],
new_field,
strict=True,
)
editor.commit()
# Ensure the field is right afterwards
columns = self.column_classes(Author)
self.assertEqual(columns['name'][0], "TextField")
@ -209,15 +183,13 @@ class SchemaTests(TransactionTestCase):
# Change nullability again
new_field2 = TextField(null=False)
new_field2.set_attributes_from_name("name")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Author,
new_field,
new_field2,
strict=True,
)
editor.commit()
# Ensure the field is right afterwards
columns = self.column_classes(Author)
self.assertEqual(columns['name'][0], "TextField")
@ -228,10 +200,8 @@ class SchemaTests(TransactionTestCase):
Tests simple altering of fields
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.commit()
# Ensure the field is right to begin with
columns = self.column_classes(Author)
self.assertEqual(columns['name'][0], "CharField")
@ -239,15 +209,13 @@ class SchemaTests(TransactionTestCase):
# Alter the name field's name
new_field = CharField(max_length=254)
new_field.set_attributes_from_name("display_name")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Author,
Author._meta.get_field_by_name("name")[0],
new_field,
strict = True,
)
editor.commit()
# Ensure the field is right afterwards
columns = self.column_classes(Author)
self.assertEqual(columns['display_name'][0], "CharField")
@ -258,12 +226,10 @@ class SchemaTests(TransactionTestCase):
Tests M2M fields on models during creation
"""
# Create the tables
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.create_model(Tag)
editor.create_model(BookWithM2M)
editor.commit()
# Ensure there is now an m2m table there
columns = self.column_classes(BookWithM2M._meta.get_field_by_name("tags")[0].rel.through)
self.assertEqual(columns['tag_id'][0], "IntegerField")
@ -273,11 +239,9 @@ class SchemaTests(TransactionTestCase):
Tests adding/removing M2M fields on models
"""
# Create the tables
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(AuthorWithM2M)
editor.create_model(Tag)
editor.commit()
# Create an M2M field
new_field = ManyToManyField("schema.Tag", related_name="authors")
new_field.contribute_to_class(AuthorWithM2M, "tags")
@ -286,24 +250,20 @@ class SchemaTests(TransactionTestCase):
self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through)
connection.rollback()
# Add the field
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_field(
Author,
new_field,
)
editor.commit()
# Ensure there is now an m2m table there
columns = self.column_classes(new_field.rel.through)
self.assertEqual(columns['tag_id'][0], "IntegerField")
# Remove the M2M table again
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.delete_field(
Author,
new_field,
)
editor.commit()
# Ensure there's no m2m table there
self.assertRaises(DatabaseError, self.column_classes, new_field.rel.through)
connection.rollback()
@ -317,13 +277,11 @@ class SchemaTests(TransactionTestCase):
Tests repointing M2M fields
"""
# Create the tables
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.create_model(BookWithM2M)
editor.create_model(Tag)
editor.create_model(UniqueTest)
editor.commit()
# Ensure the M2M exists and points to Tag
constraints = connection.introspection.get_constraints(connection.cursor(), BookWithM2M._meta.get_field_by_name("tags")[0].rel.through._meta.db_table)
if connection.features.supports_foreign_keys:
@ -337,14 +295,12 @@ class SchemaTests(TransactionTestCase):
new_field = ManyToManyField(UniqueTest)
new_field.contribute_to_class(BookWithM2M, "uniques")
try:
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Author,
BookWithM2M._meta.get_field_by_name("tags")[0],
new_field,
)
editor.commit()
# Ensure old M2M is gone
self.assertRaises(DatabaseError, self.column_classes, BookWithM2M._meta.get_field_by_name("tags")[0].rel.through)
connection.rollback()
@ -368,10 +324,8 @@ class SchemaTests(TransactionTestCase):
Tests creating/deleting CHECK constraints
"""
# Create the tables
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.commit()
# Ensure the constraint exists
constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table)
for name, details in constraints.items():
@ -382,29 +336,25 @@ class SchemaTests(TransactionTestCase):
# Alter the column to remove it
new_field = IntegerField(null=True, blank=True)
new_field.set_attributes_from_name("height")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Author,
Author._meta.get_field_by_name("height")[0],
new_field,
strict = True,
)
editor.commit()
constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table)
for name, details in constraints.items():
if details['columns'] == set(["height"]) and details['check']:
self.fail("Check constraint for height found")
# Alter the column to re-add it
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Author,
new_field,
Author._meta.get_field_by_name("height")[0],
strict = True,
)
editor.commit()
constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table)
for name, details in constraints.items():
if details['columns'] == set(["height"]) and details['check']:
@ -417,141 +367,121 @@ class SchemaTests(TransactionTestCase):
Tests removing and adding unique constraints to a single column.
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Tag)
editor.commit()
# Ensure the field is unique to begin with
Tag.objects.create(title="foo", slug="foo")
self.assertRaises(IntegrityError, Tag.objects.create, title="bar", slug="foo")
connection.rollback()
Tag.objects.all().delete()
# Alter the slug field to be non-unique
new_field = SlugField(unique=False)
new_field.set_attributes_from_name("slug")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Tag,
Tag._meta.get_field_by_name("slug")[0],
new_field,
strict = True,
)
editor.commit()
# Ensure the field is no longer unique
Tag.objects.create(title="foo", slug="foo")
Tag.objects.create(title="bar", slug="foo")
connection.rollback()
Tag.objects.all().delete()
# Alter the slug field to be unique
new_new_field = SlugField(unique=True)
new_new_field.set_attributes_from_name("slug")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Tag,
new_field,
new_new_field,
strict = True,
)
editor.commit()
# Ensure the field is unique again
Tag.objects.create(title="foo", slug="foo")
self.assertRaises(IntegrityError, Tag.objects.create, title="bar", slug="foo")
connection.rollback()
Tag.objects.all().delete()
# Rename the field
new_field = SlugField(unique=False)
new_field.set_attributes_from_name("slug2")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Tag,
Tag._meta.get_field_by_name("slug")[0],
TagUniqueRename._meta.get_field_by_name("slug2")[0],
strict = True,
)
editor.commit()
# Ensure the field is still unique
TagUniqueRename.objects.create(title="foo", slug2="foo")
self.assertRaises(IntegrityError, TagUniqueRename.objects.create, title="bar", slug2="foo")
connection.rollback()
Tag.objects.all().delete()
def test_unique_together(self):
"""
Tests removing and adding unique_together constraints on a model.
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(UniqueTest)
editor.commit()
# Ensure the fields are unique to begin with
UniqueTest.objects.create(year=2012, slug="foo")
UniqueTest.objects.create(year=2011, slug="foo")
UniqueTest.objects.create(year=2011, slug="bar")
self.assertRaises(IntegrityError, UniqueTest.objects.create, year=2012, slug="foo")
connection.rollback()
UniqueTest.objects.all().delete()
# Alter the model to it's non-unique-together companion
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_unique_together(
UniqueTest,
UniqueTest._meta.unique_together,
[],
)
editor.commit()
# Ensure the fields are no longer unique
UniqueTest.objects.create(year=2012, slug="foo")
UniqueTest.objects.create(year=2012, slug="foo")
connection.rollback()
UniqueTest.objects.all().delete()
# Alter it back
new_new_field = SlugField(unique=True)
new_new_field.set_attributes_from_name("slug")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_unique_together(
UniqueTest,
[],
UniqueTest._meta.unique_together,
)
editor.commit()
# Ensure the fields are unique again
UniqueTest.objects.create(year=2012, slug="foo")
self.assertRaises(IntegrityError, UniqueTest.objects.create, year=2012, slug="foo")
connection.rollback()
UniqueTest.objects.all().delete()
def test_db_table(self):
"""
Tests renaming of the table
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.commit()
# Ensure the table is there to begin with
columns = self.column_classes(Author)
self.assertEqual(columns['name'][0], "CharField")
# Alter the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_db_table(
Author,
"schema_author",
"schema_otherauthor",
)
editor.commit()
# Ensure the table is there afterwards
Author._meta.db_table = "schema_otherauthor"
columns = self.column_classes(Author)
self.assertEqual(columns['name'][0], "CharField")
# Alter the table again
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_db_table(
Author,
"schema_otherauthor",
"schema_author",
)
editor.commit()
# Ensure the table is still there
Author._meta.db_table = "schema_author"
columns = self.column_classes(Author)
@ -562,11 +492,9 @@ class SchemaTests(TransactionTestCase):
Tests creation/altering of indexes
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Author)
editor.create_model(Book)
editor.commit()
# Ensure the table is there and has the right index
self.assertIn(
"title",
@ -575,43 +503,37 @@ class SchemaTests(TransactionTestCase):
# Alter to remove the index
new_field = CharField(max_length=100, db_index=False)
new_field.set_attributes_from_name("title")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Book,
Book._meta.get_field_by_name("title")[0],
new_field,
strict = True,
)
editor.commit()
# Ensure the table is there and has no index
self.assertNotIn(
"title",
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
)
# Alter to re-add the index
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
Book,
new_field,
Book._meta.get_field_by_name("title")[0],
strict = True,
)
editor.commit()
# Ensure the table is there and has the index again
self.assertIn(
"title",
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
)
# Add a unique column, verify that creates an implicit index
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_field(
Book,
BookWithSlug._meta.get_field_by_name("slug")[0],
)
editor.commit()
self.assertIn(
"slug",
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
@ -619,15 +541,13 @@ class SchemaTests(TransactionTestCase):
# Remove the unique, check the index goes with it
new_field2 = CharField(max_length=20, unique=False)
new_field2.set_attributes_from_name("slug")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.alter_field(
BookWithSlug,
BookWithSlug._meta.get_field_by_name("slug")[0],
new_field2,
strict = True,
)
editor.commit()
self.assertNotIn(
"slug",
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
@ -638,10 +558,8 @@ class SchemaTests(TransactionTestCase):
Tests altering of the primary key
"""
# Create the table
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.create_model(Tag)
editor.commit()
# Ensure the table is there and has the right PK
self.assertTrue(
connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['id']['primary_key'],
@ -649,15 +567,13 @@ class SchemaTests(TransactionTestCase):
# Alter to change the PK
new_field = SlugField(primary_key=True)
new_field.set_attributes_from_name("slug")
editor = connection.schema_editor()
editor.start()
with connection.schema_editor() as editor:
editor.delete_field(Tag, Tag._meta.get_field_by_name("id")[0])
editor.alter_field(
Tag,
Tag._meta.get_field_by_name("slug")[0],
new_field,
)
editor.commit()
# Ensure the PK changed
self.assertNotIn(
'id',