# django/tests/custom_managers/tests.py (859 lines, 30 KiB, Python)

from django.db import models
from django.test import TestCase
from .models import (
Book,
Car,
ConfusedBook,
CustomManager,
CustomQuerySet,
DeconstructibleCustomManager,
FastCarAsBase,
FastCarAsDefault,
FunPerson,
OneToOneRestrictedModel,
Person,
PersonFromAbstract,
PersonManager,
PublishedBookManager,
RelatedModel,
RestrictedModel,
)
class CustomManagerTests(TestCase):
    """
    Tests for custom managers and custom querysets, and for the related
    managers (FK, generic FK and M2M) that are built on top of them.
    """

    # Names of the Person managers that the queryset-method tests iterate over.
    custom_manager_names = [
        "custom_queryset_default_manager",
        "custom_queryset_custom_manager",
    ]

    @classmethod
    def setUpTestData(cls):
        """
        Create the shared fixtures: one published and one unpublished book,
        plus one fun and one not-fun person.
        """
        cls.b1 = Book.published_objects.create(
            title="How to program", author="Rodney Dangerfield", is_published=True
        )
        cls.b2 = Book.published_objects.create(
            title="How to be smart", author="Albert Einstein", is_published=False
        )

        cls.p1 = Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
        cls.droopy = Person.objects.create(
            first_name="Droopy", last_name="Dog", fun=False
        )

    def test_custom_manager_basic(self):
        """
        Test a custom Manager method.
        """
        self.assertQuerysetEqual(Person.objects.get_fun_people(), ["Bugs Bunny"], str)

    def test_queryset_copied_to_default(self):
        """
        The methods of a custom QuerySet are properly copied onto the
        default Manager.
        """
        for manager_name in self.custom_manager_names:
            with self.subTest(manager_name=manager_name):
                manager = getattr(Person, manager_name)

                # Public methods are copied
                manager.public_method()
                # Private methods are not copied
                with self.assertRaises(AttributeError):
                    manager._private_method()

    def test_manager_honors_queryset_only(self):
        """
        The queryset_only flag on a queryset method overrides the default
        public/private rule for copying it onto the manager.
        """
        for manager_name in self.custom_manager_names:
            with self.subTest(manager_name=manager_name):
                manager = getattr(Person, manager_name)
                # Methods with queryset_only=False are copied even if they are private.
                manager._optin_private_method()
                # Methods with queryset_only=True aren't copied even if they are public.
                msg = (
                    "%r object has no attribute 'optout_public_method'"
                    % manager.__class__.__name__
                )
                with self.assertRaisesMessage(AttributeError, msg):
                    manager.optout_public_method()

    def test_manager_use_queryset_methods(self):
        """
        Custom manager will use the queryset methods
        """
        for manager_name in self.custom_manager_names:
            with self.subTest(manager_name=manager_name):
                manager = getattr(Person, manager_name)
                queryset = manager.filter()
                self.assertQuerysetEqual(queryset, ["Bugs Bunny"], str)
                self.assertIs(queryset._filter_CustomQuerySet, True)

                # Specialized querysets inherit from our custom queryset.
                queryset = manager.values_list("first_name", flat=True).filter()
                self.assertEqual(list(queryset), ["Bugs"])
                self.assertIs(queryset._filter_CustomQuerySet, True)

        # Checked once, against the queryset left over from the last manager.
        self.assertIsInstance(queryset.values(), CustomQuerySet)
        self.assertIsInstance(queryset.values().values(), CustomQuerySet)
        self.assertIsInstance(queryset.values_list().values(), CustomQuerySet)

    def test_init_args(self):
        """
        The custom manager __init__() argument has been set.
        """
        self.assertEqual(Person.custom_queryset_custom_manager.init_arg, "hello")

    def test_manager_attributes(self):
        """
        Custom manager method is only available on the manager and not on
        querysets.
        """
        Person.custom_queryset_custom_manager.manager_only()
        msg = "'CustomQuerySet' object has no attribute 'manager_only'"
        with self.assertRaisesMessage(AttributeError, msg):
            Person.custom_queryset_custom_manager.all().manager_only()

    def test_queryset_and_manager(self):
        """
        Queryset method doesn't override the custom manager method.
        """
        queryset = Person.custom_queryset_custom_manager.filter()
        self.assertQuerysetEqual(queryset, ["Bugs Bunny"], str)
        self.assertIs(queryset._filter_CustomManager, True)

    def test_related_manager(self):
        """
        The related managers extend the default manager.
        """
        self.assertIsInstance(self.droopy.books, PublishedBookManager)
        self.assertIsInstance(self.b2.authors, PersonManager)

    def test_no_objects(self):
        """
        The default manager, "objects", doesn't exist, because a custom one
        was provided.
        """
        msg = "type object 'Book' has no attribute 'objects'"
        with self.assertRaisesMessage(AttributeError, msg):
            Book.objects

    def test_filtering(self):
        """
        Custom managers respond to usual filtering methods
        """
        self.assertQuerysetEqual(
            Book.published_objects.all(),
            [
                "How to program",
            ],
            lambda b: b.title,
        )

    def test_fk_related_manager(self):
        """
        Reverse FK managers return the expected people, both through the
        default related manager and through one selected with manager=...
        """
        Person.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1
        )
        Person.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1
        )
        FunPerson.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1
        )
        FunPerson.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1
        )

        self.assertQuerysetEqual(
            self.b1.favorite_books.order_by("first_name").all(),
            [
                "Bugs",
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.fun_people_favorite_books.all(),
            [
                "Bugs",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.favorite_books(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.favorite_books(manager="fun_people").all(),
            [
                "Bugs",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_fk_related_manager_reused(self):
        """
        Repeated access to a reverse FK manager yields the same object, and
        it is recorded in the instance's related managers cache.
        """
        self.assertIs(self.b1.favorite_books, self.b1.favorite_books)
        self.assertIn("favorite_books", self.b1._state.related_managers_cache)

    def test_gfk_related_manager(self):
        """
        Generic relation managers return the expected people, both through
        the default related manager and through one selected with manager=...
        """
        Person.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1
        )
        Person.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1
        )
        FunPerson.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1
        )
        FunPerson.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1
        )

        self.assertQuerysetEqual(
            self.b1.favorite_things.all(),
            [
                "Bugs",
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.fun_people_favorite_things.all(),
            [
                "Bugs",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.favorite_things(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.favorite_things(manager="fun_people").all(),
            [
                "Bugs",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_gfk_related_manager_reused(self):
        """
        Repeated access to a generic relation manager yields the same object,
        and it is recorded in the instance's related managers cache.
        """
        self.assertIs(
            self.b1.fun_people_favorite_things,
            self.b1.fun_people_favorite_things,
        )
        self.assertIn(
            "fun_people_favorite_things",
            self.b1._state.related_managers_cache,
        )

    def test_gfk_related_manager_not_reused_when_alternate(self):
        """
        Selecting a manager with manager=... builds a new related manager on
        each call.
        """
        self.assertIsNot(
            self.b1.favorite_things(manager="fun_people"),
            self.b1.favorite_things(manager="fun_people"),
        )

    def test_gfk_related_manager_no_overlap_when_not_hidden(self):
        """
        If a GenericRelation defines a related_query_name (and thus the
        related_name) which shadows another GenericRelation, it should not
        cause those separate managers to clash.
        """
        book = ConfusedBook.objects.create(
            title="How to program",
            author="Rodney Dangerfield",
        )
        person = Person.objects.create(
            first_name="Bugs",
            last_name="Bunny",
            fun=True,
            favorite_thing=book,
        )
        fun_person = FunPerson.objects.create(
            first_name="Droopy",
            last_name="Dog",
            fun=False,
            favorite_thing=book,
        )
        # The managers don't collide in the internal cache.
        self.assertIsNot(book.favorite_things, book.less_favorite_things)
        self.assertIs(book.favorite_things, book.favorite_things)
        self.assertIs(book.less_favorite_things, book.less_favorite_things)
        # Both managers are cached separately despite the collision in names.
        self.assertIn("favorite_things", book._state.related_managers_cache)
        self.assertIn("less_favorite_things", book._state.related_managers_cache)
        # "less_favorite_things" isn't available as a reverse related manager,
        # so never ends up in the cache.
        self.assertQuerysetEqual(fun_person.favorite_things.all(), [book])
        with self.assertRaises(AttributeError):
            fun_person.less_favorite_things
        self.assertIn("favorite_things", fun_person._state.related_managers_cache)
        self.assertNotIn(
            "less_favorite_things",
            fun_person._state.related_managers_cache,
        )
        # The GenericRelation doesn't exist for Person, only FunPerson, so the
        # exception prevents the cache from being polluted.
        with self.assertRaises(AttributeError):
            person.favorite_things
        self.assertNotIn("favorite_things", person._state.related_managers_cache)

    def test_m2m_related_manager(self):
        """
        M2M managers return the expected people, both through the default
        related manager and through one selected with manager=...
        """
        bugs = Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
        self.b1.authors.add(bugs)
        droopy = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False)
        self.b1.authors.add(droopy)
        bugs = FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
        self.b1.fun_authors.add(bugs)
        droopy = FunPerson.objects.create(
            first_name="Droopy", last_name="Dog", fun=False
        )
        self.b1.fun_authors.add(droopy)

        self.assertQuerysetEqual(
            self.b1.authors.order_by("first_name").all(),
            [
                "Bugs",
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.fun_authors.order_by("first_name").all(),
            [
                "Bugs",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.authors(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.authors(manager="fun_people").all(),
            [
                "Bugs",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_m2m_related_forward_manager_reused(self):
        """
        Repeated access to the forward M2M manager yields the same object,
        and it is recorded in the instance's related managers cache.
        """
        self.assertIs(self.b1.authors, self.b1.authors)
        self.assertIn("authors", self.b1._state.related_managers_cache)

    def test_m2m_related_revers_manager_reused(self):
        """
        Repeated access to the reverse M2M manager yields the same object,
        and it is recorded in the instance's related managers cache.
        """
        bugs = Person.objects.create(first_name="Bugs", last_name="Bunny")
        self.b1.authors.add(bugs)
        self.assertIs(bugs.books, bugs.books)
        self.assertIn("books", bugs._state.related_managers_cache)

    def test_removal_through_default_fk_related_manager(self, bulk=True):
        """
        remove() and clear() on the default reverse FK manager leave Droopy
        (fun=False) attached to the book.

        The bulk flag is forwarded to remove()/clear(); the "slow" variant of
        this test re-runs it with bulk=False.
        """
        bugs = FunPerson.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1
        )
        droopy = FunPerson.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1
        )

        self.b1.fun_people_favorite_books.remove(droopy, bulk=bulk)
        self.assertQuerysetEqual(
            FunPerson._base_manager.filter(favorite_book=self.b1),
            [
                "Bugs",
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

        self.b1.fun_people_favorite_books.remove(bugs, bulk=bulk)
        self.assertQuerysetEqual(
            FunPerson._base_manager.filter(favorite_book=self.b1),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        # Re-attach bugs so that clear() has something to act on.
        bugs.favorite_book = self.b1
        bugs.save()

        self.b1.fun_people_favorite_books.clear(bulk=bulk)
        self.assertQuerysetEqual(
            FunPerson._base_manager.filter(favorite_book=self.b1),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_slow_removal_through_default_fk_related_manager(self):
        """
        Re-run the default FK removal test with bulk=False.
        """
        self.test_removal_through_default_fk_related_manager(bulk=False)

    def test_removal_through_specified_fk_related_manager(self, bulk=True):
        """
        remove() and clear() on a reverse FK manager selected with manager=...
        only act on the people that manager returns.

        The bulk flag is forwarded to remove()/clear(); the "slow" variant of
        this test re-runs it with bulk=False.
        """
        Person.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_book=self.b1
        )
        droopy = Person.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_book=self.b1
        )

        # The fun manager DOESN'T remove boring people.
        self.b1.favorite_books(manager="fun_people").remove(droopy, bulk=bulk)
        self.assertQuerysetEqual(
            self.b1.favorite_books(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        # The boring manager DOES remove boring people.
        self.b1.favorite_books(manager="boring_people").remove(droopy, bulk=bulk)
        self.assertQuerysetEqual(
            self.b1.favorite_books(manager="boring_people").all(),
            [],
            lambda c: c.first_name,
            ordered=False,
        )
        droopy.favorite_book = self.b1
        droopy.save()

        # The fun manager ONLY clears fun people.
        self.b1.favorite_books(manager="fun_people").clear(bulk=bulk)
        self.assertQuerysetEqual(
            self.b1.favorite_books(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.favorite_books(manager="fun_people").all(),
            [],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_slow_removal_through_specified_fk_related_manager(self):
        """
        Re-run the specified FK removal test with bulk=False.
        """
        self.test_removal_through_specified_fk_related_manager(bulk=False)

    def test_removal_through_default_gfk_related_manager(self, bulk=True):
        """
        remove() and clear() on the default generic relation manager leave
        Droopy (fun=False) attached to the book.

        The bulk flag is forwarded to remove()/clear(); the "slow" variant of
        this test re-runs it with bulk=False.
        """
        bugs = FunPerson.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1
        )
        droopy = FunPerson.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1
        )

        self.b1.fun_people_favorite_things.remove(droopy, bulk=bulk)
        self.assertQuerysetEqual(
            FunPerson._base_manager.order_by("first_name").filter(
                favorite_thing_id=self.b1.pk
            ),
            [
                "Bugs",
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

        self.b1.fun_people_favorite_things.remove(bugs, bulk=bulk)
        self.assertQuerysetEqual(
            FunPerson._base_manager.order_by("first_name").filter(
                favorite_thing_id=self.b1.pk
            ),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        # Re-attach bugs through the generic relation under test (as the
        # specified-manager variant does) so that clear() has something to
        # act on. Setting favorite_book here would exercise the FK instead.
        bugs.favorite_thing = self.b1
        bugs.save()

        self.b1.fun_people_favorite_things.clear(bulk=bulk)
        self.assertQuerysetEqual(
            FunPerson._base_manager.order_by("first_name").filter(
                favorite_thing_id=self.b1.pk
            ),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_slow_removal_through_default_gfk_related_manager(self):
        """
        Re-run the default generic relation removal test with bulk=False.
        """
        self.test_removal_through_default_gfk_related_manager(bulk=False)

    def test_removal_through_specified_gfk_related_manager(self, bulk=True):
        """
        remove() and clear() on a generic relation manager selected with
        manager=... only act on the people that manager returns.

        The bulk flag is forwarded to remove()/clear(); the "slow" variant of
        this test re-runs it with bulk=False.
        """
        Person.objects.create(
            first_name="Bugs", last_name="Bunny", fun=True, favorite_thing=self.b1
        )
        droopy = Person.objects.create(
            first_name="Droopy", last_name="Dog", fun=False, favorite_thing=self.b1
        )

        # The fun manager DOESN'T remove boring people.
        self.b1.favorite_things(manager="fun_people").remove(droopy, bulk=bulk)
        self.assertQuerysetEqual(
            self.b1.favorite_things(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

        # The boring manager DOES remove boring people.
        self.b1.favorite_things(manager="boring_people").remove(droopy, bulk=bulk)
        self.assertQuerysetEqual(
            self.b1.favorite_things(manager="boring_people").all(),
            [],
            lambda c: c.first_name,
            ordered=False,
        )
        droopy.favorite_thing = self.b1
        droopy.save()

        # The fun manager ONLY clears fun people.
        self.b1.favorite_things(manager="fun_people").clear(bulk=bulk)
        self.assertQuerysetEqual(
            self.b1.favorite_things(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.favorite_things(manager="fun_people").all(),
            [],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_slow_removal_through_specified_gfk_related_manager(self):
        """
        Re-run the specified generic relation removal test with bulk=False.
        """
        self.test_removal_through_specified_gfk_related_manager(bulk=False)

    def test_removal_through_default_m2m_related_manager(self):
        """
        remove() and clear() on the default M2M manager leave the through row
        for Droopy (fun=False) in place.
        """
        bugs = FunPerson.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
        self.b1.fun_authors.add(bugs)
        droopy = FunPerson.objects.create(
            first_name="Droopy", last_name="Dog", fun=False
        )
        self.b1.fun_authors.add(droopy)

        self.b1.fun_authors.remove(droopy)
        self.assertQuerysetEqual(
            self.b1.fun_authors.through._default_manager.all(),
            [
                "Bugs",
                "Droopy",
            ],
            lambda c: c.funperson.first_name,
            ordered=False,
        )

        self.b1.fun_authors.remove(bugs)
        self.assertQuerysetEqual(
            self.b1.fun_authors.through._default_manager.all(),
            [
                "Droopy",
            ],
            lambda c: c.funperson.first_name,
            ordered=False,
        )
        # Re-attach bugs so that clear() has something to act on.
        self.b1.fun_authors.add(bugs)

        self.b1.fun_authors.clear()
        self.assertQuerysetEqual(
            self.b1.fun_authors.through._default_manager.all(),
            [
                "Droopy",
            ],
            lambda c: c.funperson.first_name,
            ordered=False,
        )

    def test_removal_through_specified_m2m_related_manager(self):
        """
        remove() and clear() on an M2M manager selected with manager=... only
        act on the people that manager returns.
        """
        bugs = Person.objects.create(first_name="Bugs", last_name="Bunny", fun=True)
        self.b1.authors.add(bugs)
        droopy = Person.objects.create(first_name="Droopy", last_name="Dog", fun=False)
        self.b1.authors.add(droopy)

        # The fun manager DOESN'T remove boring people.
        self.b1.authors(manager="fun_people").remove(droopy)
        self.assertQuerysetEqual(
            self.b1.authors(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )

        # The boring manager DOES remove boring people.
        self.b1.authors(manager="boring_people").remove(droopy)
        self.assertQuerysetEqual(
            self.b1.authors(manager="boring_people").all(),
            [],
            lambda c: c.first_name,
            ordered=False,
        )
        self.b1.authors.add(droopy)

        # The fun manager ONLY clears fun people.
        self.b1.authors(manager="fun_people").clear()
        self.assertQuerysetEqual(
            self.b1.authors(manager="boring_people").all(),
            [
                "Droopy",
            ],
            lambda c: c.first_name,
            ordered=False,
        )
        self.assertQuerysetEqual(
            self.b1.authors(manager="fun_people").all(),
            [],
            lambda c: c.first_name,
            ordered=False,
        )

    def test_deconstruct_default(self):
        """
        A plain models.Manager deconstructs to its own import path with no
        arguments.
        """
        mgr = models.Manager()
        as_manager, mgr_path, qs_path, args, kwargs = mgr.deconstruct()
        self.assertFalse(as_manager)
        self.assertEqual(mgr_path, "django.db.models.manager.Manager")
        self.assertEqual(args, ())
        self.assertEqual(kwargs, {})

    def test_deconstruct_as_manager(self):
        """
        A manager built with QuerySet.as_manager() deconstructs with the
        as_manager flag set and the queryset's import path.
        """
        mgr = CustomQuerySet.as_manager()
        as_manager, mgr_path, qs_path, args, kwargs = mgr.deconstruct()
        self.assertTrue(as_manager)
        self.assertEqual(qs_path, "custom_managers.models.CustomQuerySet")

    def test_deconstruct_from_queryset(self):
        """
        DeconstructibleCustomManager deconstructs to its own import path and
        the positional and keyword arguments it was created with.
        """
        mgr = DeconstructibleCustomManager("a", "b")
        as_manager, mgr_path, qs_path, args, kwargs = mgr.deconstruct()
        self.assertFalse(as_manager)
        self.assertEqual(
            mgr_path, "custom_managers.models.DeconstructibleCustomManager"
        )
        self.assertEqual(
            args,
            (
                "a",
                "b",
            ),
        )
        self.assertEqual(kwargs, {})

        mgr = DeconstructibleCustomManager("x", "y", c=3, d=4)
        as_manager, mgr_path, qs_path, args, kwargs = mgr.deconstruct()
        self.assertFalse(as_manager)
        self.assertEqual(
            mgr_path, "custom_managers.models.DeconstructibleCustomManager"
        )
        self.assertEqual(
            args,
            (
                "x",
                "y",
            ),
        )
        self.assertEqual(kwargs, {"c": 3, "d": 4})

    def test_deconstruct_from_queryset_failing(self):
        """
        Deconstructing CustomManager raises ValueError with a message that
        points at from_queryset().
        """
        mgr = CustomManager("arg")
        msg = (
            "Could not find manager BaseCustomManagerFromCustomQuerySet in "
            "django.db.models.manager.\n"
            "Please note that you need to inherit from managers you "
            "dynamically generated with 'from_queryset()'."
        )
        with self.assertRaisesMessage(ValueError, msg):
            mgr.deconstruct()

    def test_abstract_model_with_custom_manager_name(self):
        """
        A custom manager may be defined on an abstract model.
        It will be inherited by the abstract model's children.
        """
        PersonFromAbstract.abstract_persons.create(objects="Test")
        self.assertQuerysetEqual(
            PersonFromAbstract.abstract_persons.all(),
            ["Test"],
            lambda c: c.objects,
        )
class TestCars(TestCase):
    """
    Tests for the managers exposed by the Car, FastCarAsDefault and
    FastCarAsBase models.
    """

    def test_managers(self):
        """
        Compare the cars returned by the default, alternate and base managers
        of each Car model variant.
        """

        def car_name(car):
            return car.name

        every_car = ["Corvette", "Neon"]
        fast_only = ["Corvette"]

        Car.cars.create(name="Corvette", mileage=21, top_speed=180)
        Car.cars.create(name="Neon", mileage=31, top_speed=100)

        # Each model class gets a "_default_manager" attribute, which is a
        # reference to the first manager defined in the class.
        default_ordered = Car._default_manager.order_by("name")
        self.assertQuerysetEqual(default_ordered, every_car, car_name)
        cars_ordered = Car.cars.order_by("name")
        self.assertQuerysetEqual(cars_ordered, every_car, car_name)

        # alternate manager
        self.assertQuerysetEqual(Car.fast_cars.all(), fast_only, car_name)

        # explicit default manager
        as_default_ordered = FastCarAsDefault.cars.order_by("name")
        self.assertQuerysetEqual(as_default_ordered, every_car, car_name)
        self.assertQuerysetEqual(
            FastCarAsDefault._default_manager.all(), fast_only, car_name
        )

        # explicit base manager
        as_base_ordered = FastCarAsBase.cars.order_by("name")
        self.assertQuerysetEqual(as_base_ordered, every_car, car_name)
        self.assertQuerysetEqual(
            FastCarAsBase._base_manager.all(), fast_only, car_name
        )
class CustomManagersRegressTestCase(TestCase):
    """Regression tests for models whose managers filter or annotate rows."""

    def test_filtered_default_manager(self):
        """Even though the default manager filters out some records,
        we must still be able to save (particularly, save by updating
        existing records) those filtered instances. This is a
        regression test for #8990, #9527"""
        parent = RelatedModel.objects.create(name="xyzzy")
        hidden = RestrictedModel.objects.create(name="hidden", related=parent)
        hidden.name = "still hidden"
        hidden.save()

        # Had save() not seen the hidden row, a second row would have been
        # inserted and the count would be two.
        row_count = RestrictedModel.plain_manager.count()
        self.assertEqual(row_count, 1)

    def test_refresh_from_db_when_default_manager_filters(self):
        """
        Model.refresh_from_db() works for instances hidden by the default
        manager.
        """
        unpublished = Book._base_manager.create(is_published=False)
        Book._base_manager.filter(pk=unpublished.pk).update(title="Hi")
        unpublished.refresh_from_db()
        self.assertEqual(unpublished.title, "Hi")

    def test_save_clears_annotations_from_base_manager(self):
        """Model.save() clears annotations from the base manager."""
        self.assertEqual(Book._meta.base_manager.name, "annotated_objects")
        created = Book.annotated_objects.create(title="Hunting")
        Person.objects.create(
            first_name="Bugs",
            last_name="Bunny",
            fun=True,
            favorite_book=created,
            favorite_thing_id=1,
        )
        annotated = Book.annotated_objects.first()
        # Annotation from the manager.
        self.assertEqual(annotated.favorite_avg, 1)
        annotated.title = "New Hunting"
        # save() fails if annotations that involve related fields aren't
        # cleared before the update query.
        annotated.save()
        reloaded = Book.annotated_objects.first()
        self.assertEqual(reloaded.title, "New Hunting")

    def test_delete_related_on_filtered_manager(self):
        """Deleting related objects should also not be distracted by a
        restricted manager on the related object. This is a regression
        test for #2698."""
        parent = RelatedModel.objects.create(name="xyzzy")

        visibility = {"one": True, "two": False, "three": False}
        for name, public in visibility.items():
            RestrictedModel.objects.create(name=name, is_public=public, related=parent)

        RelatedModel.objects.get(name="xyzzy").delete()

        # Every RestrictedModel row pointed at the deleted RelatedModel, so
        # none should remain. Had the default manager been used, only the
        # public one would have been deleted.
        remaining = RestrictedModel.plain_manager.all()
        self.assertEqual(len(remaining), 0)

    def test_delete_one_to_one_manager(self):
        """
        Same scenario as the filtered-manager deletion test, but for
        one-to-one models, which are implemented slightly differently
        internally and so follow a different code path.
        """
        parent = RelatedModel.objects.create(name="xyzzy")
        OneToOneRestrictedModel.objects.create(
            name="foo", is_public=False, related=parent
        )
        RelatedModel.objects.get(name="xyzzy").delete()
        remaining = OneToOneRestrictedModel.plain_manager.all()
        self.assertEqual(len(remaining), 0)

    def test_queryset_with_custom_init(self):
        """
        BaseManager.get_queryset() should use kwargs rather than args to allow
        custom kwargs (#24911).
        """
        self.assertQuerysetEqual(
            Person.custom_init_queryset_manager.all(),
            Person.objects.all(),
        )