from __future__ import unicode_literals

import datetime
import pickle
from decimal import Decimal
from operator import attrgetter

from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldError
from django.db import connection
from django.db.models import (
    Avg, Count, F, Max, Q, StdDev, Sum, Value, Variance,
)
from django.test import TestCase, skipUnlessAnyDBFeature, skipUnlessDBFeature
from django.test.utils import Approximate
from django.utils import six

from .models import (
    Alfa, Author, Book, Bravo, Charlie, Clues, Entries, HardbackBook, ItemTag,
    Publisher, SelfRefFK, Store, WithManualPK,
)


class AggregationTests(TestCase):
    """
    Regression tests for aggregate()/annotate() on querysets.

    All tests share the authors, publishers, books and stores created in
    setUpTestData(); expected values in the assertions are derived from
    that fixture.
    """

    @classmethod
    def setUpTestData(cls):
        """
        Create the shared fixture: nine authors (with friendships), five
        publishers, six books (two of them hardback) and three stores.
        """
        cls.a1 = Author.objects.create(name='Adrian Holovaty', age=34)
        cls.a2 = Author.objects.create(name='Jacob Kaplan-Moss', age=35)
        cls.a3 = Author.objects.create(name='Brad Dayley', age=45)
        cls.a4 = Author.objects.create(name='James Bennett', age=29)
        cls.a5 = Author.objects.create(name='Jeffrey Forcier', age=37)
        cls.a6 = Author.objects.create(name='Paul Bissex', age=29)
        cls.a7 = Author.objects.create(name='Wesley J. Chun', age=25)
        cls.a8 = Author.objects.create(name='Peter Norvig', age=57)
        cls.a9 = Author.objects.create(name='Stuart Russell', age=46)
        cls.a1.friends.add(cls.a2, cls.a4)
        cls.a2.friends.add(cls.a1, cls.a7)
        cls.a4.friends.add(cls.a1)
        cls.a5.friends.add(cls.a6, cls.a7)
        cls.a6.friends.add(cls.a5, cls.a7)
        cls.a7.friends.add(cls.a2, cls.a5, cls.a6)
        cls.a8.friends.add(cls.a9)
        cls.a9.friends.add(cls.a8)

        cls.p1 = Publisher.objects.create(name='Apress', num_awards=3)
        cls.p2 = Publisher.objects.create(name='Sams', num_awards=1)
        cls.p3 = Publisher.objects.create(name='Prentice Hall', num_awards=7)
        cls.p4 = Publisher.objects.create(name='Morgan Kaufmann', num_awards=9)
        cls.p5 = Publisher.objects.create(name="Jonno's House of Books", num_awards=0)

        cls.b1 = Book.objects.create(
            isbn='159059725', name='The Definitive Guide to Django: Web Development Done Right',
            pages=447, rating=4.5, price=Decimal('30.00'), contact=cls.a1, publisher=cls.p1,
            pubdate=datetime.date(2007, 12, 6)
        )
        cls.b2 = Book.objects.create(
            isbn='067232959', name='Sams Teach Yourself Django in 24 Hours',
            pages=528, rating=3.0, price=Decimal('23.09'), contact=cls.a3, publisher=cls.p2,
            pubdate=datetime.date(2008, 3, 3)
        )
        cls.b3 = Book.objects.create(
            isbn='159059996', name='Practical Django Projects',
            pages=300, rating=4.0, price=Decimal('29.69'), contact=cls.a4, publisher=cls.p1,
            pubdate=datetime.date(2008, 6, 23)
        )
        cls.b4 = Book.objects.create(
            isbn='013235613', name='Python Web Development with Django',
            pages=350, rating=4.0, price=Decimal('29.69'), contact=cls.a5, publisher=cls.p3,
            pubdate=datetime.date(2008, 11, 3)
        )
        cls.b5 = HardbackBook.objects.create(
            isbn='013790395', name='Artificial Intelligence: A Modern Approach',
            pages=1132, rating=4.0, price=Decimal('82.80'), contact=cls.a8, publisher=cls.p3,
            pubdate=datetime.date(1995, 1, 15), weight=4.5)
        cls.b6 = HardbackBook.objects.create(
            isbn='155860191', name='Paradigms of Artificial Intelligence Programming: Case Studies in Common Lisp',
            pages=946, rating=5.0, price=Decimal('75.00'), contact=cls.a8, publisher=cls.p4,
            pubdate=datetime.date(1991, 10, 15), weight=3.7)
        cls.b1.authors.add(cls.a1, cls.a2)
        cls.b2.authors.add(cls.a3)
        cls.b3.authors.add(cls.a4)
        cls.b4.authors.add(cls.a5, cls.a6, cls.a7)
        cls.b5.authors.add(cls.a8, cls.a9)
        cls.b6.authors.add(cls.a8)

        s1 = Store.objects.create(
            name='Amazon.com',
            original_opening=datetime.datetime(1994, 4, 23, 9, 17, 42),
            friday_night_closing=datetime.time(23, 59, 59)
        )
        s2 = Store.objects.create(
            name='Books.com',
            original_opening=datetime.datetime(2001, 3, 15, 11, 23, 37),
            friday_night_closing=datetime.time(23, 59, 59)
        )
        s3 = Store.objects.create(
            name="Mamma and Pappa's Books",
            original_opening=datetime.datetime(1945, 4, 25, 16, 24, 14),
            friday_night_closing=datetime.time(21, 30)
        )
        s1.books.add(cls.b1, cls.b2, cls.b3, cls.b4, cls.b5, cls.b6)
        s2.books.add(cls.b1, cls.b3, cls.b5, cls.b6)
        s3.books.add(cls.b3, cls.b4, cls.b6)

    def assertObjectAttrs(self, obj, **kwargs):
        """
        Assert that, for every keyword argument, ``obj`` has an attribute of
        that name equal to the given value.
        """
        for attr, value in six.iteritems(kwargs):
            self.assertEqual(getattr(obj, attr), value)

    def test_aggregates_in_where_clause(self):
        """
        Regression test for #12822: DatabaseError: aggregates not allowed in
        WHERE clause

        The subselect works and returns results equivalent to a
        query with the IDs listed.

        Before the corresponding fix for this bug, this test passed in 1.1 and
        failed in 1.2-beta (trunk).
        """
        qs = Book.objects.values('contact').annotate(Max('id'))
        qs = qs.order_by('contact').values_list('id__max', flat=True)
        # don't do anything with the queryset (qs) before including it as a
        # subquery
        books = Book.objects.order_by('id')
        qs1 = books.filter(id__in=qs)
        qs2 = books.filter(id__in=list(qs))
        self.assertEqual(list(qs1), list(qs2))

    def test_aggregates_in_where_clause_pre_eval(self):
        """
        Regression test for #12822: DatabaseError: aggregates not allowed in
        WHERE clause

        Same as the above test, but evaluates the queryset for the subquery
        before it's used as a subquery.

        Before the corresponding fix for this bug, this test failed in both
        1.1 and 1.2-beta (trunk).
        """
        qs = Book.objects.values('contact').annotate(Max('id'))
        qs = qs.order_by('contact').values_list('id__max', flat=True)
        # force the queryset (qs) for the subquery to be evaluated in its
        # current state
        list(qs)
        books = Book.objects.order_by('id')
        qs1 = books.filter(id__in=qs)
        qs2 = books.filter(id__in=list(qs))
        self.assertEqual(list(qs1), list(qs2))

    @skipUnlessDBFeature('supports_subqueries_in_group_by')
    def test_annotate_with_extra(self):
        """
        Regression test for #11916: Extra params + aggregation creates
        incorrect SQL.
        """
        # Oracle doesn't support subqueries in group by clause
        shortest_book_sql = """
        SELECT name
        FROM aggregation_regress_book b
        WHERE b.publisher_id = aggregation_regress_publisher.id
        ORDER BY b.pages
        LIMIT 1
        """
        # tests that this query does not raise a DatabaseError due to the full
        # subselect being (erroneously) added to the GROUP BY parameters
        qs = Publisher.objects.extra(select={
            'name_of_shortest_book': shortest_book_sql,
        }).annotate(total_books=Count('book'))
        # force execution of the query
        list(qs)

    def test_aggregate(self):
        """aggregate() results are unaffected by ordering, values() and extra()."""
        # Ordering requests are ignored
        self.assertEqual(
            Author.objects.order_by("name").aggregate(Avg("age")),
            {"age__avg": Approximate(37.444, places=1)}
        )

        # Implicit ordering is also ignored
        self.assertEqual(
            Book.objects.aggregate(Sum("pages")),
            {"pages__sum": 3703},
        )

        # Baseline results
        self.assertEqual(
            Book.objects.aggregate(Sum('pages'), Avg('pages')),
            {'pages__sum': 3703, 'pages__avg': Approximate(617.166, places=2)}
        )

        # Empty values query doesn't affect grouping or results
        self.assertEqual(
            Book.objects.values().aggregate(Sum('pages'), Avg('pages')),
            {'pages__sum': 3703, 'pages__avg': Approximate(617.166, places=2)}
        )

        # Aggregate overrides extra selected column
        self.assertEqual(
            Book.objects.extra(select={'price_per_page': 'price / pages'}).aggregate(Sum('pages')),
            {'pages__sum': 3703}
        )

    def test_annotation(self):
        """annotate() combined with extra() and values() in various orders."""
        # Annotations get combined with extra select clauses
        obj = Book.objects.annotate(mean_auth_age=Avg("authors__age")).extra(
            select={"manufacture_cost": "price * .5"}).get(pk=self.b2.pk)
        self.assertObjectAttrs(
            obj,
            contact_id=self.a3.id,
            isbn='067232959',
            mean_auth_age=45.0,
            name='Sams Teach Yourself Django in 24 Hours',
            pages=528,
            price=Decimal("23.09"),
            pubdate=datetime.date(2008, 3, 3),
            publisher_id=self.p2.id,
            rating=3.0
        )
        # Different DB backends return different types for the extra select computation
        self.assertIn(obj.manufacture_cost, (11.545, Decimal('11.545')))

        # Order of the annotate/extra in the query doesn't matter
        obj = Book.objects.extra(select={'manufacture_cost': 'price * .5'}).annotate(
            mean_auth_age=Avg('authors__age')).get(pk=self.b2.pk)
        self.assertObjectAttrs(
            obj,
            contact_id=self.a3.id,
            isbn='067232959',
            mean_auth_age=45.0,
            name='Sams Teach Yourself Django in 24 Hours',
            pages=528,
            price=Decimal("23.09"),
            pubdate=datetime.date(2008, 3, 3),
            publisher_id=self.p2.id,
            rating=3.0
        )
        # Different DB backends return different types for the extra select computation
        self.assertIn(obj.manufacture_cost, (11.545, Decimal('11.545')))

        # Values queries can be combined with annotate and extra
        obj = Book.objects.annotate(mean_auth_age=Avg('authors__age')).extra(
            select={'manufacture_cost': 'price * .5'}).values().get(pk=self.b2.pk)
        manufacture_cost = obj['manufacture_cost']
        self.assertIn(manufacture_cost, (11.545, Decimal('11.545')))
        del obj['manufacture_cost']
        self.assertEqual(obj, {
            'id': self.b2.id,
            'contact_id': self.a3.id,
            'isbn': '067232959',
            'mean_auth_age': 45.0,
            'name': 'Sams Teach Yourself Django in 24 Hours',
            'pages': 528,
            'price': Decimal('23.09'),
            'pubdate': datetime.date(2008, 3, 3),
            'publisher_id': self.p2.id,
            'rating': 3.0,
        })

        # The order of the (empty) values, annotate and extra clauses doesn't
        # matter
        obj = Book.objects.values().annotate(mean_auth_age=Avg('authors__age')).extra(
            select={'manufacture_cost': 'price * .5'}).get(pk=self.b2.pk)
        manufacture_cost = obj['manufacture_cost']
        self.assertIn(manufacture_cost, (11.545, Decimal('11.545')))
        del obj['manufacture_cost']
        self.assertEqual(obj, {
            'id': self.b2.id,
            'contact_id': self.a3.id,
            'isbn': '067232959',
            'mean_auth_age': 45.0,
            'name': 'Sams Teach Yourself Django in 24 Hours',
            'pages': 528,
            'price': Decimal('23.09'),
            'pubdate': datetime.date(2008, 3, 3),
            'publisher_id': self.p2.id,
            'rating': 3.0
        })

        # If the annotation precedes the values clause, it won't be included
        # unless it is explicitly named
        obj = Book.objects.annotate(mean_auth_age=Avg('authors__age')).extra(
            select={'price_per_page': 'price / pages'}).values('name').get(pk=self.b1.pk)
        self.assertEqual(obj, {
            "name": 'The Definitive Guide to Django: Web Development Done Right',
        })

        obj = Book.objects.annotate(mean_auth_age=Avg('authors__age')).extra(
            select={'price_per_page': 'price / pages'}).values('name', 'mean_auth_age').get(pk=self.b1.pk)
        self.assertEqual(obj, {
            'mean_auth_age': 34.5,
            'name': 'The Definitive Guide to Django: Web Development Done Right',
        })

        # If an annotation isn't included in the values, it can still be used
        # in a filter
        qs = Book.objects.annotate(n_authors=Count('authors')).values('name').filter(n_authors__gt=2)
        self.assertSequenceEqual(
            qs, [
                {"name": 'Python Web Development with Django'}
            ],
        )

        # The annotations are added to values output if values() precedes
        # annotate()
        obj = Book.objects.values('name').annotate(mean_auth_age=Avg('authors__age')).extra(
            select={'price_per_page': 'price / pages'}).get(pk=self.b1.pk)
        self.assertEqual(obj, {
            'mean_auth_age': 34.5,
            'name': 'The Definitive Guide to Django: Web Development Done Right',
        })

        # All of the objects are getting counted (allow_nulls) and that values
        # respects the amount of objects
        self.assertEqual(
            len(Author.objects.annotate(Avg('friends__age')).values()),
            9
        )

        # Consecutive calls to annotate accumulate in the query
        qs = (
            Book.objects
            .values('price')
            .annotate(oldest=Max('authors__age'))
            .order_by('oldest', 'price')
            .annotate(Max('publisher__num_awards'))
        )
        self.assertSequenceEqual(
            qs, [
                {'price': Decimal("30"), 'oldest': 35, 'publisher__num_awards__max': 3},
                {'price': Decimal("29.69"), 'oldest': 37, 'publisher__num_awards__max': 7},
                {'price': Decimal("23.09"), 'oldest': 45, 'publisher__num_awards__max': 1},
                {'price': Decimal("75"), 'oldest': 57, 'publisher__num_awards__max': 9},
                {'price': Decimal("82.8"), 'oldest': 57, 'publisher__num_awards__max': 7}
            ],
        )

    def test_aggregate_annotation(self):
        """aggregate() can be applied on top of annotate()."""
        # Aggregates can be composed over annotations.
        # The return type is derived from the composed aggregate
        vals = (
            Book.objects
            .all()
            .annotate(num_authors=Count('authors__id'))
            .aggregate(Max('pages'), Max('price'), Sum('num_authors'), Avg('num_authors'))
        )
        self.assertEqual(vals, {
            'num_authors__sum': 10,
            'num_authors__avg': Approximate(1.666, places=2),
            'pages__max': 1132,
            'price__max': Decimal("82.80")
        })

        # Regression for #15624 - Missing SELECT columns when using values, annotate
        # and aggregate in a single query
        self.assertEqual(
            Book.objects.annotate(c=Count('authors')).values('c').aggregate(Max('c')),
            {'c__max': 3}
        )

    def test_decimal_aggregate_annotation_filter(self):
        """
        Filtering on an aggregate annotation with Decimal values should work.
        Requires special handling on SQLite (#18247).
        """
        self.assertEqual(
            len(Author.objects.annotate(sum=Sum('book_contact_set__price')).filter(sum__gt=Decimal(40))),
            1
        )
        self.assertEqual(
            len(Author.objects.annotate(sum=Sum('book_contact_set__price')).filter(sum__lte=Decimal(40))),
            4
        )

    def test_field_error(self):
        # Bad field requests in aggregates are caught and reported
        with self.assertRaises(FieldError):
            Book.objects.all().aggregate(num_authors=Count('foo'))

        with self.assertRaises(FieldError):
            Book.objects.all().annotate(num_authors=Count('foo'))

        with self.assertRaises(FieldError):
            Book.objects.all().annotate(num_authors=Count('authors__id')).aggregate(Max('foo'))

    def test_more(self):
        # Old-style count aggregations can be mixed with new-style
        self.assertEqual(
            Book.objects.annotate(num_authors=Count('authors')).count(),
            6
        )

        # Non-ordinal, non-computed Aggregates over annotations correctly
        # inherit the annotation's internal type if the annotation is ordinal
        # or computed
        vals = Book.objects.annotate(num_authors=Count('authors')).aggregate(Max('num_authors'))
        self.assertEqual(
            vals,
            {'num_authors__max': 3}
        )

        vals = Publisher.objects.annotate(avg_price=Avg('book__price')).aggregate(Max('avg_price'))
        self.assertEqual(
            vals,
            {'avg_price__max': 75.0}
        )

        # Aliases are quoted to protect aliases that might be reserved names
        vals = Book.objects.aggregate(number=Max('pages'), select=Max('pages'))
        self.assertEqual(
            vals,
            {'number': 1132, 'select': 1132}
        )

        # Regression for #10064: select_related() plays nice with aggregates
        obj = Book.objects.select_related('publisher').annotate(
            num_authors=Count('authors')).values().get(isbn='013790395')
        self.assertEqual(obj, {
            'contact_id': self.a8.id,
            'id': self.b5.id,
            'isbn': '013790395',
            'name': 'Artificial Intelligence: A Modern Approach',
            'num_authors': 2,
            'pages': 1132,
            'price': Decimal("82.8"),
            'pubdate': datetime.date(1995, 1, 15),
            'publisher_id': self.p3.id,
            'rating': 4.0,
        })

        # Regression for #10010: exclude on an aggregate field is correctly
        # negated
        self.assertEqual(
            len(Book.objects.annotate(num_authors=Count('authors'))),
            6
        )
        self.assertEqual(
            len(Book.objects.annotate(num_authors=Count('authors')).filter(num_authors__gt=2)),
            1
        )
        self.assertEqual(
            len(Book.objects.annotate(num_authors=Count('authors')).exclude(num_authors__gt=2)),
            5
        )

        self.assertEqual(
            len(
                Book.objects
                .annotate(num_authors=Count('authors'))
                .filter(num_authors__lt=3)
                .exclude(num_authors__lt=2)
            ),
            2
        )
        self.assertEqual(
            len(
                Book.objects
                .annotate(num_authors=Count('authors'))
                .exclude(num_authors__lt=2)
                .filter(num_authors__lt=3)
            ),
            2
        )

    def test_aggregate_fexpr(self):
        # Aggregates can be used with F() expressions
        # ... where the F() is pushed into the HAVING clause
        qs = (
            Publisher.objects
            .annotate(num_books=Count('book'))
            .filter(num_books__lt=F('num_awards') / 2)
            .order_by('name')
            .values('name', 'num_books', 'num_awards')
        )
        self.assertSequenceEqual(
            qs, [
                {'num_books': 1, 'name': 'Morgan Kaufmann', 'num_awards': 9},
                {'num_books': 2, 'name': 'Prentice Hall', 'num_awards': 7}
            ],
        )

        qs = (
            Publisher.objects
            .annotate(num_books=Count('book'))
            .exclude(num_books__lt=F('num_awards') / 2)
            .order_by('name')
            .values('name', 'num_books', 'num_awards')
        )
        self.assertSequenceEqual(
            qs, [
                {'num_books': 2, 'name': 'Apress', 'num_awards': 3},
                {'num_books': 0, 'name': "Jonno's House of Books", 'num_awards': 0},
                {'num_books': 1, 'name': 'Sams', 'num_awards': 1}
            ],
        )

        # ... and where the F() references an aggregate
        qs = (
            Publisher.objects
            .annotate(num_books=Count('book'))
            .filter(num_awards__gt=2 * F('num_books'))
            .order_by('name')
            .values('name', 'num_books', 'num_awards')
        )
        self.assertSequenceEqual(
            qs, [
                {'num_books': 1, 'name': 'Morgan Kaufmann', 'num_awards': 9},
                {'num_books': 2, 'name': 'Prentice Hall', 'num_awards': 7}
            ],
        )

        qs = (
            Publisher.objects
            .annotate(num_books=Count('book'))
            .exclude(num_books__lt=F('num_awards') / 2)
            .order_by('name')
            .values('name', 'num_books', 'num_awards')
        )
        self.assertSequenceEqual(
            qs, [
                {'num_books': 2, 'name': 'Apress', 'num_awards': 3},
                {'num_books': 0, 'name': "Jonno's House of Books", 'num_awards': 0},
                {'num_books': 1, 'name': 'Sams', 'num_awards': 1}
            ],
        )

    def test_db_col_table(self):
        # Tests on fields with non-default table and column names.
        qs = (
            Clues.objects
            .values('EntryID__Entry')
            .annotate(Appearances=Count('EntryID'), Distinct_Clues=Count('Clue', distinct=True))
        )
        self.assertQuerysetEqual(qs, [])

        qs = Entries.objects.annotate(clue_count=Count('clues__ID'))
        self.assertQuerysetEqual(qs, [])

    def test_boolean_conversion(self):
        # Aggregates mixed up ordering of columns for backend's convert_values
        # method. Refs #21126.
        e = Entries.objects.create(Entry='foo')
        c = Clues.objects.create(EntryID=e, Clue='bar')
        qs = Clues.objects.select_related('EntryID').annotate(Count('ID'))
        self.assertSequenceEqual(qs, [c])
        self.assertEqual(qs[0].EntryID, e)
        self.assertIs(qs[0].EntryID.Exclude, False)

    def test_empty(self):
        # Regression for #10089: Check handling of empty result sets with
        # aggregates
        self.assertEqual(
            Book.objects.filter(id__in=[]).count(),
            0
        )

        vals = (
            Book.objects
            .filter(id__in=[])
            .aggregate(
                num_authors=Count('authors'),
                avg_authors=Avg('authors'),
                max_authors=Max('authors'),
                max_price=Max('price'),
                max_rating=Max('rating'),
            )
        )
        self.assertEqual(
            vals,
            {'max_authors': None, 'max_rating': None, 'num_authors': 0, 'avg_authors': None, 'max_price': None}
        )

        qs = (
            Publisher.objects
            .filter(name="Jonno's House of Books")
            .annotate(
                num_authors=Count('book__authors'),
                avg_authors=Avg('book__authors'),
                max_authors=Max('book__authors'),
                max_price=Max('book__price'),
                max_rating=Max('book__rating'),
            ).values()
        )
        self.assertSequenceEqual(
            qs,
            [{
                'max_authors': None,
                'name': "Jonno's House of Books",
                'num_awards': 0,
                'max_price': None,
                'num_authors': 0,
                'max_rating': None,
                'id': self.p5.id,
                'avg_authors': None,
            }],
        )

    def test_more_more(self):
        # Regression for #10113 - Fields mentioned in order_by() must be
        # included in the GROUP BY. This only becomes a problem when the
        # order_by introduces a new join.
        self.assertQuerysetEqual(
            Book.objects.annotate(num_authors=Count('authors')).order_by('publisher__name', 'name'), [
                "Practical Django Projects",
                "The Definitive Guide to Django: Web Development Done Right",
                "Paradigms of Artificial Intelligence Programming: Case Studies in Common Lisp",
                "Artificial Intelligence: A Modern Approach",
                "Python Web Development with Django",
                "Sams Teach Yourself Django in 24 Hours",
            ],
            lambda b: b.name
        )

        # Regression for #10127 - Empty select_related() works with annotate
        qs = Book.objects.filter(rating__lt=4.5).select_related().annotate(Avg('authors__age'))
        self.assertQuerysetEqual(
            qs,
            [
                ('Artificial Intelligence: A Modern Approach', 51.5, 'Prentice Hall', 'Peter Norvig'),
                ('Practical Django Projects', 29.0, 'Apress', 'James Bennett'),
                (
                    'Python Web Development with Django',
                    Approximate(30.333, places=2),
                    'Prentice Hall',
                    'Jeffrey Forcier',
                ),
                ('Sams Teach Yourself Django in 24 Hours', 45.0, 'Sams', 'Brad Dayley')
            ],
            lambda b: (b.name, b.authors__age__avg, b.publisher.name, b.contact.name)
        )

        # Regression for #10132 - If the values() clause only mentioned extra
        # (select=) columns, those columns are used for grouping
        qs = Book.objects.extra(select={'pub': 'publisher_id'}).values('pub').annotate(Count('id')).order_by('pub')
        # 'pub' is selected from publisher_id, so it must be compared against
        # Publisher primary keys rather than Book primary keys.
        self.assertSequenceEqual(
            qs, [
                {'pub': self.p1.id, 'id__count': 2},
                {'pub': self.p2.id, 'id__count': 1},
                {'pub': self.p3.id, 'id__count': 2},
                {'pub': self.p4.id, 'id__count': 1}
            ],
        )

        qs = (
            Book.objects
            .extra(select={'pub': 'publisher_id', 'foo': 'pages'})
            .values('pub')
            .annotate(Count('id'))
            .order_by('pub')
        )
        self.assertSequenceEqual(
            qs, [
                {'pub': self.p1.id, 'id__count': 2},
                {'pub': self.p2.id, 'id__count': 1},
                {'pub': self.p3.id, 'id__count': 2},
                {'pub': self.p4.id, 'id__count': 1}
            ],
        )

        # Regression for #10182 - Queries with aggregate calls are correctly
        # realiased when used in a subquery
        ids = (
            Book.objects
            .filter(pages__gt=100)
            .annotate(n_authors=Count('authors'))
            .filter(n_authors__gt=2)
            .order_by('n_authors')
        )
        self.assertQuerysetEqual(
            Book.objects.filter(id__in=ids), [
                "Python Web Development with Django",
            ],
            lambda b: b.name
        )

        # Regression for #15709 - Ensure each group_by field only exists once
        # per query
        qstr = str(Book.objects.values('publisher').annotate(max_pages=Max('pages')).order_by().query)
        # There is just one GROUP BY clause (zero commas means at most one clause).
        self.assertEqual(qstr[qstr.index('GROUP BY'):].count(', '), 0)

    def test_duplicate_alias(self):
        # Regression for #11256 - duplicating a default alias raises ValueError.
        with self.assertRaises(ValueError):
            Book.objects.all().annotate(Avg('authors__age'), authors__age__avg=Avg('authors__age'))

    def test_field_name_conflict(self):
        # Regression for #11256 - providing an aggregate name
        # that conflicts with a field name on the model raises ValueError
        with self.assertRaises(ValueError):
            Author.objects.annotate(age=Avg('friends__age'))

    def test_m2m_name_conflict(self):
        # Regression for #11256 - providing an aggregate name
        # that conflicts with an m2m name on the model raises ValueError
        with self.assertRaises(ValueError):
            Author.objects.annotate(friends=Count('friends'))

    def test_values_queryset_non_conflict(self):
        # Regression for #14707 -- If you're using a values query set, some potential conflicts are avoided.

        # age is a field on Author, so it shouldn't be allowed as an aggregate.
        # But age isn't included in values(), so it is.
        results = Author.objects.values('name').annotate(age=Count('book_contact_set')).order_by('name')
        self.assertEqual(len(results), 9)
        self.assertEqual(results[0]['name'], 'Adrian Holovaty')
        self.assertEqual(results[0]['age'], 1)

        # Same problem, but aggregating over m2m fields
        results = Author.objects.values('name').annotate(age=Avg('friends__age')).order_by('name')
        self.assertEqual(len(results), 9)
        self.assertEqual(results[0]['name'], 'Adrian Holovaty')
        self.assertEqual(results[0]['age'], 32.0)

        # Same problem, but colliding with an m2m field
        results = Author.objects.values('name').annotate(friends=Count('friends')).order_by('name')
        self.assertEqual(len(results), 9)
        self.assertEqual(results[0]['name'], 'Adrian Holovaty')
        self.assertEqual(results[0]['friends'], 2)

    def test_reverse_relation_name_conflict(self):
        # Regression for #11256 - providing an aggregate name
        # that conflicts with a reverse-related name on the model raises ValueError
        with self.assertRaises(ValueError):
            Author.objects.annotate(book_contact_set=Avg('friends__age'))

    def test_pickle(self):
        # Regression for #10197 -- Queries with aggregates can be pickled.
        # First check that pickling is possible at all. No crash = success
        qs = Book.objects.annotate(num_authors=Count('authors'))
        pickle.dumps(qs)

        # Then check that the round trip works.
        query = qs.query.get_compiler(qs.db).as_sql()[0]
        qs2 = pickle.loads(pickle.dumps(qs))
        self.assertEqual(
            qs2.query.get_compiler(qs2.db).as_sql()[0],
            query,
        )

    def test_more_more_more(self):
        # Regression for #10199 - Aggregate calls clone the original query so
        # the original query can still be used
        books = Book.objects.all()
        books.aggregate(Avg("authors__age"))
        self.assertQuerysetEqual(
            books.all(), [
                'Artificial Intelligence: A Modern Approach',
                'Paradigms of Artificial Intelligence Programming: Case Studies in Common Lisp',
                'Practical Django Projects',
                'Python Web Development with Django',
                'Sams Teach Yourself Django in 24 Hours',
                'The Definitive Guide to Django: Web Development Done Right'
            ],
            lambda b: b.name
        )

        # Regression for #10248 - Annotations work with dates()
        qs = Book.objects.annotate(num_authors=Count('authors')).filter(num_authors=2).dates('pubdate', 'day')
        self.assertSequenceEqual(
            qs, [
                datetime.date(1995, 1, 15),
                datetime.date(2007, 12, 6),
            ],
        )

        # Regression for #10290 - extra selects with parameters can be used for
        # grouping.
        qs = (
            Book.objects
            .annotate(mean_auth_age=Avg('authors__age'))
            .extra(select={'sheets': '(pages + %s) / %s'}, select_params=[1, 2])
            .order_by('sheets')
            .values('sheets')
        )
        self.assertQuerysetEqual(
            qs, [
                150,
                175,
                224,
                264,
                473,
                566
            ],
            lambda b: int(b["sheets"])
        )

        # Regression for #10425 - annotations don't get in the way of a count()
        # clause
        self.assertEqual(
            Book.objects.values('publisher').annotate(Count('publisher')).count(),
            4
        )
        self.assertEqual(
            Book.objects.annotate(Count('publisher')).values('publisher').count(),
            6
        )

        # Note: intentionally no order_by(), that case needs tests, too.
        # Look the publishers up by the PKs of the fixture objects instead of
        # hard-coded ids, which depend on the state of the database sequence.
        publishers = Publisher.objects.filter(id__in=[self.p1.id, self.p2.id])
        self.assertEqual(
            sorted(p.name for p in publishers),
            [
                "Apress",
                "Sams"
            ]
        )

        publishers = publishers.annotate(n_books=Count("book"))
        sorted_publishers = sorted(publishers, key=lambda x: x.name)
        self.assertEqual(
            sorted_publishers[0].n_books,
            2
        )
        self.assertEqual(
            sorted_publishers[1].n_books,
            1
        )

        self.assertEqual(
            sorted(p.name for p in publishers),
            [
                "Apress",
                "Sams"
            ]
        )

        books = Book.objects.filter(publisher__in=publishers)
        self.assertQuerysetEqual(
            books, [
                "Practical Django Projects",
                "Sams Teach Yourself Django in 24 Hours",
                "The Definitive Guide to Django: Web Development Done Right",
            ],
            lambda b: b.name
        )
        self.assertEqual(
            sorted(p.name for p in publishers),
            [
                "Apress",
                "Sams"
            ]
        )

        # Regression for #10666 - inherited fields work with annotations and
        # aggregations
        self.assertEqual(
            HardbackBook.objects.aggregate(n_pages=Sum('book_ptr__pages')),
            {'n_pages': 2078}
        )

        self.assertEqual(
            HardbackBook.objects.aggregate(n_pages=Sum('pages')),
            {'n_pages': 2078},
        )

        qs = HardbackBook.objects.annotate(n_authors=Count('book_ptr__authors')).values('name', 'n_authors')
        self.assertSequenceEqual(
            qs,
            [
                {'n_authors': 2, 'name': 'Artificial Intelligence: A Modern Approach'},
                {
                    'n_authors': 1,
                    'name': 'Paradigms of Artificial Intelligence Programming: Case Studies in Common Lisp'
                }
            ],
        )

        qs = HardbackBook.objects.annotate(n_authors=Count('authors')).values('name', 'n_authors')
        self.assertSequenceEqual(
            qs,
            [
                {'n_authors': 2, 'name': 'Artificial Intelligence: A Modern Approach'},
                {
                    'n_authors': 1,
                    'name': 'Paradigms of Artificial Intelligence Programming: Case Studies in Common Lisp'
                }
            ],
        )

        # Regression for #10766 - Shouldn't be able to reference an aggregate
        # field in an aggregate() call.
        with self.assertRaises(FieldError):
            Book.objects.annotate(mean_age=Avg('authors__age')).annotate(Avg('mean_age'))

    def test_empty_filter_count(self):
        """count() on an annotated queryset filtered by an empty id__in is 0."""
        self.assertEqual(
            Author.objects.filter(id__in=[]).annotate(Count("friends")).count(),
            0
        )

    def test_empty_filter_aggregate(self):
        """
        Count() aggregated over an annotated queryset filtered by an empty
        id__in is reported as None.
        """
        self.assertEqual(
            Author.objects.filter(id__in=[]).annotate(Count("friends")).aggregate(Count("pk")),
            {"pk__count": None}
        )

    def test_none_call_before_aggregate(self):
        # Regression for #11789
        self.assertEqual(
            Author.objects.none().aggregate(Avg('age')),
            {'age__avg': None}
        )

    def test_annotate_and_join(self):
        """
        An exclude() across the relation used by the annotation doesn't drop
        any authors from the result.
        """
        self.assertEqual(
            Author.objects.annotate(c=Count("friends__name")).exclude(friends__name="Joe").count(),
            Author.objects.count()
        )

    def test_f_expression_annotation(self):
        # Books with less than 200 pages per author.
        qs = Book.objects.values("name").annotate(
            n_authors=Count("authors")
        ).filter(
            pages__lt=F("n_authors") * 200
        ).values_list("pk")
        self.assertQuerysetEqual(
            Book.objects.filter(pk__in=qs), [
                "Python Web Development with Django"
            ],
            attrgetter("name")
        )

    def test_values_annotate_values(self):
        """
        values_list('pk') applied after values().annotate() returns the same
        PKs as a plain values_list('pk').
        """
        qs = Book.objects.values("name").annotate(
            n_authors=Count("authors")
        ).values_list("pk", flat=True)
        self.assertEqual(list(qs), list(Book.objects.values_list("pk", flat=True)))

    def test_having_group_by(self):
        # When a field occurs on the LHS of a HAVING clause that it
        # appears correctly in the GROUP BY clause
        qs = Book.objects.values_list("name").annotate(
            n_authors=Count("authors")
        ).filter(
            pages__gt=F("n_authors")
        ).values_list("name", flat=True)
        # Results should be the same, all Books have more pages than authors
        self.assertEqual(
            list(qs), list(Book.objects.values_list("name", flat=True))
        )

    def test_values_list_annotation_args_ordering(self):
        """
        Annotate *args ordering should be preserved in values_list results.
        **kwargs comes after *args.
        Regression test for #23659.
        """
        books = Book.objects.values_list("publisher__name").annotate(
            Count("id"), Avg("price"), Avg("authors__age"), avg_pgs=Avg("pages")
        ).order_by("-publisher__name")
        self.assertEqual(books[0], ('Sams', 1, 23.09, 45.0, 528.0))

    def test_annotation_disjunction(self):
        """Annotations can be filtered on inside Q-object disjunctions."""
        qs = Book.objects.annotate(n_authors=Count("authors")).filter(
            Q(n_authors=2) | Q(name="Python Web Development with Django")
        )
        self.assertQuerysetEqual(
            qs, [
                "Artificial Intelligence: A Modern Approach",
                "Python Web Development with Django",
                "The Definitive Guide to Django: Web Development Done Right",
            ],
            attrgetter("name")
        )

        qs = (
            Book.objects
            .annotate(n_authors=Count("authors"))
            .filter(
                Q(name="The Definitive Guide to Django: Web Development Done Right") |
                (Q(name="Artificial Intelligence: A Modern Approach") & Q(n_authors=3))
            )
        )
        self.assertQuerysetEqual(
            qs,
            [
                "The Definitive Guide to Django: Web Development Done Right",
            ],
            attrgetter("name")
        )

        qs = Publisher.objects.annotate(
            rating_sum=Sum("book__rating"),
            book_count=Count("book")
        ).filter(
            Q(rating_sum__gt=5.5) | Q(rating_sum__isnull=True)
        ).order_by('pk')
        self.assertQuerysetEqual(
            qs, [
                "Apress",
                "Prentice Hall",
                "Jonno's House of Books",
            ],
            attrgetter("name")
        )

        qs = Publisher.objects.annotate(
            rating_sum=Sum("book__rating"),
            book_count=Count("book")
        ).filter(
            Q(rating_sum__gt=F("book_count")) | Q(rating_sum=None)
        ).order_by("num_awards")
        self.assertQuerysetEqual(
            qs, [
                "Jonno's House of Books",
                "Sams",
                "Apress",
                "Prentice Hall",
                "Morgan Kaufmann"
            ],
            attrgetter("name")
        )

    def test_quoting_aggregate_order_by(self):
        """A mixed-case annotation alias can be used in order_by()."""
        qs = Book.objects.filter(
            name="Python Web Development with Django"
        ).annotate(
            authorCount=Count("authors")
        ).order_by("authorCount")
        self.assertQuerysetEqual(
            qs, [
                ("Python Web Development with Django", 3),
            ],
            lambda b: (b.name, b.authorCount)
        )

    @skipUnlessDBFeature('supports_stddev')
    def test_stddev(self):
        """StdDev and Variance aggregates, with and without sample=True."""
        self.assertEqual(
            Book.objects.aggregate(StdDev('pages')),
            {'pages__stddev': Approximate(311.46, 1)}
        )

        self.assertEqual(
            Book.objects.aggregate(StdDev('rating')),
            {'rating__stddev': Approximate(0.60, 1)}
        )

        self.assertEqual(
            Book.objects.aggregate(StdDev('price')),
            {'price__stddev': Approximate(24.16, 2)}
        )

        self.assertEqual(
            Book.objects.aggregate(StdDev('pages', sample=True)),
            {'pages__stddev': Approximate(341.19, 2)}
        )

        self.assertEqual(
            Book.objects.aggregate(StdDev('rating', sample=True)),
            {'rating__stddev': Approximate(0.66, 2)}
        )

        self.assertEqual(
            Book.objects.aggregate(StdDev('price', sample=True)),
            {'price__stddev': Approximate(26.46, 1)}
        )

        self.assertEqual(
            Book.objects.aggregate(Variance('pages')),
            {'pages__variance': Approximate(97010.80, 1)}
        )

        self.assertEqual(
            Book.objects.aggregate(Variance('rating')),
            {'rating__variance': Approximate(0.36, 1)}
        )

        self.assertEqual(
            Book.objects.aggregate(Variance('price')),
            {'price__variance': Approximate(583.77, 1)}
        )

        self.assertEqual(
            Book.objects.aggregate(Variance('pages', sample=True)),
            {'pages__variance': Approximate(116412.96, 1)}
        )

        self.assertEqual(
            Book.objects.aggregate(Variance('rating', sample=True)),
            {'rating__variance': Approximate(0.44, 2)}
        )

        self.assertEqual(
            Book.objects.aggregate(Variance('price', sample=True)),
            {'price__variance': Approximate(700.53, 2)}
        )

    def test_filtering_by_annotation_name(self):
        # Regression test for #14476

        # The name of the explicitly provided annotation name in this case
        # poses no problem
        qs = Author.objects.annotate(book_cnt=Count('book')).filter(book_cnt=2).order_by('name')
        self.assertQuerysetEqual(
            qs,
            ['Peter Norvig'],
            lambda b: b.name
        )
        # Neither in this case
        qs = Author.objects.annotate(book_count=Count('book')).filter(book_count=2).order_by('name')
        self.assertQuerysetEqual(
            qs,
            ['Peter Norvig'],
            lambda b: b.name
        )
        # This case used to fail because the ORM couldn't resolve the
        # automatically generated annotation name `book__count`
        qs = Author.objects.annotate(Count('book')).filter(book__count=2).order_by('name')
        self.assertQuerysetEqual(
            qs,
            ['Peter Norvig'],
            lambda b: b.name
        )
        # Referencing the auto-generated name in an aggregate() also works.
        self.assertEqual(
            Author.objects.annotate(Count('book')).aggregate(Max('book__count')),
            {'book__count__max': 2}
        )

    def test_annotate_joins(self):
        """
        The base table's join isn't promoted to LOUTER. This could
        cause the query generation to fail if there is an exclude() for fk-field
        in the query, too. Refs #19087.
        """
        qs = Book.objects.annotate(n=Count('pk'))
        self.assertIs(qs.query.alias_map['aggregation_regress_book'].join_type, None)
        # The query executes without problems.
        self.assertEqual(len(qs.exclude(publisher=-1)), 6)

    @skipUnlessAnyDBFeature('allows_group_by_pk', 'allows_group_by_selected_pks')
    def test_aggregate_duplicate_columns(self):
        # Regression test for #17144

        results = Author.objects.annotate(num_contacts=Count('book_contact_set'))

        # There should only be one GROUP BY clause, for the `id` column.
        # `name` and `age` should not be grouped on.
        _, _, group_by = results.query.get_compiler(using='default').pre_sql_setup()
        self.assertEqual(len(group_by), 1)
        self.assertIn('id', group_by[0][0])
        self.assertNotIn('name', group_by[0][0])
        self.assertNotIn('age', group_by[0][0])
        self.assertEqual(
            [(a.name, a.num_contacts) for a in results.order_by('name')],
            [
                ('Adrian Holovaty', 1),
                ('Brad Dayley', 1),
                ('Jacob Kaplan-Moss', 0),
                ('James Bennett', 1),
                ('Jeffrey Forcier', 1),
                ('Paul Bissex', 0),
                ('Peter Norvig', 2),
                ('Stuart Russell', 0),
                ('Wesley J. Chun', 0),
            ]
        )

    @skipUnlessAnyDBFeature('allows_group_by_pk', 'allows_group_by_selected_pks')
    def test_aggregate_duplicate_columns_only(self):
        # Works with only() too.
        results = Author.objects.only('id', 'name').annotate(num_contacts=Count('book_contact_set'))
        _, _, grouping = results.query.get_compiler(using='default').pre_sql_setup()
        self.assertEqual(len(grouping), 1)
        self.assertIn('id', grouping[0][0])
        self.assertNotIn('name', grouping[0][0])
        self.assertNotIn('age', grouping[0][0])
        self.assertEqual(
            [(a.name, a.num_contacts) for a in results.order_by('name')],
            [
                ('Adrian Holovaty', 1),
                ('Brad Dayley', 1),
                ('Jacob Kaplan-Moss', 0),
                ('James Bennett', 1),
                ('Jeffrey Forcier', 1),
                ('Paul Bissex', 0),
                ('Peter Norvig', 2),
                ('Stuart Russell', 0),
                ('Wesley J. Chun', 0),
            ]
        )

    @skipUnlessAnyDBFeature('allows_group_by_pk', 'allows_group_by_selected_pks')
    def test_aggregate_duplicate_columns_select_related(self):
        # And select_related()
        results = Book.objects.select_related('contact').annotate(
            num_authors=Count('authors'))
        _, _, grouping = results.query.get_compiler(using='default').pre_sql_setup()
        # In the case of `group_by_selected_pks` we also group by contact.id because of the select_related.
self.assertEqual(len(grouping), 1 if connection.features.allows_group_by_pk else 2) self.assertIn('id', grouping[0][0]) self.assertNotIn('name', grouping[0][0]) self.assertNotIn('contact', grouping[0][0]) self.assertEqual( [(b.name, b.num_authors) for b in results.order_by('name')], [ ('Artificial Intelligence: A Modern Approach', 2), ('Paradigms of Artificial Intelligence Programming: Case Studies in Common Lisp', 1), ('Practical Django Projects', 1), ('Python Web Development with Django', 3), ('Sams Teach Yourself Django in 24 Hours', 1), ('The Definitive Guide to Django: Web Development Done Right', 2) ] ) def test_reverse_join_trimming(self): qs = Author.objects.annotate(Count('book_contact_set__contact')) self.assertIn(' JOIN ', str(qs.query)) def test_aggregation_with_generic_reverse_relation(self): """ Regression test for #10870: Aggregates with joins ignore extra filters provided by setup_joins tests aggregations with generic reverse relations """ django_book = Book.objects.get(name='Practical Django Projects') ItemTag.objects.create( object_id=django_book.id, tag='intermediate', content_type=ContentType.objects.get_for_model(django_book), ) ItemTag.objects.create( object_id=django_book.id, tag='django', content_type=ContentType.objects.get_for_model(django_book), ) # Assign a tag to model with same PK as the book above. If the JOIN # used in aggregation doesn't have content type as part of the # condition the annotation will also count the 'hi mom' tag for b. 
wmpk = WithManualPK.objects.create(id=django_book.pk) ItemTag.objects.create( object_id=wmpk.id, tag='hi mom', content_type=ContentType.objects.get_for_model(wmpk), ) ai_book = Book.objects.get(name__startswith='Paradigms of Artificial Intelligence') ItemTag.objects.create( object_id=ai_book.id, tag='intermediate', content_type=ContentType.objects.get_for_model(ai_book), ) self.assertEqual(Book.objects.aggregate(Count('tags')), {'tags__count': 3}) results = Book.objects.annotate(Count('tags')).order_by('-tags__count', 'name') self.assertEqual( [(b.name, b.tags__count) for b in results], [ ('Practical Django Projects', 2), ('Paradigms of Artificial Intelligence Programming: Case Studies in Common Lisp', 1), ('Artificial Intelligence: A Modern Approach', 0), ('Python Web Development with Django', 0), ('Sams Teach Yourself Django in 24 Hours', 0), ('The Definitive Guide to Django: Web Development Done Right', 0) ] ) def test_negated_aggregation(self): expected_results = Author.objects.exclude( pk__in=Author.objects.annotate(book_cnt=Count('book')).filter(book_cnt=2) ).order_by('name') expected_results = [a.name for a in expected_results] qs = Author.objects.annotate(book_cnt=Count('book')).exclude( Q(book_cnt=2), Q(book_cnt=2)).order_by('name') self.assertQuerysetEqual( qs, expected_results, lambda b: b.name ) expected_results = Author.objects.exclude( pk__in=Author.objects.annotate(book_cnt=Count('book')).filter(book_cnt=2) ).order_by('name') expected_results = [a.name for a in expected_results] qs = Author.objects.annotate(book_cnt=Count('book')).exclude(Q(book_cnt=2) | Q(book_cnt=2)).order_by('name') self.assertQuerysetEqual( qs, expected_results, lambda b: b.name ) def test_name_filters(self): qs = Author.objects.annotate(Count('book')).filter( Q(book__count__exact=2) | Q(name='Adrian Holovaty') ).order_by('name') self.assertQuerysetEqual( qs, ['Adrian Holovaty', 'Peter Norvig'], lambda b: b.name ) def test_name_expressions(self): # Aggregates are spotted 
correctly from F objects. # Note that Adrian's age is 34 in the fixtures, and he has one book # so both conditions match one author. qs = Author.objects.annotate(Count('book')).filter( Q(name='Peter Norvig') | Q(age=F('book__count') + 33) ).order_by('name') self.assertQuerysetEqual( qs, ['Adrian Holovaty', 'Peter Norvig'], lambda b: b.name ) def test_ticket_11293(self): q1 = Q(price__gt=50) q2 = Q(authors__count__gt=1) query = Book.objects.annotate(Count('authors')).filter( q1 | q2).order_by('pk') self.assertQuerysetEqual( query, [1, 4, 5, 6], lambda b: b.pk) def test_ticket_11293_q_immutable(self): """ Splitting a q object to parts for where/having doesn't alter the original q-object. """ q1 = Q(isbn='') q2 = Q(authors__count__gt=1) query = Book.objects.annotate(Count('authors')) query.filter(q1 | q2) self.assertEqual(len(q2.children), 1) def test_fobj_group_by(self): """ An F() object referring to related column works correctly in group by. """ qs = Book.objects.annotate( account=Count('authors') ).filter( account=F('publisher__num_awards') ) self.assertQuerysetEqual( qs, ['Sams Teach Yourself Django in 24 Hours'], lambda b: b.name) def test_annotate_reserved_word(self): """ Regression #18333 - Ensure annotated column name is properly quoted. """ vals = Book.objects.annotate(select=Count('authors__id')).aggregate(Sum('select'), Avg('select')) self.assertEqual(vals, { 'select__sum': 10, 'select__avg': Approximate(1.666, places=2), }) def test_annotate_on_relation(self): book = Book.objects.annotate(avg_price=Avg('price'), publisher_name=F('publisher__name')).get(pk=self.b1.pk) self.assertEqual(book.avg_price, 30.00) self.assertEqual(book.publisher_name, "Apress") def test_aggregate_on_relation(self): # A query with an existing annotation aggregation on a relation should # succeed. 
qs = Book.objects.annotate(avg_price=Avg('price')).aggregate( publisher_awards=Sum('publisher__num_awards') ) self.assertEqual(qs['publisher_awards'], 30) def test_annotate_distinct_aggregate(self): # There are three books with rating of 4.0 and two of the books have # the same price. Hence, the distinct removes one rating of 4.0 # from the results. vals1 = Book.objects.values('rating', 'price').distinct().aggregate(result=Sum('rating')) vals2 = Book.objects.aggregate(result=Sum('rating') - Value(4.0)) self.assertEqual(vals1, vals2) class JoinPromotionTests(TestCase): def test_ticket_21150(self): b = Bravo.objects.create() c = Charlie.objects.create(bravo=b) qs = Charlie.objects.select_related('alfa').annotate(Count('bravo__charlie')) self.assertSequenceEqual(qs, [c]) self.assertIs(qs[0].alfa, None) a = Alfa.objects.create() c.alfa = a c.save() # Force re-evaluation qs = qs.all() self.assertSequenceEqual(qs, [c]) self.assertEqual(qs[0].alfa, a) def test_existing_join_not_promoted(self): # No promotion for existing joins qs = Charlie.objects.filter(alfa__name__isnull=False).annotate(Count('alfa__name')) self.assertIn(' INNER JOIN ', str(qs.query)) # Also, the existing join is unpromoted when doing filtering for already # promoted join. 
qs = Charlie.objects.annotate(Count('alfa__name')).filter(alfa__name__isnull=False) self.assertIn(' INNER JOIN ', str(qs.query)) # But, as the join is nullable first use by annotate will be LOUTER qs = Charlie.objects.annotate(Count('alfa__name')) self.assertIn(' LEFT OUTER JOIN ', str(qs.query)) def test_non_nullable_fk_not_promoted(self): qs = Book.objects.annotate(Count('contact__name')) self.assertIn(' INNER JOIN ', str(qs.query)) class SelfReferentialFKTests(TestCase): def test_ticket_24748(self): t1 = SelfRefFK.objects.create(name='t1') SelfRefFK.objects.create(name='t2', parent=t1) SelfRefFK.objects.create(name='t3', parent=t1) self.assertQuerysetEqual( SelfRefFK.objects.annotate(num_children=Count('children')).order_by('name'), [('t1', 2), ('t2', 0), ('t3', 0)], lambda x: (x.name, x.num_children) )