Fixed #21169 -- Reworked RelatedManager methods use default filtering

The `remove()` and `clear()` methods of the related managers created by
`ForeignKey`, `GenericForeignKey`, and `ManyToManyField` suffered from a
number of issues. Some operations ran multiple data modifying queries without
wrapping them in a transaction, and some operations didn't respect default
filtering when it was present (i.e. when the default manager on the related
model implemented a custom `get_queryset()`).

Fixing the issues introduced some backward incompatible changes:

- The implementation of `remove()` for `ForeignKey` related managers changed
  from a series of `Model.save()` calls to a single `QuerySet.update()` call.
  The change means that `pre_save` and `post_save` signals aren't called anymore.

- The `remove()` and `clear()` methods for `GenericForeignKey` related
  managers now perform bulk delete so `Model.delete()` isn't called anymore.

- The `remove()` and `clear()` methods for `ManyToManyField` related
  managers perform nested queries when filtering is involved, which may
  or may not be an issue depending on the database and the data itself.

Refs. #3871, #21174.

Thanks Anssi Kääriäinen and Tim Graham for the reviews.
This commit is contained in:
Loic Bistuer 2013-09-27 06:35:53 +07:00 committed by Anssi Kääriäinen
parent 0b3c8fc851
commit 17c3997f68
5 changed files with 398 additions and 85 deletions

View File

@ -2,7 +2,7 @@ from operator import attrgetter
from django.db import connection, connections, router, transaction from django.db import connection, connections, router, transaction
from django.db.backends import utils from django.db.backends import utils
from django.db.models import signals from django.db.models import signals, Q
from django.db.models.fields import (AutoField, Field, IntegerField, from django.db.models.fields import (AutoField, Field, IntegerField,
PositiveIntegerField, PositiveSmallIntegerField, FieldDoesNotExist) PositiveIntegerField, PositiveSmallIntegerField, FieldDoesNotExist)
from django.db.models.related import RelatedObject, PathInfo from django.db.models.related import RelatedObject, PathInfo
@ -464,14 +464,21 @@ def create_foreign_related_manager(superclass, rel_field, rel_model):
# remove() and clear() are only provided if the ForeignKey can have a value of null. # remove() and clear() are only provided if the ForeignKey can have a value of null.
if rel_field.null: if rel_field.null:
def remove(self, *objs): def remove(self, *objs):
# If there aren't any objects, there is nothing to do.
if not objs:
return
val = rel_field.get_foreign_related_value(self.instance) val = rel_field.get_foreign_related_value(self.instance)
old_ids = set()
for obj in objs: for obj in objs:
# Is obj actually part of this descriptor set? # Is obj actually part of this descriptor set?
if rel_field.get_local_related_value(obj) == val: if rel_field.get_local_related_value(obj) == val:
setattr(obj, rel_field.name, None) old_ids.add(obj.pk)
obj.save()
else: else:
raise rel_field.rel.to.DoesNotExist("%r is not related to %r." % (obj, self.instance)) raise rel_field.rel.to.DoesNotExist("%r is not related to %r." % (obj, self.instance))
self.filter(pk__in=old_ids).update(**{rel_field.name: None})
remove.alters_data = True remove.alters_data = True
def clear(self): def clear(self):
@ -536,6 +543,7 @@ def create_many_related_manager(superclass, rel):
self.instance = instance self.instance = instance
self.symmetrical = symmetrical self.symmetrical = symmetrical
self.source_field = source_field self.source_field = source_field
self.target_field = through._meta.get_field(target_field_name)
self.source_field_name = source_field_name self.source_field_name = source_field_name
self.target_field_name = target_field_name self.target_field_name = target_field_name
self.reverse = reverse self.reverse = reverse
@ -572,6 +580,19 @@ def create_many_related_manager(superclass, rel):
) )
do_not_call_in_templates = True do_not_call_in_templates = True
def _build_clear_filters(self, qs):
filters = Q(**{
self.source_field_name: self.related_val,
'%s__in' % self.target_field_name: qs
})
if self.symmetrical:
filters |= Q(**{
self.target_field_name: self.related_val,
'%s__in' % self.source_field_name: qs
})
return filters
def get_queryset(self): def get_queryset(self):
try: try:
return self.instance._prefetched_objects_cache[self.prefetch_cache_name] return self.instance._prefetched_objects_cache[self.prefetch_cache_name]
@ -625,18 +646,20 @@ def create_many_related_manager(superclass, rel):
def remove(self, *objs): def remove(self, *objs):
self._remove_items(self.source_field_name, self.target_field_name, *objs) self._remove_items(self.source_field_name, self.target_field_name, *objs)
# If this is a symmetrical m2m relation to self, remove the mirror entry in the m2m table
if self.symmetrical:
self._remove_items(self.target_field_name, self.source_field_name, *objs)
remove.alters_data = True remove.alters_data = True
def clear(self): def clear(self):
self._clear_items(self.source_field_name) db = router.db_for_write(self.through, instance=self.instance)
# If this is a symmetrical m2m relation to self, clear the mirror entry in the m2m table signals.m2m_changed.send(sender=self.through, action="pre_clear",
if self.symmetrical: instance=self.instance, reverse=self.reverse,
self._clear_items(self.target_field_name) model=self.model, pk_set=None, using=db)
filters = self._build_clear_filters(self.using(db))
self.through._default_manager.using(db).filter(filters).delete()
signals.m2m_changed.send(sender=self.through, action="post_clear",
instance=self.instance, reverse=self.reverse,
model=self.model, pk_set=None, using=db)
clear.alters_data = True clear.alters_data = True
def create(self, **kwargs): def create(self, **kwargs):
@ -722,56 +745,34 @@ def create_many_related_manager(superclass, rel):
# *objs - objects to remove # *objs - objects to remove
# If there aren't any objects, there is nothing to do. # If there aren't any objects, there is nothing to do.
if objs: if not objs:
return
# Check that all the objects are of the right type # Check that all the objects are of the right type
old_ids = set() old_ids = set()
for obj in objs: for obj in objs:
if isinstance(obj, self.model): if isinstance(obj, self.model):
fk_val = self.through._meta.get_field( fk_val = self.target_field.get_foreign_related_value(obj)[0]
target_field_name).get_foreign_related_value(obj)[0]
old_ids.add(fk_val) old_ids.add(fk_val)
else: else:
old_ids.add(obj) old_ids.add(obj)
# Work out what DB we're operating on
db = router.db_for_write(self.through, instance=self.instance) db = router.db_for_write(self.through, instance=self.instance)
# Send a signal to the other end if need be. # Send a signal to the other end if need be.
if self.reverse or source_field_name == self.source_field_name:
# Don't send the signal when we are deleting the
# duplicate data row for symmetrical reverse entries.
signals.m2m_changed.send(sender=self.through, action="pre_remove", signals.m2m_changed.send(sender=self.through, action="pre_remove",
instance=self.instance, reverse=self.reverse, instance=self.instance, reverse=self.reverse,
model=self.model, pk_set=old_ids, using=db) model=self.model, pk_set=old_ids, using=db)
# Remove the specified objects from the join table
self.through._default_manager.using(db).filter(**{ old_vals_qs = self.using(db).filter(**{
source_field_name: self.related_val[0], '%s__in' % self.target_field.related_field.attname: old_ids})
'%s__in' % target_field_name: old_ids filters = self._build_clear_filters(old_vals_qs)
}).delete() self.through._default_manager.using(db).filter(filters).delete()
if self.reverse or source_field_name == self.source_field_name:
# Don't send the signal when we are deleting the
# duplicate data row for symmetrical reverse entries.
signals.m2m_changed.send(sender=self.through, action="post_remove", signals.m2m_changed.send(sender=self.through, action="post_remove",
instance=self.instance, reverse=self.reverse, instance=self.instance, reverse=self.reverse,
model=self.model, pk_set=old_ids, using=db) model=self.model, pk_set=old_ids, using=db)
def _clear_items(self, source_field_name):
db = router.db_for_write(self.through, instance=self.instance)
# source_field_name: the PK colname in join table for the source object
if self.reverse or source_field_name == self.source_field_name:
# Don't send the signal when we are clearing the
# duplicate data rows for symmetrical reverse entries.
signals.m2m_changed.send(sender=self.through, action="pre_clear",
instance=self.instance, reverse=self.reverse,
model=self.model, pk_set=None, using=db)
self.through._default_manager.using(db).filter(**{
source_field_name: self.related_val
}).delete()
if self.reverse or source_field_name == self.source_field_name:
# Don't send the signal when we are clearing the
# duplicate data rows for symmetrical reverse entries.
signals.m2m_changed.send(sender=self.through, action="post_clear",
instance=self.instance, reverse=self.reverse,
model=self.model, pk_set=None, using=db)
return ManyRelatedManager return ManyRelatedManager

View File

@ -2132,6 +2132,8 @@ extract two field values, where only one is expected::
inner_qs = Blog.objects.filter(name__contains='Ch').values('name', 'id') inner_qs = Blog.objects.filter(name__contains='Ch').values('name', 'id')
entries = Entry.objects.filter(blog__name__in=inner_qs) entries = Entry.objects.filter(blog__name__in=inner_qs)
.. _nested-queries-performance:
.. admonition:: Performance considerations .. admonition:: Performance considerations
Be cautious about using nested queries and understand your database Be cautious about using nested queries and understand your database

View File

@ -574,6 +574,32 @@ a :exc:`~exceptions.ValueError` when encountering them, you will have to
install pytz_. You may be affected by this problem if you use Django's time install pytz_. You may be affected by this problem if you use Django's time
zone-related date formats or :mod:`django.contrib.syndication`. zone-related date formats or :mod:`django.contrib.syndication`.
``remove()`` and ``clear()`` methods of related managers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ``remove()`` and ``clear()`` methods of the related managers created by
``ForeignKey``, ``GenericForeignKey``, and ``ManyToManyField`` suffered from a
number of issues. Some operations ran multiple data modifying queries without
wrapping them in a transaction, and some operations didn't respect default
filtering when it was present (i.e. when the default manager on the related
model implemented a custom ``get_queryset()``).
Fixing the issues introduced some backward incompatible changes:
- The default implementation of ``remove()`` for ``ForeignKey`` related managers
changed from a series of ``Model.save()`` calls to a single
``QuerySet.update()`` call. The change means that ``pre_save`` and
``post_save`` signals aren't sent anymore.
- The ``remove()`` and ``clear()`` methods for ``GenericForeignKey`` related
managers now perform bulk delete. The ``Model.delete()`` method isn't called
on each instance anymore.
- The ``remove()`` and ``clear()`` methods for ``ManyToManyField`` related
managers perform nested queries when filtering is involved, which may or
may not be an issue depending on your database and your data itself.
See :ref:`this note <nested-queries-performance>` for more details.
.. _pytz: https://pypi.python.org/pypi/pytz/ .. _pytz: https://pypi.python.org/pypi/pytz/
Miscellaneous Miscellaneous

View File

@ -101,6 +101,22 @@ class Person(models.Model):
return "%s %s" % (self.first_name, self.last_name) return "%s %s" % (self.first_name, self.last_name)
@python_2_unicode_compatible
class FunPerson(models.Model):
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
fun = models.BooleanField(default=True)
favorite_book = models.ForeignKey('Book', null=True, related_name='fun_people_favorite_books')
favorite_thing_type = models.ForeignKey('contenttypes.ContentType', null=True)
favorite_thing_id = models.IntegerField(null=True)
favorite_thing = generic.GenericForeignKey('favorite_thing_type', 'favorite_thing_id')
objects = FunPeopleManager()
def __str__(self):
return "%s %s" % (self.first_name, self.last_name)
@python_2_unicode_compatible @python_2_unicode_compatible
class Book(models.Model): class Book(models.Model):
title = models.CharField(max_length=50) title = models.CharField(max_length=50)
@ -108,10 +124,14 @@ class Book(models.Model):
is_published = models.BooleanField(default=False) is_published = models.BooleanField(default=False)
published_objects = PublishedBookManager() published_objects = PublishedBookManager()
authors = models.ManyToManyField(Person, related_name='books') authors = models.ManyToManyField(Person, related_name='books')
fun_authors = models.ManyToManyField(FunPerson, related_name='books')
favorite_things = generic.GenericRelation(Person, favorite_things = generic.GenericRelation(Person,
content_type_field='favorite_thing_type', object_id_field='favorite_thing_id') content_type_field='favorite_thing_type', object_id_field='favorite_thing_id')
fun_people_favorite_things = generic.GenericRelation(FunPerson,
content_type_field='favorite_thing_type', object_id_field='favorite_thing_id')
def __str__(self): def __str__(self):
return self.title return self.title

View File

@ -3,7 +3,7 @@ from __future__ import unicode_literals
from django.test import TestCase from django.test import TestCase
from django.utils import six from django.utils import six
from .models import Person, Book, Car, PersonManager, PublishedBookManager from .models import Person, FunPerson, Book, Car, PersonManager, PublishedBookManager
class CustomManagerTests(TestCase): class CustomManagerTests(TestCase):
@ -12,10 +12,11 @@ class CustomManagerTests(TestCase):
title="How to program", author="Rodney Dangerfield", is_published=True) title="How to program", author="Rodney Dangerfield", is_published=True)
self.b2 = Book.published_objects.create( self.b2 = Book.published_objects.create(
title="How to be smart", author="Albert Einstein", is_published=False) title="How to be smart", author="Albert Einstein", is_published=False)
self.p1 = Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
self.p2 = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False)
def test_manager(self): def test_manager(self):
Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
droopy = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False)
# Test a custom `Manager` method. # Test a custom `Manager` method.
self.assertQuerysetEqual( self.assertQuerysetEqual(
Person.objects.get_fun_people(), [ Person.objects.get_fun_people(), [
@ -66,7 +67,7 @@ class CustomManagerTests(TestCase):
# The RelatedManager used on the 'books' descriptor extends the default # The RelatedManager used on the 'books' descriptor extends the default
# manager # manager
self.assertIsInstance(self.p2.books, PublishedBookManager) self.assertIsInstance(droopy.books, PublishedBookManager)
# The default manager, "objects", doesn't exist, because a custom one # The default manager, "objects", doesn't exist, because a custom one
# was provided. # was provided.
@ -113,78 +114,341 @@ class CustomManagerTests(TestCase):
lambda c: c.name lambda c: c.name
) )
def test_related_manager_fk(self): def test_fk_related_manager(self):
self.p1.favorite_book = self.b1 Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1)
self.p1.save() Person.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1)
self.p2.favorite_book = self.b1 FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1)
self.p2.save() FunPerson.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1)
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.favorite_books.order_by('first_name').all(), [ self.b1.favorite_books.order_by('first_name').all(), [
"Bugs", "Bugs",
"Droopy", "Droopy",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
)
self.assertQuerysetEqual(
self.b1.fun_people_favorite_books.all(), [
"Bugs",
],
lambda c: c.first_name,
ordered=False,
) )
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.favorite_books(manager='boring_people').all(), [ self.b1.favorite_books(manager='boring_people').all(), [
"Droopy", "Droopy",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
) )
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.favorite_books(manager='fun_people').all(), [ self.b1.favorite_books(manager='fun_people').all(), [
"Bugs", "Bugs",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
) )
def test_related_manager_gfk(self): def test_gfk_related_manager(self):
self.p1.favorite_thing = self.b1 Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1)
self.p1.save() Person.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1)
self.p2.favorite_thing = self.b1 FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1)
self.p2.save() FunPerson.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1)
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.favorite_things.order_by('first_name').all(), [ self.b1.favorite_things.all(), [
"Bugs", "Bugs",
"Droopy", "Droopy",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
)
self.assertQuerysetEqual(
self.b1.fun_people_favorite_things.all(), [
"Bugs",
],
lambda c: c.first_name,
ordered=False,
) )
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.favorite_things(manager='boring_people').all(), [ self.b1.favorite_things(manager='boring_people').all(), [
"Droopy", "Droopy",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
) )
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.favorite_things(manager='fun_people').all(), [ self.b1.favorite_things(manager='fun_people').all(), [
"Bugs", "Bugs",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
) )
def test_related_manager_m2m(self): def test_m2m_related_manager(self):
self.b1.authors.add(self.p1) bugs = Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
self.b1.authors.add(self.p2) self.b1.authors.add(bugs)
droopy = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False)
self.b1.authors.add(droopy)
bugs = FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
self.b1.fun_authors.add(bugs)
droopy = FunPerson.objects.create(first_name="Droopy", last_name="Dog", fun=False)
self.b1.fun_authors.add(droopy)
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.authors.order_by('first_name').all(), [ self.b1.authors.order_by('first_name').all(), [
"Bugs", "Bugs",
"Droopy", "Droopy",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
)
self.assertQuerysetEqual(
self.b1.fun_authors.order_by('first_name').all(), [
"Bugs",
],
lambda c: c.first_name,
ordered=False,
) )
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.authors(manager='boring_people').all(), [ self.b1.authors(manager='boring_people').all(), [
"Droopy", "Droopy",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
) )
self.assertQuerysetEqual( self.assertQuerysetEqual(
self.b1.authors(manager='fun_people').all(), [ self.b1.authors(manager='fun_people').all(), [
"Bugs", "Bugs",
], ],
lambda c: c.first_name lambda c: c.first_name,
ordered=False,
)
def test_removal_through_default_fk_related_manager(self):
bugs = FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1)
droopy = FunPerson.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1)
self.b1.fun_people_favorite_books.remove(droopy)
self.assertQuerysetEqual(
FunPerson._base_manager.filter(favorite_book=self.b1), [
"Bugs",
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
self.b1.fun_people_favorite_books.remove(bugs)
self.assertQuerysetEqual(
FunPerson._base_manager.filter(favorite_book=self.b1), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
bugs.favorite_book = self.b1
bugs.save()
self.b1.fun_people_favorite_books.clear()
self.assertQuerysetEqual(
FunPerson._base_manager.filter(favorite_book=self.b1), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
def test_removal_through_specified_fk_related_manager(self):
Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1)
droopy = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1)
# Check that the fun manager DOESN'T remove boring people.
self.b1.favorite_books(manager='fun_people').remove(droopy)
self.assertQuerysetEqual(
self.b1.favorite_books(manager='boring_people').all(), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
# Check that the boring manager DOES remove boring people.
self.b1.favorite_books(manager='boring_people').remove(droopy)
self.assertQuerysetEqual(
self.b1.favorite_books(manager='boring_people').all(), [
],
lambda c: c.first_name,
ordered=False,
)
droopy.favorite_book = self.b1
droopy.save()
# Check that the fun manager ONLY clears fun people.
self.b1.favorite_books(manager='fun_people').clear()
self.assertQuerysetEqual(
self.b1.favorite_books(manager='boring_people').all(), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
self.assertQuerysetEqual(
self.b1.favorite_books(manager='fun_people').all(), [
],
lambda c: c.first_name,
ordered=False,
)
def test_removal_through_default_gfk_related_manager(self):
bugs = FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1)
droopy = FunPerson.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1)
self.b1.fun_people_favorite_things.remove(droopy)
self.assertQuerysetEqual(
FunPerson._base_manager.order_by('first_name').filter(favorite_thing_id=self.b1.pk), [
"Bugs",
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
self.b1.fun_people_favorite_things.remove(bugs)
self.assertQuerysetEqual(
FunPerson._base_manager.order_by('first_name').filter(favorite_thing_id=self.b1.pk), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
bugs.favorite_book = self.b1
bugs.save()
self.b1.fun_people_favorite_things.clear()
self.assertQuerysetEqual(
FunPerson._base_manager.order_by('first_name').filter(favorite_thing_id=self.b1.pk), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
def test_removal_through_specified_gfk_related_manager(self):
Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1)
droopy = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1)
# Check that the fun manager DOESN'T remove boring people.
self.b1.favorite_things(manager='fun_people').remove(droopy)
self.assertQuerysetEqual(
self.b1.favorite_things(manager='boring_people').all(), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
# Check that the boring manager DOES remove boring people.
self.b1.favorite_things(manager='boring_people').remove(droopy)
self.assertQuerysetEqual(
self.b1.favorite_things(manager='boring_people').all(), [
],
lambda c: c.first_name,
ordered=False,
)
droopy.favorite_thing = self.b1
droopy.save()
# Check that the fun manager ONLY clears fun people.
self.b1.favorite_things(manager='fun_people').clear()
self.assertQuerysetEqual(
self.b1.favorite_things(manager='boring_people').all(), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
self.assertQuerysetEqual(
self.b1.favorite_things(manager='fun_people').all(), [
],
lambda c: c.first_name,
ordered=False,
)
def test_removal_through_default_m2m_related_manager(self):
bugs = FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
self.b1.fun_authors.add(bugs)
droopy = FunPerson.objects.create(first_name="Droopy", last_name="Dog", fun=False)
self.b1.fun_authors.add(droopy)
self.b1.fun_authors.remove(droopy)
self.assertQuerysetEqual(
self.b1.fun_authors.through._default_manager.all(), [
"Bugs",
"Droopy",
],
lambda c: c.funperson.first_name,
ordered=False,
)
self.b1.fun_authors.remove(bugs)
self.assertQuerysetEqual(
self.b1.fun_authors.through._default_manager.all(), [
"Droopy",
],
lambda c: c.funperson.first_name,
ordered=False,
)
self.b1.fun_authors.add(bugs)
self.b1.fun_authors.clear()
self.assertQuerysetEqual(
self.b1.fun_authors.through._default_manager.all(), [
"Droopy",
],
lambda c: c.funperson.first_name,
ordered=False,
)
def test_removal_through_specified_m2m_related_manager(self):
bugs = Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
self.b1.authors.add(bugs)
droopy = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False)
self.b1.authors.add(droopy)
# Check that the fun manager DOESN'T remove boring people.
self.b1.authors(manager='fun_people').remove(droopy)
self.assertQuerysetEqual(
self.b1.authors(manager='boring_people').all(), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
# Check that the boring manager DOES remove boring people.
self.b1.authors(manager='boring_people').remove(droopy)
self.assertQuerysetEqual(
self.b1.authors(manager='boring_people').all(), [
],
lambda c: c.first_name,
ordered=False,
)
self.b1.authors.add(droopy)
# Check that the fun manager ONLY clears fun people.
self.b1.authors(manager='fun_people').clear()
self.assertQuerysetEqual(
self.b1.authors(manager='boring_people').all(), [
"Droopy",
],
lambda c: c.first_name,
ordered=False,
)
self.assertQuerysetEqual(
self.b1.authors(manager='fun_people').all(), [
],
lambda c: c.first_name,
ordered=False,
) )