import copy import datetime import pickle from operator import attrgetter from django.core.exceptions import FieldError from django.db import models from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature from django.test.utils import isolate_apps from django.utils import translation from .models import ( Article, ArticleIdea, ArticleTag, ArticleTranslation, Country, Friendship, Group, Membership, NewsArticle, Person, ) # Note that these tests are testing internal implementation details. # ForeignObject is not part of public API. class MultiColumnFKTests(TestCase): @classmethod def setUpTestData(cls): # Creating countries cls.usa = Country.objects.create(name="United States of America") cls.soviet_union = Country.objects.create(name="Soviet Union") # Creating People cls.bob = Person.objects.create(name='Bob', person_country=cls.usa) cls.jim = Person.objects.create(name='Jim', person_country=cls.usa) cls.george = Person.objects.create(name='George', person_country=cls.usa) cls.jane = Person.objects.create(name='Jane', person_country=cls.soviet_union) cls.mark = Person.objects.create(name='Mark', person_country=cls.soviet_union) cls.sam = Person.objects.create(name='Sam', person_country=cls.soviet_union) # Creating Groups cls.kgb = Group.objects.create(name='KGB', group_country=cls.soviet_union) cls.cia = Group.objects.create(name='CIA', group_country=cls.usa) cls.republican = Group.objects.create(name='Republican', group_country=cls.usa) cls.democrat = Group.objects.create(name='Democrat', group_country=cls.usa) def test_get_succeeds_on_multicolumn_match(self): # Membership objects have access to their related Person if both # country_ids match between them membership = Membership.objects.create( membership_country_id=self.usa.id, person_id=self.bob.id, group_id=self.cia.id) person = membership.person self.assertEqual((person.id, person.name), (self.bob.id, "Bob")) def test_get_fails_on_multicolumn_mismatch(self): # Membership objects returns DoesNotExist error when there is no # Person with the same id and country_id membership = Membership.objects.create( membership_country_id=self.usa.id, person_id=self.jane.id, group_id=self.cia.id) with self.assertRaises(Person.DoesNotExist): getattr(membership, 'person') def test_reverse_query_returns_correct_result(self): # Creating a valid membership because it has the same country has the person Membership.objects.create( membership_country_id=self.usa.id, person_id=self.bob.id, group_id=self.cia.id) # Creating an invalid membership because it has a different country has the person Membership.objects.create( membership_country_id=self.soviet_union.id, person_id=self.bob.id, group_id=self.republican.id) with self.assertNumQueries(1): membership = self.bob.membership_set.get() self.assertEqual(membership.group_id, self.cia.id) self.assertIs(membership.person, self.bob) def test_query_filters_correctly(self): # Creating a to valid memberships Membership.objects.create( membership_country_id=self.usa.id, person_id=self.bob.id, group_id=self.cia.id) Membership.objects.create( membership_country_id=self.usa.id, person_id=self.jim.id, group_id=self.cia.id) # Creating an invalid membership Membership.objects.create(membership_country_id=self.soviet_union.id, person_id=self.george.id, group_id=self.cia.id) self.assertQuerysetEqual( Membership.objects.filter(person__name__contains='o'), [ self.bob.id ], attrgetter("person_id") ) def test_reverse_query_filters_correctly(self): timemark = datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None) timedelta = datetime.timedelta(days=1) # Creating a to valid memberships Membership.objects.create( membership_country_id=self.usa.id, person_id=self.bob.id, group_id=self.cia.id, date_joined=timemark - timedelta) Membership.objects.create( membership_country_id=self.usa.id, person_id=self.jim.id, group_id=self.cia.id, date_joined=timemark + timedelta) # Creating an invalid membership Membership.objects.create( membership_country_id=self.soviet_union.id, person_id=self.george.id, group_id=self.cia.id, date_joined=timemark + timedelta) self.assertQuerysetEqual( Person.objects.filter(membership__date_joined__gte=timemark), [ 'Jim' ], attrgetter('name') ) def test_forward_in_lookup_filters_correctly(self): Membership.objects.create(membership_country_id=self.usa.id, person_id=self.bob.id, group_id=self.cia.id) Membership.objects.create(membership_country_id=self.usa.id, person_id=self.jim.id, group_id=self.cia.id) # Creating an invalid membership Membership.objects.create( membership_country_id=self.soviet_union.id, person_id=self.george.id, group_id=self.cia.id) self.assertQuerysetEqual( Membership.objects.filter(person__in=[self.george, self.jim]), [ self.jim.id, ], attrgetter('person_id') ) self.assertQuerysetEqual( Membership.objects.filter(person__in=Person.objects.filter(name='Jim')), [ self.jim.id, ], attrgetter('person_id') ) def test_double_nested_query(self): m1 = Membership.objects.create(membership_country_id=self.usa.id, person_id=self.bob.id, group_id=self.cia.id) m2 = Membership.objects.create(membership_country_id=self.usa.id, person_id=self.jim.id, group_id=self.cia.id) Friendship.objects.create(from_friend_country_id=self.usa.id, from_friend_id=self.bob.id, to_friend_country_id=self.usa.id, to_friend_id=self.jim.id) self.assertSequenceEqual( Membership.objects.filter( person__in=Person.objects.filter( from_friend__in=Friendship.objects.filter(to_friend__in=Person.objects.all()) ) ), [m1] ) self.assertSequenceEqual( Membership.objects.exclude( person__in=Person.objects.filter( from_friend__in=Friendship.objects.filter(to_friend__in=Person.objects.all()) ) ), [m2] ) def test_select_related_foreignkey_forward_works(self): Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) Membership.objects.create(membership_country=self.usa, person=self.jim, group=self.democrat) with self.assertNumQueries(1): people = [m.person for m in Membership.objects.select_related('person').order_by('pk')] normal_people = [m.person for m in Membership.objects.all().order_by('pk')] self.assertEqual(people, normal_people) def test_prefetch_foreignkey_forward_works(self): Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) Membership.objects.create(membership_country=self.usa, person=self.jim, group=self.democrat) with self.assertNumQueries(2): people = [ m.person for m in Membership.objects.prefetch_related('person').order_by('pk')] normal_people = [m.person for m in Membership.objects.order_by('pk')] self.assertEqual(people, normal_people) def test_prefetch_foreignkey_reverse_works(self): Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) Membership.objects.create(membership_country=self.usa, person=self.jim, group=self.democrat) with self.assertNumQueries(2): membership_sets = [ list(p.membership_set.all()) for p in Person.objects.prefetch_related('membership_set').order_by('pk')] with self.assertNumQueries(7): normal_membership_sets = [ list(p.membership_set.all()) for p in Person.objects.order_by('pk') ] self.assertEqual(membership_sets, normal_membership_sets) def test_m2m_through_forward_returns_valid_members(self): # We start out by making sure that the Group 'CIA' has no members. self.assertQuerysetEqual( self.cia.members.all(), [] ) Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) Membership.objects.create(membership_country=self.usa, person=self.jim, group=self.cia) # Let's check to make sure that it worked. Bob and Jim should be members of the CIA. self.assertQuerysetEqual( self.cia.members.all(), [ 'Bob', 'Jim' ], attrgetter("name") ) def test_m2m_through_reverse_returns_valid_members(self): # We start out by making sure that Bob is in no groups. self.assertQuerysetEqual( self.bob.groups.all(), [] ) Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.republican) # Bob should be in the CIA and a Republican self.assertQuerysetEqual( self.bob.groups.all(), [ 'CIA', 'Republican' ], attrgetter("name") ) def test_m2m_through_forward_ignores_invalid_members(self): # We start out by making sure that the Group 'CIA' has no members. self.assertQuerysetEqual( self.cia.members.all(), [] ) # Something adds jane to group CIA but Jane is in Soviet Union which isn't CIA's country Membership.objects.create(membership_country=self.usa, person=self.jane, group=self.cia) # There should still be no members in CIA self.assertQuerysetEqual( self.cia.members.all(), [] ) def test_m2m_through_reverse_ignores_invalid_members(self): # We start out by making sure that Jane has no groups. self.assertQuerysetEqual( self.jane.groups.all(), [] ) # Something adds jane to group CIA but Jane is in Soviet Union which isn't CIA's country Membership.objects.create(membership_country=self.usa, person=self.jane, group=self.cia) # Jane should still not be in any groups self.assertQuerysetEqual( self.jane.groups.all(), [] ) def test_m2m_through_on_self_works(self): self.assertQuerysetEqual( self.jane.friends.all(), [] ) Friendship.objects.create( from_friend_country=self.jane.person_country, from_friend=self.jane, to_friend_country=self.george.person_country, to_friend=self.george) self.assertQuerysetEqual( self.jane.friends.all(), ['George'], attrgetter("name") ) def test_m2m_through_on_self_ignores_mismatch_columns(self): self.assertQuerysetEqual(self.jane.friends.all(), []) # Note that we use ids instead of instances. This is because instances on ForeignObject # properties will set all related field off of the given instance Friendship.objects.create( from_friend_id=self.jane.id, to_friend_id=self.george.id, to_friend_country_id=self.jane.person_country_id, from_friend_country_id=self.george.person_country_id) self.assertQuerysetEqual(self.jane.friends.all(), []) def test_prefetch_related_m2m_forward_works(self): Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) Membership.objects.create(membership_country=self.usa, person=self.jim, group=self.democrat) with self.assertNumQueries(2): members_lists = [list(g.members.all()) for g in Group.objects.prefetch_related('members')] normal_members_lists = [list(g.members.all()) for g in Group.objects.all()] self.assertEqual(members_lists, normal_members_lists) def test_prefetch_related_m2m_reverse_works(self): Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) Membership.objects.create(membership_country=self.usa, person=self.jim, group=self.democrat) with self.assertNumQueries(2): groups_lists = [list(p.groups.all()) for p in Person.objects.prefetch_related('groups')] normal_groups_lists = [list(p.groups.all()) for p in Person.objects.all()] self.assertEqual(groups_lists, normal_groups_lists) @translation.override('fi') def test_translations(self): a1 = Article.objects.create(pub_date=datetime.date.today()) at1_fi = ArticleTranslation(article=a1, lang='fi', title='Otsikko', body='Diipadaapa') at1_fi.save() at2_en = ArticleTranslation(article=a1, lang='en', title='Title', body='Lalalalala') at2_en.save() self.assertEqual(Article.objects.get(pk=a1.pk).active_translation, at1_fi) with self.assertNumQueries(1): fetched = Article.objects.select_related('active_translation').get( active_translation__title='Otsikko') self.assertEqual(fetched.active_translation.title, 'Otsikko') a2 = Article.objects.create(pub_date=datetime.date.today()) at2_fi = ArticleTranslation(article=a2, lang='fi', title='Atsikko', body='Diipadaapa', abstract='dipad') at2_fi.save() a3 = Article.objects.create(pub_date=datetime.date.today()) at3_en = ArticleTranslation(article=a3, lang='en', title='A title', body='lalalalala', abstract='lala') at3_en.save() # Test model initialization with active_translation field. a3 = Article(id=a3.id, pub_date=a3.pub_date, active_translation=at3_en) a3.save() self.assertEqual( list(Article.objects.filter(active_translation__abstract=None)), [a1, a3]) self.assertEqual( list(Article.objects.filter(active_translation__abstract=None, active_translation__pk__isnull=False)), [a1]) with translation.override('en'): self.assertEqual( list(Article.objects.filter(active_translation__abstract=None)), [a1, a2]) def test_foreign_key_raises_informative_does_not_exist(self): referrer = ArticleTranslation() with self.assertRaisesMessage(Article.DoesNotExist, 'ArticleTranslation has no article'): referrer.article def test_foreign_key_related_query_name(self): a1 = Article.objects.create(pub_date=datetime.date.today()) ArticleTag.objects.create(article=a1, name="foo") self.assertEqual(Article.objects.filter(tag__name="foo").count(), 1) self.assertEqual(Article.objects.filter(tag__name="bar").count(), 0) msg = ( "Cannot resolve keyword 'tags' into field. Choices are: " "active_translation, active_translation_q, articletranslation, " "id, idea_things, newsarticle, pub_date, tag" ) with self.assertRaisesMessage(FieldError, msg): Article.objects.filter(tags__name="foo") def test_many_to_many_related_query_name(self): a1 = Article.objects.create(pub_date=datetime.date.today()) i1 = ArticleIdea.objects.create(name="idea1") a1.ideas.add(i1) self.assertEqual(Article.objects.filter(idea_things__name="idea1").count(), 1) self.assertEqual(Article.objects.filter(idea_things__name="idea2").count(), 0) msg = ( "Cannot resolve keyword 'ideas' into field. Choices are: " "active_translation, active_translation_q, articletranslation, " "id, idea_things, newsarticle, pub_date, tag" ) with self.assertRaisesMessage(FieldError, msg): Article.objects.filter(ideas__name="idea1") @translation.override('fi') def test_inheritance(self): na = NewsArticle.objects.create(pub_date=datetime.date.today()) ArticleTranslation.objects.create( article=na, lang="fi", title="foo", body="bar") self.assertSequenceEqual( NewsArticle.objects.select_related('active_translation'), [na] ) with self.assertNumQueries(1): self.assertEqual( NewsArticle.objects.select_related( 'active_translation')[0].active_translation.title, "foo") @skipUnlessDBFeature('has_bulk_insert') def test_batch_create_foreign_object(self): objs = [Person(name="abcd_%s" % i, person_country=self.usa) for i in range(0, 5)] Person.objects.bulk_create(objs, 10) def test_isnull_lookup(self): m1 = Membership.objects.create(membership_country=self.usa, person=self.bob, group_id=None) m2 = Membership.objects.create(membership_country=self.usa, person=self.bob, group=self.cia) self.assertSequenceEqual( Membership.objects.filter(group__isnull=True), [m1], ) self.assertSequenceEqual( Membership.objects.filter(group__isnull=False), [m2], ) class TestModelCheckTests(SimpleTestCase): @isolate_apps('foreign_object') def test_check_composite_foreign_object(self): class Parent(models.Model): a = models.PositiveIntegerField() b = models.PositiveIntegerField() class Meta: unique_together = (('a', 'b'),) class Child(models.Model): a = models.PositiveIntegerField() b = models.PositiveIntegerField() value = models.CharField(max_length=255) parent = models.ForeignObject( Parent, on_delete=models.SET_NULL, from_fields=('a', 'b'), to_fields=('a', 'b'), related_name='children', ) self.assertEqual(Child._meta.get_field('parent').check(from_model=Child), []) @isolate_apps('foreign_object') def test_check_subset_composite_foreign_object(self): class Parent(models.Model): a = models.PositiveIntegerField() b = models.PositiveIntegerField() c = models.PositiveIntegerField() class Meta: unique_together = (('a', 'b'),) class Child(models.Model): a = models.PositiveIntegerField() b = models.PositiveIntegerField() c = models.PositiveIntegerField() d = models.CharField(max_length=255) parent = models.ForeignObject( Parent, on_delete=models.SET_NULL, from_fields=('a', 'b', 'c'), to_fields=('a', 'b', 'c'), related_name='children', ) self.assertEqual(Child._meta.get_field('parent').check(from_model=Child), []) class TestExtraJoinFilterQ(TestCase): @translation.override('fi') def test_extra_join_filter_q(self): a = Article.objects.create(pub_date=datetime.datetime.today()) ArticleTranslation.objects.create(article=a, lang='fi', title='title', body='body') qs = Article.objects.all() with self.assertNumQueries(2): self.assertEqual(qs[0].active_translation_q.title, 'title') qs = qs.select_related('active_translation_q') with self.assertNumQueries(1): self.assertEqual(qs[0].active_translation_q.title, 'title') class TestCachedPathInfo(TestCase): def test_equality(self): """ The path_infos and reverse_path_infos attributes are equivalent to calling the get_() with no arguments. """ foreign_object = Membership._meta.get_field('person') self.assertEqual( foreign_object.path_infos, foreign_object.get_path_info(), ) self.assertEqual( foreign_object.reverse_path_infos, foreign_object.get_reverse_path_info(), ) def test_copy_removes_direct_cached_values(self): """ Shallow copying a ForeignObject (or a ForeignObjectRel) removes the object's direct cached PathInfo values. """ foreign_object = Membership._meta.get_field('person') # Trigger storage of cached_property into ForeignObject's __dict__. foreign_object.path_infos foreign_object.reverse_path_infos # The ForeignObjectRel doesn't have reverse_path_infos. foreign_object.remote_field.path_infos self.assertIn('path_infos', foreign_object.__dict__) self.assertIn('reverse_path_infos', foreign_object.__dict__) self.assertIn('path_infos', foreign_object.remote_field.__dict__) # Cached value is removed via __getstate__() on ForeignObjectRel # because no __copy__() method exists, so __reduce_ex__() is used. remote_field_copy = copy.copy(foreign_object.remote_field) self.assertNotIn('path_infos', remote_field_copy.__dict__) # Cached values are removed via __copy__() on ForeignObject for # consistency of behavior. foreign_object_copy = copy.copy(foreign_object) self.assertNotIn('path_infos', foreign_object_copy.__dict__) self.assertNotIn('reverse_path_infos', foreign_object_copy.__dict__) # ForeignObjectRel's remains because it's part of a shallow copy. self.assertIn('path_infos', foreign_object_copy.remote_field.__dict__) def test_deepcopy_removes_cached_values(self): """ Deep copying a ForeignObject removes the object's cached PathInfo values, including those of the related ForeignObjectRel. """ foreign_object = Membership._meta.get_field('person') # Trigger storage of cached_property into ForeignObject's __dict__. foreign_object.path_infos foreign_object.reverse_path_infos # The ForeignObjectRel doesn't have reverse_path_infos. foreign_object.remote_field.path_infos self.assertIn('path_infos', foreign_object.__dict__) self.assertIn('reverse_path_infos', foreign_object.__dict__) self.assertIn('path_infos', foreign_object.remote_field.__dict__) # Cached value is removed via __getstate__() on ForeignObjectRel # because no __deepcopy__() method exists, so __reduce_ex__() is used. remote_field_copy = copy.deepcopy(foreign_object.remote_field) self.assertNotIn('path_infos', remote_field_copy.__dict__) # Field.__deepcopy__() internally uses __copy__() on both the # ForeignObject and ForeignObjectRel, so all cached values are removed. foreign_object_copy = copy.deepcopy(foreign_object) self.assertNotIn('path_infos', foreign_object_copy.__dict__) self.assertNotIn('reverse_path_infos', foreign_object_copy.__dict__) self.assertNotIn('path_infos', foreign_object_copy.remote_field.__dict__) def test_pickling_foreignobjectrel(self): """ Pickling a ForeignObjectRel removes the path_infos attribute. ForeignObjectRel implements __getstate__(), so copy and pickle modules both use that, but ForeignObject implements __reduce__() and __copy__() separately, so doesn't share the same behaviour. """ foreign_object_rel = Membership._meta.get_field('person').remote_field # Trigger storage of cached_property into ForeignObjectRel's __dict__. foreign_object_rel.path_infos self.assertIn('path_infos', foreign_object_rel.__dict__) foreign_object_rel_restored = pickle.loads(pickle.dumps(foreign_object_rel)) self.assertNotIn('path_infos', foreign_object_rel_restored.__dict__) def test_pickling_foreignobject(self): """ Pickling a ForeignObject does not remove the cached PathInfo values. ForeignObject will always keep the path_infos and reverse_path_infos attributes within the same process, because of the way Field.__reduce__() is used for restoring values. """ foreign_object = Membership._meta.get_field('person') # Trigger storage of cached_property into ForeignObjectRel's __dict__ foreign_object.path_infos foreign_object.reverse_path_infos self.assertIn('path_infos', foreign_object.__dict__) self.assertIn('reverse_path_infos', foreign_object.__dict__) foreign_object_restored = pickle.loads(pickle.dumps(foreign_object)) self.assertIn('path_infos', foreign_object_restored.__dict__) self.assertIn('reverse_path_infos', foreign_object_restored.__dict__)