Refs #26430 -- Added tests for PostgreSQL-specific aggregates on EmptyQuerySets and used subTest().

This commit is contained in:
Mariusz Felisiak 2021-06-29 20:23:59 +02:00 committed by GitHub
parent 503ee41497
commit 5371ad1d14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 59 additions and 99 deletions

View File

@ -41,6 +41,34 @@ class TestGeneralAggregate(PostgreSQLTestCase):
), ),
]) ])
def test_empty_result_set(self):
AggregateTestModel.objects.all().delete()
tests = [
(ArrayAgg('char_field'), []),
(ArrayAgg('integer_field'), []),
(ArrayAgg('boolean_field'), []),
(BitAnd('integer_field'), None),
(BitOr('integer_field'), None),
(BoolAnd('boolean_field'), None),
(BoolOr('boolean_field'), None),
(JSONBAgg('integer_field'), []),
(StringAgg('char_field', delimiter=';'), ''),
]
for aggregation, expected_result in tests:
with self.subTest(aggregation=aggregation):
# Empty result with non-execution optimization.
with self.assertNumQueries(0):
values = AggregateTestModel.objects.none().aggregate(
aggregation=aggregation,
)
self.assertEqual(values, {'aggregation': expected_result})
# Empty result when query must be executed.
with self.assertNumQueries(1):
values = AggregateTestModel.objects.aggregate(
aggregation=aggregation,
)
self.assertEqual(values, {'aggregation': expected_result})
def test_array_agg_charfield(self): def test_array_agg_charfield(self):
values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('char_field')) values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('char_field'))
self.assertEqual(values, {'arrayagg': ['Foo1', 'Foo2', 'Foo4', 'Foo3']}) self.assertEqual(values, {'arrayagg': ['Foo1', 'Foo2', 'Foo4', 'Foo3']})
@ -120,15 +148,6 @@ class TestGeneralAggregate(PostgreSQLTestCase):
) )
self.assertEqual(values, {'arrayagg': [1, 2]}) self.assertEqual(values, {'arrayagg': [1, 2]})
def test_array_agg_empty_result(self):
AggregateTestModel.objects.all().delete()
values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('char_field'))
self.assertEqual(values, {'arrayagg': []})
values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('integer_field'))
self.assertEqual(values, {'arrayagg': []})
values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('boolean_field'))
self.assertEqual(values, {'arrayagg': []})
def test_array_agg_lookups(self): def test_array_agg_lookups(self):
aggr1 = AggregateTestModel.objects.create() aggr1 = AggregateTestModel.objects.create()
aggr2 = AggregateTestModel.objects.create() aggr2 = AggregateTestModel.objects.create()
@ -156,11 +175,6 @@ class TestGeneralAggregate(PostgreSQLTestCase):
integer_field=0).aggregate(bitand=BitAnd('integer_field')) integer_field=0).aggregate(bitand=BitAnd('integer_field'))
self.assertEqual(values, {'bitand': 0}) self.assertEqual(values, {'bitand': 0})
def test_bit_and_empty_result(self):
AggregateTestModel.objects.all().delete()
values = AggregateTestModel.objects.aggregate(bitand=BitAnd('integer_field'))
self.assertEqual(values, {'bitand': None})
def test_bit_or_general(self): def test_bit_or_general(self):
values = AggregateTestModel.objects.filter( values = AggregateTestModel.objects.filter(
integer_field__in=[0, 1]).aggregate(bitor=BitOr('integer_field')) integer_field__in=[0, 1]).aggregate(bitor=BitOr('integer_field'))
@ -176,20 +190,10 @@ class TestGeneralAggregate(PostgreSQLTestCase):
integer_field=0).aggregate(bitor=BitOr('integer_field')) integer_field=0).aggregate(bitor=BitOr('integer_field'))
self.assertEqual(values, {'bitor': 0}) self.assertEqual(values, {'bitor': 0})
def test_bit_or_empty_result(self):
AggregateTestModel.objects.all().delete()
values = AggregateTestModel.objects.aggregate(bitor=BitOr('integer_field'))
self.assertEqual(values, {'bitor': None})
def test_bool_and_general(self): def test_bool_and_general(self):
values = AggregateTestModel.objects.aggregate(booland=BoolAnd('boolean_field')) values = AggregateTestModel.objects.aggregate(booland=BoolAnd('boolean_field'))
self.assertEqual(values, {'booland': False}) self.assertEqual(values, {'booland': False})
def test_bool_and_empty_result(self):
AggregateTestModel.objects.all().delete()
values = AggregateTestModel.objects.aggregate(booland=BoolAnd('boolean_field'))
self.assertEqual(values, {'booland': None})
def test_bool_and_q_object(self): def test_bool_and_q_object(self):
values = AggregateTestModel.objects.aggregate( values = AggregateTestModel.objects.aggregate(
booland=BoolAnd(Q(integer_field__gt=2)), booland=BoolAnd(Q(integer_field__gt=2)),
@ -200,11 +204,6 @@ class TestGeneralAggregate(PostgreSQLTestCase):
values = AggregateTestModel.objects.aggregate(boolor=BoolOr('boolean_field')) values = AggregateTestModel.objects.aggregate(boolor=BoolOr('boolean_field'))
self.assertEqual(values, {'boolor': True}) self.assertEqual(values, {'boolor': True})
def test_bool_or_empty_result(self):
AggregateTestModel.objects.all().delete()
values = AggregateTestModel.objects.aggregate(boolor=BoolOr('boolean_field'))
self.assertEqual(values, {'boolor': None})
def test_bool_or_q_object(self): def test_bool_or_q_object(self):
values = AggregateTestModel.objects.aggregate( values = AggregateTestModel.objects.aggregate(
boolor=BoolOr(Q(integer_field__gt=2)), boolor=BoolOr(Q(integer_field__gt=2)),
@ -261,11 +260,6 @@ class TestGeneralAggregate(PostgreSQLTestCase):
) )
self.assertEqual(values, {'stringagg': 'Foo1;Foo3'}) self.assertEqual(values, {'stringagg': 'Foo1;Foo3'})
def test_string_agg_empty_result(self):
AggregateTestModel.objects.all().delete()
values = AggregateTestModel.objects.aggregate(stringagg=StringAgg('char_field', delimiter=';'))
self.assertEqual(values, {'stringagg': ''})
def test_orderable_agg_alternative_fields(self): def test_orderable_agg_alternative_fields(self):
values = AggregateTestModel.objects.aggregate( values = AggregateTestModel.objects.aggregate(
arrayagg=ArrayAgg('integer_field', ordering=F('char_field').asc()) arrayagg=ArrayAgg('integer_field', ordering=F('char_field').asc())
@ -276,11 +270,6 @@ class TestGeneralAggregate(PostgreSQLTestCase):
values = AggregateTestModel.objects.aggregate(jsonbagg=JSONBAgg('char_field')) values = AggregateTestModel.objects.aggregate(jsonbagg=JSONBAgg('char_field'))
self.assertEqual(values, {'jsonbagg': ['Foo1', 'Foo2', 'Foo4', 'Foo3']}) self.assertEqual(values, {'jsonbagg': ['Foo1', 'Foo2', 'Foo4', 'Foo3']})
def test_jsonb_agg_empty_result(self):
AggregateTestModel.objects.all().delete()
values = AggregateTestModel.objects.aggregate(jsonbagg=JSONBAgg('integer_field'))
self.assertEqual(values, {'jsonbagg': []})
def test_jsonb_agg_charfield_ordering(self): def test_jsonb_agg_charfield_ordering(self):
ordering_test_cases = ( ordering_test_cases = (
(F('char_field').desc(), ['Foo4', 'Foo3', 'Foo2', 'Foo1']), (F('char_field').desc(), ['Foo4', 'Foo3', 'Foo2', 'Foo1']),
@ -478,114 +467,85 @@ class TestStatisticsAggregate(PostgreSQLTestCase):
# Test aggregates # Test aggregates
def test_empty_result_set(self):
StatTestModel.objects.all().delete()
tests = [
(Corr(y='int2', x='int1'), None),
(CovarPop(y='int2', x='int1'), None),
(CovarPop(y='int2', x='int1', sample=True), None),
(RegrAvgX(y='int2', x='int1'), None),
(RegrAvgY(y='int2', x='int1'), None),
(RegrCount(y='int2', x='int1'), 0),
(RegrIntercept(y='int2', x='int1'), None),
(RegrR2(y='int2', x='int1'), None),
(RegrSlope(y='int2', x='int1'), None),
(RegrSXX(y='int2', x='int1'), None),
(RegrSXY(y='int2', x='int1'), None),
(RegrSYY(y='int2', x='int1'), None),
]
for aggregation, expected_result in tests:
with self.subTest(aggregation=aggregation):
# Empty result with non-execution optimization.
with self.assertNumQueries(0):
values = StatTestModel.objects.none().aggregate(
aggregation=aggregation,
)
self.assertEqual(values, {'aggregation': expected_result})
# Empty result when query must be executed.
with self.assertNumQueries(1):
values = StatTestModel.objects.aggregate(
aggregation=aggregation,
)
self.assertEqual(values, {'aggregation': expected_result})
def test_corr_general(self): def test_corr_general(self):
values = StatTestModel.objects.aggregate(corr=Corr(y='int2', x='int1')) values = StatTestModel.objects.aggregate(corr=Corr(y='int2', x='int1'))
self.assertEqual(values, {'corr': -1.0}) self.assertEqual(values, {'corr': -1.0})
def test_corr_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(corr=Corr(y='int2', x='int1'))
self.assertEqual(values, {'corr': None})
def test_covar_pop_general(self): def test_covar_pop_general(self):
values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1')) values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1'))
self.assertEqual(values, {'covarpop': Approximate(-0.66, places=1)}) self.assertEqual(values, {'covarpop': Approximate(-0.66, places=1)})
def test_covar_pop_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1'))
self.assertEqual(values, {'covarpop': None})
def test_covar_pop_sample(self): def test_covar_pop_sample(self):
values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1', sample=True)) values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1', sample=True))
self.assertEqual(values, {'covarpop': -1.0}) self.assertEqual(values, {'covarpop': -1.0})
def test_covar_pop_sample_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1', sample=True))
self.assertEqual(values, {'covarpop': None})
def test_regr_avgx_general(self): def test_regr_avgx_general(self):
values = StatTestModel.objects.aggregate(regravgx=RegrAvgX(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regravgx=RegrAvgX(y='int2', x='int1'))
self.assertEqual(values, {'regravgx': 2.0}) self.assertEqual(values, {'regravgx': 2.0})
def test_regr_avgx_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regravgx=RegrAvgX(y='int2', x='int1'))
self.assertEqual(values, {'regravgx': None})
def test_regr_avgy_general(self): def test_regr_avgy_general(self):
values = StatTestModel.objects.aggregate(regravgy=RegrAvgY(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regravgy=RegrAvgY(y='int2', x='int1'))
self.assertEqual(values, {'regravgy': 2.0}) self.assertEqual(values, {'regravgy': 2.0})
def test_regr_avgy_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regravgy=RegrAvgY(y='int2', x='int1'))
self.assertEqual(values, {'regravgy': None})
def test_regr_count_general(self): def test_regr_count_general(self):
values = StatTestModel.objects.aggregate(regrcount=RegrCount(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regrcount=RegrCount(y='int2', x='int1'))
self.assertEqual(values, {'regrcount': 3}) self.assertEqual(values, {'regrcount': 3})
def test_regr_count_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regrcount=RegrCount(y='int2', x='int1'))
self.assertEqual(values, {'regrcount': 0})
def test_regr_intercept_general(self): def test_regr_intercept_general(self):
values = StatTestModel.objects.aggregate(regrintercept=RegrIntercept(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regrintercept=RegrIntercept(y='int2', x='int1'))
self.assertEqual(values, {'regrintercept': 4}) self.assertEqual(values, {'regrintercept': 4})
def test_regr_intercept_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regrintercept=RegrIntercept(y='int2', x='int1'))
self.assertEqual(values, {'regrintercept': None})
def test_regr_r2_general(self): def test_regr_r2_general(self):
values = StatTestModel.objects.aggregate(regrr2=RegrR2(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regrr2=RegrR2(y='int2', x='int1'))
self.assertEqual(values, {'regrr2': 1}) self.assertEqual(values, {'regrr2': 1})
def test_regr_r2_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regrr2=RegrR2(y='int2', x='int1'))
self.assertEqual(values, {'regrr2': None})
def test_regr_slope_general(self): def test_regr_slope_general(self):
values = StatTestModel.objects.aggregate(regrslope=RegrSlope(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regrslope=RegrSlope(y='int2', x='int1'))
self.assertEqual(values, {'regrslope': -1}) self.assertEqual(values, {'regrslope': -1})
def test_regr_slope_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regrslope=RegrSlope(y='int2', x='int1'))
self.assertEqual(values, {'regrslope': None})
def test_regr_sxx_general(self): def test_regr_sxx_general(self):
values = StatTestModel.objects.aggregate(regrsxx=RegrSXX(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regrsxx=RegrSXX(y='int2', x='int1'))
self.assertEqual(values, {'regrsxx': 2.0}) self.assertEqual(values, {'regrsxx': 2.0})
def test_regr_sxx_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regrsxx=RegrSXX(y='int2', x='int1'))
self.assertEqual(values, {'regrsxx': None})
def test_regr_sxy_general(self): def test_regr_sxy_general(self):
values = StatTestModel.objects.aggregate(regrsxy=RegrSXY(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regrsxy=RegrSXY(y='int2', x='int1'))
self.assertEqual(values, {'regrsxy': -2.0}) self.assertEqual(values, {'regrsxy': -2.0})
def test_regr_sxy_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regrsxy=RegrSXY(y='int2', x='int1'))
self.assertEqual(values, {'regrsxy': None})
def test_regr_syy_general(self): def test_regr_syy_general(self):
values = StatTestModel.objects.aggregate(regrsyy=RegrSYY(y='int2', x='int1')) values = StatTestModel.objects.aggregate(regrsyy=RegrSYY(y='int2', x='int1'))
self.assertEqual(values, {'regrsyy': 2.0}) self.assertEqual(values, {'regrsyy': 2.0})
def test_regr_syy_empty_result(self):
StatTestModel.objects.all().delete()
values = StatTestModel.objects.aggregate(regrsyy=RegrSYY(y='int2', x='int1'))
self.assertEqual(values, {'regrsyy': None})
def test_regr_avgx_with_related_obj_and_number_as_argument(self): def test_regr_avgx_with_related_obj_and_number_as_argument(self):
""" """
This is more complex test to check if JOIN on field and This is more complex test to check if JOIN on field and