django/tests/delete/tests.py

803 lines
29 KiB
Python

from math import ceil
from django.db import connection, models
from django.db.models import ProtectedError, Q, RestrictedError
from django.db.models.deletion import Collector
from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE
from django.test import TestCase, skipIfDBFeature, skipUnlessDBFeature
from .models import (
B1,
B2,
B3,
MR,
A,
Avatar,
B,
Base,
Child,
DeleteBottom,
DeleteTop,
GenericB1,
GenericB2,
GenericDeleteBottom,
HiddenUser,
HiddenUserProfile,
M,
M2MFrom,
M2MTo,
MRNull,
Origin,
P,
Parent,
R,
RChild,
RChildChild,
Referrer,
S,
T,
User,
create_a,
get_default_r,
)
class OnDeleteTests(TestCase):
def setUp(self):
self.DEFAULT = get_default_r()
def test_auto(self):
a = create_a("auto")
a.auto.delete()
self.assertFalse(A.objects.filter(name="auto").exists())
def test_non_callable(self):
msg = "on_delete must be callable."
with self.assertRaisesMessage(TypeError, msg):
models.ForeignKey("self", on_delete=None)
with self.assertRaisesMessage(TypeError, msg):
models.OneToOneField("self", on_delete=None)
def test_auto_nullable(self):
a = create_a("auto_nullable")
a.auto_nullable.delete()
self.assertFalse(A.objects.filter(name="auto_nullable").exists())
def test_setvalue(self):
a = create_a("setvalue")
a.setvalue.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(self.DEFAULT, a.setvalue.pk)
def test_setnull(self):
a = create_a("setnull")
a.setnull.delete()
a = A.objects.get(pk=a.pk)
self.assertIsNone(a.setnull)
def test_setdefault(self):
a = create_a("setdefault")
a.setdefault.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(self.DEFAULT, a.setdefault.pk)
def test_setdefault_none(self):
a = create_a("setdefault_none")
a.setdefault_none.delete()
a = A.objects.get(pk=a.pk)
self.assertIsNone(a.setdefault_none)
def test_cascade(self):
a = create_a("cascade")
a.cascade.delete()
self.assertFalse(A.objects.filter(name="cascade").exists())
def test_cascade_nullable(self):
a = create_a("cascade_nullable")
a.cascade_nullable.delete()
self.assertFalse(A.objects.filter(name="cascade_nullable").exists())
def test_protect(self):
a = create_a("protect")
msg = (
"Cannot delete some instances of model 'R' because they are "
"referenced through protected foreign keys: 'A.protect'."
)
with self.assertRaisesMessage(ProtectedError, msg) as cm:
a.protect.delete()
self.assertEqual(cm.exception.protected_objects, {a})
def test_protect_multiple(self):
a = create_a("protect")
b = B.objects.create(protect=a.protect)
msg = (
"Cannot delete some instances of model 'R' because they are "
"referenced through protected foreign keys: 'A.protect', "
"'B.protect'."
)
with self.assertRaisesMessage(ProtectedError, msg) as cm:
a.protect.delete()
self.assertEqual(cm.exception.protected_objects, {a, b})
def test_protect_path(self):
a = create_a("protect")
a.protect.p = P.objects.create()
a.protect.save()
msg = (
"Cannot delete some instances of model 'P' because they are "
"referenced through protected foreign keys: 'R.p'."
)
with self.assertRaisesMessage(ProtectedError, msg) as cm:
a.protect.p.delete()
self.assertEqual(cm.exception.protected_objects, {a})
def test_do_nothing(self):
# Testing DO_NOTHING is a bit harder: It would raise IntegrityError for
# a normal model, so we connect to pre_delete and set the fk to a known
# value.
replacement_r = R.objects.create()
def check_do_nothing(sender, **kwargs):
obj = kwargs["instance"]
obj.donothing_set.update(donothing=replacement_r)
models.signals.pre_delete.connect(check_do_nothing)
a = create_a("do_nothing")
a.donothing.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(replacement_r, a.donothing)
models.signals.pre_delete.disconnect(check_do_nothing)
def test_do_nothing_qscount(self):
"""
A models.DO_NOTHING relation doesn't trigger a query.
"""
b = Base.objects.create()
with self.assertNumQueries(1):
# RelToBase should not be queried.
b.delete()
self.assertEqual(Base.objects.count(), 0)
def test_inheritance_cascade_up(self):
child = RChild.objects.create()
child.delete()
self.assertFalse(R.objects.filter(pk=child.pk).exists())
def test_inheritance_cascade_down(self):
child = RChild.objects.create()
parent = child.r_ptr
parent.delete()
self.assertFalse(RChild.objects.filter(pk=child.pk).exists())
def test_cascade_from_child(self):
a = create_a("child")
a.child.delete()
self.assertFalse(A.objects.filter(name="child").exists())
self.assertFalse(R.objects.filter(pk=a.child_id).exists())
def test_cascade_from_parent(self):
a = create_a("child")
R.objects.get(pk=a.child_id).delete()
self.assertFalse(A.objects.filter(name="child").exists())
self.assertFalse(RChild.objects.filter(pk=a.child_id).exists())
def test_setnull_from_child(self):
a = create_a("child_setnull")
a.child_setnull.delete()
self.assertFalse(R.objects.filter(pk=a.child_setnull_id).exists())
a = A.objects.get(pk=a.pk)
self.assertIsNone(a.child_setnull)
def test_setnull_from_parent(self):
a = create_a("child_setnull")
R.objects.get(pk=a.child_setnull_id).delete()
self.assertFalse(RChild.objects.filter(pk=a.child_setnull_id).exists())
a = A.objects.get(pk=a.pk)
self.assertIsNone(a.child_setnull)
def test_o2o_setnull(self):
a = create_a("o2o_setnull")
a.o2o_setnull.delete()
a = A.objects.get(pk=a.pk)
self.assertIsNone(a.o2o_setnull)
def test_restrict(self):
a = create_a("restrict")
msg = (
"Cannot delete some instances of model 'R' because they are "
"referenced through restricted foreign keys: 'A.restrict'."
)
with self.assertRaisesMessage(RestrictedError, msg) as cm:
a.restrict.delete()
self.assertEqual(cm.exception.restricted_objects, {a})
def test_restrict_multiple(self):
a = create_a("restrict")
b3 = B3.objects.create(restrict=a.restrict)
msg = (
"Cannot delete some instances of model 'R' because they are "
"referenced through restricted foreign keys: 'A.restrict', "
"'B3.restrict'."
)
with self.assertRaisesMessage(RestrictedError, msg) as cm:
a.restrict.delete()
self.assertEqual(cm.exception.restricted_objects, {a, b3})
def test_restrict_path_cascade_indirect(self):
a = create_a("restrict")
a.restrict.p = P.objects.create()
a.restrict.save()
msg = (
"Cannot delete some instances of model 'P' because they are "
"referenced through restricted foreign keys: 'A.restrict'."
)
with self.assertRaisesMessage(RestrictedError, msg) as cm:
a.restrict.p.delete()
self.assertEqual(cm.exception.restricted_objects, {a})
# Object referenced also with CASCADE relationship can be deleted.
a.cascade.p = a.restrict.p
a.cascade.save()
a.restrict.p.delete()
self.assertFalse(A.objects.filter(name="restrict").exists())
self.assertFalse(R.objects.filter(pk=a.restrict_id).exists())
def test_restrict_path_cascade_direct(self):
a = create_a("restrict")
a.restrict.p = P.objects.create()
a.restrict.save()
a.cascade_p = a.restrict.p
a.save()
a.restrict.p.delete()
self.assertFalse(A.objects.filter(name="restrict").exists())
self.assertFalse(R.objects.filter(pk=a.restrict_id).exists())
def test_restrict_path_cascade_indirect_diamond(self):
delete_top = DeleteTop.objects.create()
b1 = B1.objects.create(delete_top=delete_top)
b2 = B2.objects.create(delete_top=delete_top)
delete_bottom = DeleteBottom.objects.create(b1=b1, b2=b2)
msg = (
"Cannot delete some instances of model 'B1' because they are "
"referenced through restricted foreign keys: 'DeleteBottom.b1'."
)
with self.assertRaisesMessage(RestrictedError, msg) as cm:
b1.delete()
self.assertEqual(cm.exception.restricted_objects, {delete_bottom})
self.assertTrue(DeleteTop.objects.exists())
self.assertTrue(B1.objects.exists())
self.assertTrue(B2.objects.exists())
self.assertTrue(DeleteBottom.objects.exists())
# Object referenced also with CASCADE relationship can be deleted.
delete_top.delete()
self.assertFalse(DeleteTop.objects.exists())
self.assertFalse(B1.objects.exists())
self.assertFalse(B2.objects.exists())
self.assertFalse(DeleteBottom.objects.exists())
def test_restrict_gfk_no_fast_delete(self):
delete_top = DeleteTop.objects.create()
generic_b1 = GenericB1.objects.create(generic_delete_top=delete_top)
generic_b2 = GenericB2.objects.create(generic_delete_top=delete_top)
generic_delete_bottom = GenericDeleteBottom.objects.create(
generic_b1=generic_b1,
generic_b2=generic_b2,
)
msg = (
"Cannot delete some instances of model 'GenericB1' because they "
"are referenced through restricted foreign keys: "
"'GenericDeleteBottom.generic_b1'."
)
with self.assertRaisesMessage(RestrictedError, msg) as cm:
generic_b1.delete()
self.assertEqual(cm.exception.restricted_objects, {generic_delete_bottom})
self.assertTrue(DeleteTop.objects.exists())
self.assertTrue(GenericB1.objects.exists())
self.assertTrue(GenericB2.objects.exists())
self.assertTrue(GenericDeleteBottom.objects.exists())
# Object referenced also with CASCADE relationship can be deleted.
delete_top.delete()
self.assertFalse(DeleteTop.objects.exists())
self.assertFalse(GenericB1.objects.exists())
self.assertFalse(GenericB2.objects.exists())
self.assertFalse(GenericDeleteBottom.objects.exists())
class DeletionTests(TestCase):
def test_sliced_queryset(self):
msg = "Cannot use 'limit' or 'offset' with delete()."
with self.assertRaisesMessage(TypeError, msg):
M.objects.all()[0:5].delete()
def test_pk_none(self):
m = M()
msg = "M object can't be deleted because its id attribute is set to None."
with self.assertRaisesMessage(ValueError, msg):
m.delete()
def test_m2m(self):
m = M.objects.create()
r = R.objects.create()
MR.objects.create(m=m, r=r)
r.delete()
self.assertFalse(MR.objects.exists())
r = R.objects.create()
MR.objects.create(m=m, r=r)
m.delete()
self.assertFalse(MR.objects.exists())
m = M.objects.create()
r = R.objects.create()
m.m2m.add(r)
r.delete()
through = M._meta.get_field("m2m").remote_field.through
self.assertFalse(through.objects.exists())
r = R.objects.create()
m.m2m.add(r)
m.delete()
self.assertFalse(through.objects.exists())
m = M.objects.create()
r = R.objects.create()
MRNull.objects.create(m=m, r=r)
r.delete()
self.assertFalse(not MRNull.objects.exists())
self.assertFalse(m.m2m_through_null.exists())
def test_bulk(self):
s = S.objects.create(r=R.objects.create())
for i in range(2 * GET_ITERATOR_CHUNK_SIZE):
T.objects.create(s=s)
# 1 (select related `T` instances)
# + 1 (select related `U` instances)
# + 2 (delete `T` instances in batches)
# + 1 (delete `s`)
self.assertNumQueries(5, s.delete)
self.assertFalse(S.objects.exists())
def test_instance_update(self):
deleted = []
related_setnull_sets = []
def pre_delete(sender, **kwargs):
obj = kwargs["instance"]
deleted.append(obj)
if isinstance(obj, R):
related_setnull_sets.append([a.pk for a in obj.setnull_set.all()])
models.signals.pre_delete.connect(pre_delete)
a = create_a("update_setnull")
a.setnull.delete()
a = create_a("update_cascade")
a.cascade.delete()
for obj in deleted:
self.assertIsNone(obj.pk)
for pk_list in related_setnull_sets:
for a in A.objects.filter(id__in=pk_list):
self.assertIsNone(a.setnull)
models.signals.pre_delete.disconnect(pre_delete)
def test_deletion_order(self):
pre_delete_order = []
post_delete_order = []
def log_post_delete(sender, **kwargs):
pre_delete_order.append((sender, kwargs["instance"].pk))
def log_pre_delete(sender, **kwargs):
post_delete_order.append((sender, kwargs["instance"].pk))
models.signals.post_delete.connect(log_post_delete)
models.signals.pre_delete.connect(log_pre_delete)
r = R.objects.create()
s1 = S.objects.create(r=r)
s2 = S.objects.create(r=r)
t1 = T.objects.create(s=s1)
t2 = T.objects.create(s=s2)
rchild = RChild.objects.create(r_ptr=r)
r_pk = r.pk
r.delete()
self.assertEqual(
pre_delete_order,
[
(T, t2.pk),
(T, t1.pk),
(RChild, rchild.pk),
(S, s2.pk),
(S, s1.pk),
(R, r_pk),
],
)
self.assertEqual(
post_delete_order,
[
(T, t1.pk),
(T, t2.pk),
(RChild, rchild.pk),
(S, s1.pk),
(S, s2.pk),
(R, r_pk),
],
)
models.signals.post_delete.disconnect(log_post_delete)
models.signals.pre_delete.disconnect(log_pre_delete)
def test_relational_post_delete_signals_happen_before_parent_object(self):
deletions = []
def log_post_delete(instance, **kwargs):
self.assertTrue(R.objects.filter(pk=instance.r_id))
self.assertIs(type(instance), S)
deletions.append(instance.id)
r = R.objects.create()
s = S.objects.create(r=r)
s_id = s.pk
models.signals.post_delete.connect(log_post_delete, sender=S)
try:
r.delete()
finally:
models.signals.post_delete.disconnect(log_post_delete)
self.assertEqual(len(deletions), 1)
self.assertEqual(deletions[0], s_id)
@skipUnlessDBFeature("can_defer_constraint_checks")
def test_can_defer_constraint_checks(self):
u = User.objects.create(avatar=Avatar.objects.create())
a = Avatar.objects.get(pk=u.avatar_id)
# 1 query to find the users for the avatar.
# 1 query to delete the user
# 1 query to delete the avatar
# The important thing is that when we can defer constraint checks there
# is no need to do an UPDATE on User.avatar to null it out.
# Attach a signal to make sure we will not do fast_deletes.
calls = []
def noop(*args, **kwargs):
calls.append("")
models.signals.post_delete.connect(noop, sender=User)
self.assertNumQueries(3, a.delete)
self.assertFalse(User.objects.exists())
self.assertFalse(Avatar.objects.exists())
self.assertEqual(len(calls), 1)
models.signals.post_delete.disconnect(noop, sender=User)
@skipIfDBFeature("can_defer_constraint_checks")
def test_cannot_defer_constraint_checks(self):
u = User.objects.create(avatar=Avatar.objects.create())
# Attach a signal to make sure we will not do fast_deletes.
calls = []
def noop(*args, **kwargs):
calls.append("")
models.signals.post_delete.connect(noop, sender=User)
a = Avatar.objects.get(pk=u.avatar_id)
# The below doesn't make sense... Why do we need to null out
# user.avatar if we are going to delete the user immediately after it,
# and there are no more cascades.
# 1 query to find the users for the avatar.
# 1 query to delete the user
# 1 query to null out user.avatar, because we can't defer the constraint
# 1 query to delete the avatar
self.assertNumQueries(4, a.delete)
self.assertFalse(User.objects.exists())
self.assertFalse(Avatar.objects.exists())
self.assertEqual(len(calls), 1)
models.signals.post_delete.disconnect(noop, sender=User)
def test_hidden_related(self):
r = R.objects.create()
h = HiddenUser.objects.create(r=r)
HiddenUserProfile.objects.create(user=h)
r.delete()
self.assertEqual(HiddenUserProfile.objects.count(), 0)
def test_large_delete(self):
TEST_SIZE = 2000
objs = [Avatar() for i in range(0, TEST_SIZE)]
Avatar.objects.bulk_create(objs)
# Calculate the number of queries needed.
batch_size = connection.ops.bulk_batch_size(["pk"], objs)
# The related fetches are done in batches.
batches = ceil(len(objs) / batch_size)
# One query for Avatar.objects.all() and then one related fast delete for
# each batch.
fetches_to_mem = 1 + batches
# The Avatar objects are going to be deleted in batches of
# GET_ITERATOR_CHUNK_SIZE.
queries = fetches_to_mem + TEST_SIZE // GET_ITERATOR_CHUNK_SIZE
self.assertNumQueries(queries, Avatar.objects.all().delete)
self.assertFalse(Avatar.objects.exists())
def test_large_delete_related(self):
TEST_SIZE = 2000
s = S.objects.create(r=R.objects.create())
for i in range(TEST_SIZE):
T.objects.create(s=s)
batch_size = max(connection.ops.bulk_batch_size(["pk"], range(TEST_SIZE)), 1)
# TEST_SIZE / batch_size (select related `T` instances)
# + 1 (select related `U` instances)
# + TEST_SIZE / GET_ITERATOR_CHUNK_SIZE (delete `T` instances in batches)
# + 1 (delete `s`)
expected_num_queries = ceil(TEST_SIZE / batch_size)
expected_num_queries += ceil(TEST_SIZE / GET_ITERATOR_CHUNK_SIZE) + 2
self.assertNumQueries(expected_num_queries, s.delete)
self.assertFalse(S.objects.exists())
self.assertFalse(T.objects.exists())
def test_delete_with_keeping_parents(self):
child = RChild.objects.create()
parent_id = child.r_ptr_id
child.delete(keep_parents=True)
self.assertFalse(RChild.objects.filter(id=child.id).exists())
self.assertTrue(R.objects.filter(id=parent_id).exists())
def test_delete_with_keeping_parents_relationships(self):
child = RChild.objects.create()
parent_id = child.r_ptr_id
parent_referent_id = S.objects.create(r=child.r_ptr).pk
child.delete(keep_parents=True)
self.assertFalse(RChild.objects.filter(id=child.id).exists())
self.assertTrue(R.objects.filter(id=parent_id).exists())
self.assertTrue(S.objects.filter(pk=parent_referent_id).exists())
childchild = RChildChild.objects.create()
parent_id = childchild.rchild_ptr.r_ptr_id
child_id = childchild.rchild_ptr_id
parent_referent_id = S.objects.create(r=childchild.rchild_ptr.r_ptr).pk
childchild.delete(keep_parents=True)
self.assertFalse(RChildChild.objects.filter(id=childchild.id).exists())
self.assertTrue(RChild.objects.filter(id=child_id).exists())
self.assertTrue(R.objects.filter(id=parent_id).exists())
self.assertTrue(S.objects.filter(pk=parent_referent_id).exists())
def test_queryset_delete_returns_num_rows(self):
"""
QuerySet.delete() should return the number of deleted rows and a
dictionary with the number of deletions for each object type.
"""
Avatar.objects.bulk_create(
[Avatar(desc="a"), Avatar(desc="b"), Avatar(desc="c")]
)
avatars_count = Avatar.objects.count()
deleted, rows_count = Avatar.objects.all().delete()
self.assertEqual(deleted, avatars_count)
# more complex example with multiple object types
r = R.objects.create()
h1 = HiddenUser.objects.create(r=r)
HiddenUser.objects.create(r=r)
HiddenUserProfile.objects.create(user=h1)
existed_objs = {
R._meta.label: R.objects.count(),
HiddenUser._meta.label: HiddenUser.objects.count(),
HiddenUserProfile._meta.label: HiddenUserProfile.objects.count(),
}
deleted, deleted_objs = R.objects.all().delete()
self.assertCountEqual(deleted_objs.keys(), existed_objs.keys())
for k, v in existed_objs.items():
self.assertEqual(deleted_objs[k], v)
def test_model_delete_returns_num_rows(self):
"""
Model.delete() should return the number of deleted rows and a
dictionary with the number of deletions for each object type.
"""
r = R.objects.create()
h1 = HiddenUser.objects.create(r=r)
h2 = HiddenUser.objects.create(r=r)
HiddenUser.objects.create(r=r)
HiddenUserProfile.objects.create(user=h1)
HiddenUserProfile.objects.create(user=h2)
m1 = M.objects.create()
m2 = M.objects.create()
MR.objects.create(r=r, m=m1)
r.m_set.add(m1)
r.m_set.add(m2)
r.save()
existed_objs = {
R._meta.label: R.objects.count(),
HiddenUser._meta.label: HiddenUser.objects.count(),
MR._meta.label: MR.objects.count(),
HiddenUserProfile._meta.label: HiddenUserProfile.objects.count(),
M.m2m.through._meta.label: M.m2m.through.objects.count(),
}
deleted, deleted_objs = r.delete()
self.assertEqual(deleted, sum(existed_objs.values()))
self.assertCountEqual(deleted_objs.keys(), existed_objs.keys())
for k, v in existed_objs.items():
self.assertEqual(deleted_objs[k], v)
def test_proxied_model_duplicate_queries(self):
"""
#25685 - Deleting instances of a model with existing proxy
classes should not issue multiple queries during cascade
deletion of referring models.
"""
avatar = Avatar.objects.create()
# One query for the Avatar table and a second for the User one.
with self.assertNumQueries(2):
avatar.delete()
def test_only_referenced_fields_selected(self):
"""
Only referenced fields are selected during cascade deletion SELECT
unless deletion signals are connected.
"""
origin = Origin.objects.create()
expected_sql = str(
Referrer.objects.only(
# Both fields are referenced by SecondReferrer.
"id",
"unique_field",
)
.filter(origin__in=[origin])
.query
)
with self.assertNumQueries(2) as ctx:
origin.delete()
self.assertEqual(ctx.captured_queries[0]["sql"], expected_sql)
def receiver(instance, **kwargs):
pass
# All fields are selected if deletion signals are connected.
for signal_name in ("pre_delete", "post_delete"):
with self.subTest(signal=signal_name):
origin = Origin.objects.create()
signal = getattr(models.signals, signal_name)
signal.connect(receiver, sender=Referrer)
with self.assertNumQueries(2) as ctx:
origin.delete()
self.assertIn(
connection.ops.quote_name("large_field"),
ctx.captured_queries[0]["sql"],
)
signal.disconnect(receiver, sender=Referrer)
class FastDeleteTests(TestCase):
def test_fast_delete_all(self):
with self.assertNumQueries(1) as ctx:
User.objects.all().delete()
sql = ctx.captured_queries[0]["sql"]
# No subqueries is used when performing a full delete.
self.assertNotIn("SELECT", sql)
def test_fast_delete_fk(self):
u = User.objects.create(avatar=Avatar.objects.create())
a = Avatar.objects.get(pk=u.avatar_id)
# 1 query to fast-delete the user
# 1 query to delete the avatar
self.assertNumQueries(2, a.delete)
self.assertFalse(User.objects.exists())
self.assertFalse(Avatar.objects.exists())
def test_fast_delete_m2m(self):
t = M2MTo.objects.create()
f = M2MFrom.objects.create()
f.m2m.add(t)
# 1 to delete f, 1 to fast-delete m2m for f
self.assertNumQueries(2, f.delete)
def test_fast_delete_revm2m(self):
t = M2MTo.objects.create()
f = M2MFrom.objects.create()
f.m2m.add(t)
# 1 to delete t, 1 to fast-delete t's m_set
self.assertNumQueries(2, f.delete)
def test_fast_delete_qs(self):
u1 = User.objects.create()
u2 = User.objects.create()
self.assertNumQueries(1, User.objects.filter(pk=u1.pk).delete)
self.assertEqual(User.objects.count(), 1)
self.assertTrue(User.objects.filter(pk=u2.pk).exists())
def test_fast_delete_instance_set_pk_none(self):
u = User.objects.create()
# User can be fast-deleted.
collector = Collector(using="default")
self.assertTrue(collector.can_fast_delete(u))
u.delete()
self.assertIsNone(u.pk)
def test_fast_delete_joined_qs(self):
a = Avatar.objects.create(desc="a")
User.objects.create(avatar=a)
u2 = User.objects.create()
self.assertNumQueries(1, User.objects.filter(avatar__desc="a").delete)
self.assertEqual(User.objects.count(), 1)
self.assertTrue(User.objects.filter(pk=u2.pk).exists())
def test_fast_delete_inheritance(self):
c = Child.objects.create()
p = Parent.objects.create()
# 1 for self, 1 for parent
self.assertNumQueries(2, c.delete)
self.assertFalse(Child.objects.exists())
self.assertEqual(Parent.objects.count(), 1)
self.assertEqual(Parent.objects.filter(pk=p.pk).count(), 1)
# 1 for self delete, 1 for fast delete of empty "child" qs.
self.assertNumQueries(2, p.delete)
self.assertFalse(Parent.objects.exists())
# 1 for self delete, 1 for fast delete of empty "child" qs.
c = Child.objects.create()
p = c.parent_ptr
self.assertNumQueries(2, p.delete)
self.assertFalse(Parent.objects.exists())
self.assertFalse(Child.objects.exists())
def test_fast_delete_large_batch(self):
User.objects.bulk_create(User() for i in range(0, 2000))
# No problems here - we aren't going to cascade, so we will fast
# delete the objects in a single query.
self.assertNumQueries(1, User.objects.all().delete)
a = Avatar.objects.create(desc="a")
User.objects.bulk_create(User(avatar=a) for i in range(0, 2000))
# We don't hit parameter amount limits for a, so just one query for
# that + fast delete of the related objs.
self.assertNumQueries(2, a.delete)
self.assertEqual(User.objects.count(), 0)
def test_fast_delete_empty_no_update_can_self_select(self):
"""
Fast deleting when DatabaseFeatures.update_can_self_select = False
works even if the specified filter doesn't match any row (#25932).
"""
with self.assertNumQueries(1):
self.assertEqual(
User.objects.filter(avatar__desc="missing").delete(),
(0, {}),
)
def test_fast_delete_combined_relationships(self):
# The cascading fast-delete of SecondReferrer should be combined
# in a single DELETE WHERE referrer_id OR unique_field.
origin = Origin.objects.create()
referer = Referrer.objects.create(origin=origin, unique_field=42)
with self.assertNumQueries(2):
referer.delete()
def test_fast_delete_aggregation(self):
# Fast-deleting when filtering against an aggregation result in
# a single query containing a subquery.
Base.objects.create()
with self.assertNumQueries(1):
self.assertEqual(
Base.objects.annotate(
rels_count=models.Count("rels"),
)
.filter(rels_count=0)
.delete(),
(1, {"delete.Base": 1}),
)
self.assertIs(Base.objects.exists(), False)
def test_fast_delete_full_match(self):
avatar = Avatar.objects.create(desc="bar")
User.objects.create(avatar=avatar)
with self.assertNumQueries(1):
User.objects.filter(~Q(pk__in=[]) | Q(avatar__desc="foo")).delete()
self.assertFalse(User.objects.exists())