361 lines
13 KiB
Python
361 lines
13 KiB
Python
from __future__ import absolute_import
|
|
|
|
import datetime
|
|
|
|
from django.conf import settings
|
|
from django.db import transaction, DEFAULT_DB_ALIAS, models
|
|
from django.db.utils import ConnectionHandler
|
|
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
|
|
|
|
from .models import (Book, Award, AwardNote, Person, Child, Toy, PlayedWith,
|
|
PlayedWithNote, Email, Researcher, Food, Eaten, Policy, Version, Location,
|
|
Item, Image, File, Photo, FooFile, FooImage, FooPhoto, FooFileProxy, Login,
|
|
OrgUnit, OrderedPerson, House)
|
|
|
|
|
|
# Can't run this test under SQLite, because you can't
|
|
# get two connections to an in-memory database.
|
|
class DeleteLockingTest(TransactionTestCase):
    """
    Run a queryset delete on the default connection while a second connection
    to the same database has already removed one of the targeted rows.
    """

    def setUp(self):
        # Create a second connection to the default database
        new_connections = ConnectionHandler(settings.DATABASES)
        self.conn2 = new_connections[DEFAULT_DB_ALIAS]
        # Put both DB connections into managed transaction mode
        transaction.enter_transaction_management()
        self.conn2.enter_transaction_management()

    def tearDown(self):
        # Close down the second connection. Each step is guarded so that an
        # exception raised while leaving transaction management (or while
        # aborting) can no longer leave the second connection open.
        try:
            transaction.leave_transaction_management()
        finally:
            try:
                self.conn2.abort()
            finally:
                self.conn2.close()

    @skipUnlessDBFeature('test_db_allows_multiple_connections')
    def test_concurrent_delete(self):
        "Deletes on concurrent transactions don't collide and lock the database. Regression for #9479"

        # Create some dummy data with explicit primary keys, so the raw SQL
        # below can target a known row.
        for pk, pages in ((1, 100), (2, 200), (3, 300)):
            Book(id=pk, pagecount=pages).save()

        transaction.commit()

        self.assertEqual(3, Book.objects.count())

        # Delete something using connection 2.
        cursor2 = self.conn2.cursor()
        cursor2.execute('DELETE from delete_regress_book WHERE id=1')
        self.conn2._commit()

        # Now perform a queryset delete that covers the object
        # deleted in connection 2. This causes an infinite loop
        # under MySQL InnoDB unless we keep track of already
        # deleted objects.
        Book.objects.filter(pagecount__lt=250).delete()
        transaction.commit()
        # Only the 300-page book is outside the pagecount filter.
        self.assertEqual(1, Book.objects.count())
        transaction.commit()
|
|
|
|
|
|
class DeleteCascadeTests(TestCase):
    """Cascade-on-delete checks that can run inside a regular TestCase."""

    def test_generic_relation_cascade(self):
        """
        Deleting an object cascades through its generic-related objects and
        on to the objects that point at those.

        """
        laureate = Person.objects.create(name='Nelson Mandela')
        prize = Award.objects.create(name='Nobel', content_object=laureate)
        AwardNote.objects.create(note='a peace prize', award=prize)
        self.assertEqual(AwardNote.objects.count(), 1)

        laureate.delete()

        self.assertEqual(Award.objects.count(), 0)
        # The checks above are only sanity checks; this one is the point of
        # the test:
        self.assertEqual(AwardNote.objects.count(), 0)

    def test_fk_to_m2m_through(self):
        """
        When an M2M relation uses an explicit through model and another model
        holds an FK to that through model, deleting one side of the M2M
        cascades to the through rows and then to the rows referencing them.

        """
        child = Child.objects.create(name='Juan')
        toy = Toy.objects.create(name='Paints')
        session = PlayedWith.objects.create(
            child=child,
            toy=toy,
            date=datetime.date.today(),
        )
        PlayedWithNote.objects.create(
            played=session,
            note='the next Jackson Pollock',
        )
        self.assertEqual(PlayedWithNote.objects.count(), 1)

        toy.delete()

        self.assertEqual(PlayedWith.objects.count(), 0)
        # The checks above are only sanity checks; this one is the point of
        # the test:
        self.assertEqual(PlayedWithNote.objects.count(), 0)

    def test_15776(self):
        """
        Delete a Policy that has a Version, with a Location and an Item both
        pointing at that Version (the Item also pointing at the Location).

        There are no assertions; the test passes when the delete completes
        without raising.
        """
        root = Policy.objects.create(pk=1, policy_number="1234")
        revision = Version.objects.create(policy=root)
        place = Location.objects.create(version=revision)
        Item.objects.create(version=revision, location=place)
        root.delete()
|
|
|
|
|
|
class DeleteCascadeTransactionTests(TransactionTestCase):
    """Cascade-on-delete checks run under TransactionTestCase."""

    def test_inheritance(self):
        """
        The delete cascade finds auto-created many-to-many through tables
        that reference a parent model when a child of that parent is deleted.

        Refs #14896.
        """
        researcher = Researcher.objects.create()
        office_email = Email.objects.create(
            label="office-email",
            email_address="carl@science.edu",
        )
        researcher.contacts.add(office_email)

        # No assertions follow: the test passes when this does not raise.
        office_email.delete()

    def test_to_field(self):
        """
        Cascade deletion follows a ForeignKey whose to_field is not the PK.

        """
        fruit = Food.objects.create(name="apple")
        Eaten.objects.create(food=fruit, meal="lunch")

        fruit.delete()

        self.assertFalse(Food.objects.exists())
        self.assertFalse(Eaten.objects.exists())
|
|
|
|
|
|
class LargeDeleteTests(TestCase):
    """Deleting more rows than fit in one deletion chunk."""

    def test_large_deletes(self):
        "Regression for #13309 -- if the number of objects > chunk size, deletion still occurs"
        for x in range(300):
            Book.objects.create(pagecount=x + 100)

        # attach a signal to make sure we will not fast-delete
        def noop(*args, **kwargs):
            pass

        models.signals.post_delete.connect(noop, sender=Book)
        try:
            Book.objects.all().delete()
        finally:
            # Disconnect even if the delete raises, so the receiver does not
            # stay attached to Book for whatever runs next.
            models.signals.post_delete.disconnect(noop, sender=Book)
        self.assertEqual(Book.objects.count(), 0)
|
|
|
|
|
|
class ProxyDeleteTest(TestCase):
    """
    on_delete behavior when proxy models are involved.

    See #16128.

    """

    def _assert_no_rows(self, *model_classes):
        """Assert, in the order given, that each model has no rows left."""
        for model_class in model_classes:
            self.assertEqual(len(model_class.objects.all()), 0)

    def create_image(self):
        """Return an Image referenced by both a FooImage and a FooFile."""
        image = Image()
        image.save()
        FooImage(my_image=image).save()

        # Fetch the same row again, this time as a File instance.
        image_as_file = File.objects.get(pk=image.pk)
        FooFile(my_file=image_as_file).save()

        return image

    def test_delete_proxy(self):
        """
        Deleting the *proxy* instance reaches its non-proxy parent, and
        *all* objects referring to it are deleted.

        """
        self.create_image()

        Image.objects.all().delete()

        # An Image deletion == File deletion
        self._assert_no_rows(Image, File)

        # Everything that referenced the Image went with it.
        self._assert_no_rows(FooImage, FooFile)

    def test_delete_proxy_of_proxy(self):
        """
        Deleting a proxy-of-proxy instance reaches its proxy and non-proxy
        parents, and *all* objects referring to it are deleted.

        """
        image = self.create_image()

        # Fetch the same row as a Photo and reference it from a FooPhoto.
        image_as_photo = Photo.objects.get(pk=image.pk)
        FooPhoto(my_photo=image_as_photo).save()

        Photo.objects.all().delete()

        # A Photo deletion == Image deletion == File deletion
        self._assert_no_rows(Photo, Image, File)

        # Everything that referenced the Photo went with it.
        self._assert_no_rows(FooPhoto, FooFile, FooImage)

    def test_delete_concrete_parent(self):
        """
        Deleting an instance of the concrete model also deletes the objects
        that reference its proxy subclass.

        """
        self.create_image()

        File.objects.all().delete()

        # A File deletion == Image deletion
        self._assert_no_rows(File, Image)

        # Everything that referenced the File went with it.
        self._assert_no_rows(FooFile, FooImage)

    def test_delete_proxy_pair(self):
        """
        When two proxy models are linked by an FK between their concrete
        parents, deleting one proxy cascade-deletes the other, in an order
        that does not trigger an IntegrityError on databases unable to defer
        integrity checks.

        Refs #17918.

        """
        # Image is a proxy of File; FooFileProxy is a proxy of FooFile,
        # which has an FK to File.
        image = Image.objects.create()
        image_as_file = File.objects.get(pk=image.pk)
        FooFileProxy.objects.create(my_file=image_as_file)

        Image.objects.all().delete()

        self._assert_no_rows(FooFileProxy)

    def test_19187_values(self):
        """delete() raises TypeError after values() or values_list()."""
        with self.assertRaises(TypeError):
            Image.objects.values().delete()
        with self.assertRaises(TypeError):
            Image.objects.values_list().delete()
|
|
|
|
class Ticket19102Tests(TestCase):
    """
    Deletes issued from querysets whose SELECT clause has been altered.

    Each original query contains a join, so the deletion has to go through a
    subquery, and it is expected to take the "fast-path": exactly one query
    for the .delete() call.

    .values() is deliberately not covered here; .values().delete() does not
    work for non fast-path deletes at all.
    """

    def setUp(self):
        self.o1 = OrgUnit.objects.create(name='o1')
        self.o2 = OrgUnit.objects.create(name='o2')
        self.l1 = Login.objects.create(description='l1', orgunit=self.o1)
        self.l2 = Login.objects.create(description='l2', orgunit=self.o2)

    def _assert_only_l1_deleted(self):
        """Assert that l1 is gone from the database and l2 is still there."""
        self.assertFalse(Login.objects.filter(pk=self.l1.pk).exists())
        self.assertTrue(Login.objects.filter(pk=self.l2.pk).exists())

    @skipUnlessDBFeature("update_can_self_select")
    def test_ticket_19102_annotate(self):
        with self.assertNumQueries(1):
            joined = Login.objects.order_by('description').filter(
                orgunit__name__isnull=False
            )
            counted = joined.annotate(n=models.Count('description'))
            counted.filter(n=1, pk=self.l1.pk).delete()
        self._assert_only_l1_deleted()

    @skipUnlessDBFeature("update_can_self_select")
    def test_ticket_19102_extra(self):
        with self.assertNumQueries(1):
            joined = Login.objects.order_by('description').filter(
                orgunit__name__isnull=False
            )
            with_extra = joined.extra(select={'extraf': '1'})
            with_extra.filter(pk=self.l1.pk).delete()
        self._assert_only_l1_deleted()

    @skipUnlessDBFeature("update_can_self_select")
    @skipUnlessDBFeature('can_distinct_on_fields')
    def test_ticket_19102_distinct_on(self):
        # Give both Login rows the same description, so that only the one
        # with the smaller PK is deleted.
        Login.objects.update(description='description')
        with self.assertNumQueries(1):
            distinct_logins = Login.objects.distinct('description')
            ordered = distinct_logins.order_by('pk')
            ordered.filter(orgunit__name__isnull=False).delete()
        # This assumes l1, being created first, has the smaller PK.
        self._assert_only_l1_deleted()

    @skipUnlessDBFeature("update_can_self_select")
    def test_ticket_19102_select_related(self):
        with self.assertNumQueries(1):
            target = Login.objects.filter(pk=self.l1.pk)
            joined = target.filter(orgunit__name__isnull=False)
            ordered = joined.order_by('description')
            ordered.select_related('orgunit').delete()
        self._assert_only_l1_deleted()

    @skipUnlessDBFeature("update_can_self_select")
    def test_ticket_19102_defer(self):
        with self.assertNumQueries(1):
            target = Login.objects.filter(pk=self.l1.pk)
            joined = target.filter(orgunit__name__isnull=False)
            ordered = joined.order_by('description')
            ordered.only('id').delete()
        self._assert_only_l1_deleted()
|
|
|
|
|
|
class OrderedDeleteTests(TestCase):
    """Deleting through a join on OrderedPerson."""

    def test_meta_ordered_delete(self):
        # Any subquery built by the deletion code must have all ordering
        # cleared. There was a bug that caused _meta ordering to be applied
        # to it. Refs #19720.
        home = House.objects.create(address='Foo')
        for resident_name in ('Jack', 'Bob'):
            OrderedPerson.objects.create(name=resident_name, lives_in=home)

        residents_of_foo = OrderedPerson.objects.filter(
            lives_in__address='Foo'
        )
        residents_of_foo.delete()

        self.assertEqual(OrderedPerson.objects.count(), 0)
|