import operator
import uuid
from unittest import mock, skipIf, skipUnless

from django import forms
from django.core import serializers
from django.core.exceptions import ValidationError
from django.core.serializers.json import DjangoJSONEncoder
from django.db import (
    DataError, IntegrityError, NotSupportedError, OperationalError, connection,
    models,
)
from django.db.models import Count, F, OuterRef, Q, Subquery, Transform, Value
from django.db.models.expressions import RawSQL
from django.db.models.fields.json import (
    KeyTextTransform, KeyTransform, KeyTransformFactory,
    KeyTransformTextLookupMixin,
)
from django.db.models.functions import Cast
from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
from django.test.utils import CaptureQueriesContext

from .models import CustomJSONDecoder, JSONModel, NullableJSONModel


@skipUnlessDBFeature('supports_json_field')
class JSONFieldTests(TestCase):
    """Database-backed checks of JSONField encoding and decoding."""

    def test_invalid_value(self):
        # The "value" field is given a UUID here and saving is expected to
        # fail with a TypeError.
        msg = 'is not JSON serializable'
        with self.assertRaisesMessage(TypeError, msg):
            NullableJSONModel.objects.create(value={
                'uuid': uuid.UUID('d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475'),
            })

    def test_custom_encoder_decoder(self):
        # The same kind of value is expected to round-trip through
        # "value_custom".
        value = {'uuid': uuid.UUID('{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}')}
        obj = NullableJSONModel(value_custom=value)
        obj.clean_fields()
        obj.save()
        obj.refresh_from_db()
        self.assertEqual(obj.value_custom, value)

    def test_db_check_constraints(self):
        # Force the encoder to emit invalid JSON so that the database itself
        # has to reject the row.
        value = '{@!invalid json value 123 $!@#'
        with mock.patch.object(DjangoJSONEncoder, 'encode', return_value=value):
            with self.assertRaises((IntegrityError, DataError, OperationalError)):
                NullableJSONModel.objects.create(value_custom=value)


class TestMethods(SimpleTestCase):
    """Checks of JSONField methods that do not touch the database."""

    def test_deconstruct(self):
        field = models.JSONField()
        name, path, args, kwargs = field.deconstruct()
        self.assertEqual(path, 'django.db.models.JSONField')
        self.assertEqual(args, [])
        self.assertEqual(kwargs, {})

    def test_deconstruct_custom_encoder_decoder(self):
        field = models.JSONField(encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder)
        name, path, args, kwargs = field.deconstruct()
        self.assertEqual(kwargs['encoder'], DjangoJSONEncoder)
        self.assertEqual(kwargs['decoder'], CustomJSONDecoder)

    def test_get_transforms(self):
        @models.JSONField.register_lookup
        class MyTransform(Transform):
            lookup_name = 'my_transform'

        field = models.JSONField()
        # The lookup is registered on the JSONField class itself, so it must
        # be removed even if the assertion below fails; otherwise it would
        # stay registered for every test that runs afterwards.
        try:
            transform = field.get_transform('my_transform')
            self.assertIs(transform, MyTransform)
        finally:
            models.JSONField._unregister_lookup(MyTransform)
            models.JSONField._clear_cached_lookups()
        # Once unregistered, the name is treated as a JSON key again.
        transform = field.get_transform('my_transform')
        self.assertIsInstance(transform, KeyTransformFactory)

    def test_key_transform_text_lookup_mixin_non_key_transform(self):
        transform = Transform('test')
        msg = (
            'Transform should be an instance of KeyTransform in order to use '
            'this lookup.'
        )
        with self.assertRaisesMessage(TypeError, msg):
            KeyTransformTextLookupMixin(transform)


class TestValidation(SimpleTestCase):
    """Checks of JSONField argument validation and value cleaning."""

    def test_invalid_encoder(self):
        # An encoder *instance* is passed here instead of the class.
        msg = 'The encoder parameter must be a callable object.'
        with self.assertRaisesMessage(ValueError, msg):
            models.JSONField(encoder=DjangoJSONEncoder())

    def test_invalid_decoder(self):
        # A decoder *instance* is passed here instead of the class.
        msg = 'The decoder parameter must be a callable object.'
        with self.assertRaisesMessage(ValueError, msg):
            models.JSONField(decoder=CustomJSONDecoder())

    def test_validation_error(self):
        field = models.JSONField()
        msg = 'Value must be valid JSON.'
        value = uuid.UUID('{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}')
        with self.assertRaisesMessage(ValidationError, msg):
            field.clean({'uuid': value}, None)

    def test_custom_encoder(self):
        # With DjangoJSONEncoder the same UUID value is expected to clean
        # without raising.
        field = models.JSONField(encoder=DjangoJSONEncoder)
        value = uuid.UUID('{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}')
        field.clean({'uuid': value}, None)


class TestFormField(SimpleTestCase):
    """Checks of the form field produced by JSONField.formfield()."""

    def test_formfield(self):
        model_field = models.JSONField()
        form_field = model_field.formfield()
        self.assertIsInstance(form_field, forms.JSONField)

    def test_formfield_custom_encoder_decoder(self):
        model_field = models.JSONField(encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder)
        form_field = model_field.formfield()
        self.assertIs(form_field.encoder, DjangoJSONEncoder)
        self.assertIs(form_field.decoder, CustomJSONDecoder)


class TestSerialization(SimpleTestCase):
    """Checks of JSONModel through the 'json' serializer, in both directions."""

    # Serialized payload template; %s is replaced by the serialized value.
    test_data = (
        '[{"fields": {"value": %s}, '
        '"model": "model_fields.jsonmodel", "pk": null}]'
    )
    test_values = (
        # (Python value, serialized value),
        ({'a': 'b', 'c': None}, '{"a": "b", "c": null}'),
        ('abc', '"abc"'),
        ('{"a": "a"}', '"{\\"a\\": \\"a\\"}"'),
    )

    def test_dumping(self):
        for value, serialized in self.test_values:
            with self.subTest(value=value):
                instance = JSONModel(value=value)
                data = serializers.serialize('json', [instance])
                self.assertJSONEqual(data, self.test_data % serialized)

    def test_loading(self):
        for value, serialized in self.test_values:
            with self.subTest(value=value):
                instance = list(
                    serializers.deserialize('json', self.test_data % serialized)
                )[0].object
                self.assertEqual(instance.value, value)


@skipUnlessDBFeature('supports_json_field')
class TestSaveLoad(TestCase):
    """Round-trip checks: save a value, reload it, compare with the original."""

    def test_null(self):
        obj = NullableJSONModel(value=None)
        obj.save()
        obj.refresh_from_db()
        self.assertIsNone(obj.value)

    @skipUnlessDBFeature('supports_primitives_in_json_field')
    def test_json_null_different_from_sql_null(self):
        json_null = NullableJSONModel.objects.create(value=Value('null'))
        json_null.refresh_from_db()
        sql_null = NullableJSONModel.objects.create(value=None)
        sql_null.refresh_from_db()
        # 'null' is not equal to NULL in the database.
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value=Value('null')),
            [json_null],
        )
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value=None),
            [json_null],
        )
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__isnull=True),
            [sql_null],
        )
        # 'null' is equal to NULL in Python (None).
        self.assertEqual(json_null.value, sql_null.value)

    @skipUnlessDBFeature('supports_primitives_in_json_field')
    def test_primitives(self):
        values = [
            True,
            1,
            1.45,
            'String',
            '',
        ]
        for value in values:
            with self.subTest(value=value):
                obj = JSONModel(value=value)
                obj.save()
                obj.refresh_from_db()
                self.assertEqual(obj.value, value)

    def test_dict(self):
        values = [
            {},
            {'name': 'John', 'age': 20, 'height': 180.3},
            {'a': True, 'b': {'b1': False, 'b2': None}},
        ]
        for value in values:
            with self.subTest(value=value):
                obj = JSONModel.objects.create(value=value)
                obj.refresh_from_db()
                self.assertEqual(obj.value, value)

    def test_list(self):
        values = [
            [],
            ['John', 20, 180.3],
            [True, [False, None]],
        ]
        for value in values:
            with self.subTest(value=value):
                obj = JSONModel.objects.create(value=value)
                obj.refresh_from_db()
                self.assertEqual(obj.value, value)

    def test_realistic_object(self):
        value = {
            'name': 'John',
            'age': 20,
            'pets': [
                {'name': 'Kit', 'type': 'cat', 'age': 2},
                {'name': 'Max', 'type': 'dog', 'age': 1},
            ],
            'courses': [
                ['A1', 'A2', 'A3'],
                ['B1', 'B2'],
                ['C1'],
            ],
        }
        obj = JSONModel.objects.create(value=value)
        obj.refresh_from_db()
        self.assertEqual(obj.value, value)


@skipUnlessDBFeature('supports_json_field')
class TestQuerying(TestCase):
    """
    Checks of lookups and key transforms on NullableJSONModel.value.

    Most tests refer to fixture rows by position: cls.objs[i] is the row
    created from values[i] in setUpTestData().
    """

    @classmethod
    def setUpTestData(cls):
        cls.primitives = [True, False, 'yes', 7, 9.6]
        # The position of each entry matters: tests index into cls.objs.
        values = [
            None,
            [],
            {},
            {'a': 'b', 'c': 14},
            {
                'a': 'b',
                'c': 14,
                'd': ['e', {'f': 'g'}],
                'h': True,
                'i': False,
                'j': None,
                'k': {'l': 'm'},
                'n': [None],
            },
            [1, [2]],
            {'k': True, 'l': False},
            {
                'foo': 'bar',
                'baz': {'a': 'b', 'c': 'd'},
                'bar': ['foo', 'bar'],
                'bax': {'foo': 'bar'},
            },
        ]
        cls.objs = [
            NullableJSONModel.objects.create(value=value)
            for value in values
        ]
        # Primitive rows are appended after the fixed fixtures, and only on
        # backends that can store them.
        if connection.features.supports_primitives_in_json_field:
            cls.objs.extend([
                NullableJSONModel.objects.create(value=value)
                for value in cls.primitives
            ])
        # Placeholder used to build RawSQL JSON expressions; PostgreSQL gets
        # an explicit jsonb cast.
        cls.raw_sql = '%s::jsonb' if connection.vendor == 'postgresql' else '%s'

    def test_exact(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__exact={}),
            [self.objs[2]],
        )

    def test_exact_complex(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__exact={'a': 'b', 'c': 14}),
            [self.objs[3]],
        )

    def test_isnull(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__isnull=True),
            [self.objs[0]],
        )

    def test_ordering_by_transform(self):
        objs = [
            NullableJSONModel.objects.create(value={'ord': 93, 'name': 'bar'}),
            NullableJSONModel.objects.create(value={'ord': 22.1, 'name': 'foo'}),
            NullableJSONModel.objects.create(value={'ord': -1, 'name': 'baz'}),
            NullableJSONModel.objects.create(value={'ord': 21.931902, 'name': 'spam'}),
            NullableJSONModel.objects.create(value={'ord': -100291029, 'name': 'eggs'}),
        ]
        query = NullableJSONModel.objects.filter(value__name__isnull=False).order_by('value__ord')
        expected = [objs[4], objs[2], objs[3], objs[1], objs[0]]
        mariadb = connection.vendor == 'mysql' and connection.mysql_is_mariadb
        if mariadb or connection.vendor == 'oracle':
            # MariaDB and Oracle return JSON values as strings.
            expected = [objs[2], objs[4], objs[3], objs[1], objs[0]]
        self.assertSequenceEqual(query, expected)

    def test_ordering_grouping_by_key_transform(self):
        base_qs = NullableJSONModel.objects.filter(value__d__0__isnull=False)
        # The lookup-path form and the explicit KeyTransform form are
        # expected to give the same result.
        for qs in (
            base_qs.order_by('value__d__0'),
            base_qs.annotate(key=KeyTransform('0', KeyTransform('d', 'value'))).order_by('key'),
        ):
            self.assertSequenceEqual(qs, [self.objs[4]])
        qs = NullableJSONModel.objects.filter(value__isnull=False)
        self.assertQuerysetEqual(
            qs.filter(value__isnull=False).annotate(
                key=KeyTextTransform('f', KeyTransform('1', KeyTransform('d', 'value'))),
            ).values('key').annotate(count=Count('key')).order_by('count'),
            [(None, 0), ('g', 1)],
            operator.itemgetter('key', 'count'),
        )

    @skipIf(connection.vendor == 'oracle', "Oracle doesn't support grouping by LOBs, see #24096.")
    def test_ordering_grouping_by_count(self):
        qs = NullableJSONModel.objects.filter(
            value__isnull=False,
        ).values('value__d__0').annotate(count=Count('value__d__0')).order_by('count')
        self.assertQuerysetEqual(qs, [1, 11], operator.itemgetter('count'))

    def test_key_transform_raw_expression(self):
        expr = RawSQL(self.raw_sql, ['{"x": "bar"}'])
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__foo=KeyTransform('x', expr)),
            [self.objs[7]],
        )

    def test_nested_key_transform_raw_expression(self):
        expr = RawSQL(self.raw_sql, ['{"x": {"y": "bar"}}'])
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__foo=KeyTransform('y', KeyTransform('x', expr))),
            [self.objs[7]],
        )

    def test_key_transform_expression(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__d__0__isnull=False).annotate(
                key=KeyTransform('d', 'value'),
                chain=KeyTransform('0', 'key'),
                expr=KeyTransform('0', Cast('key', models.JSONField())),
            ).filter(chain=F('expr')),
            [self.objs[4]],
        )

    def test_nested_key_transform_expression(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__d__0__isnull=False).annotate(
                key=KeyTransform('d', 'value'),
                chain=KeyTransform('f', KeyTransform('1', 'key')),
                expr=KeyTransform('f', KeyTransform('1', Cast('key', models.JSONField()))),
            ).filter(chain=F('expr')),
            [self.objs[4]],
        )

    def test_has_key(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__has_key='a'),
            [self.objs[3], self.objs[4]],
        )

    def test_has_key_null_value(self):
        # 'j' maps to None in objs[4]; the key is still expected to match.
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__has_key='j'),
            [self.objs[4]],
        )

    def test_has_key_deep(self):
        tests = [
            (Q(value__baz__has_key='a'), self.objs[7]),
            (Q(value__has_key=KeyTransform('a', KeyTransform('baz', 'value'))), self.objs[7]),
            (Q(value__has_key=KeyTransform('c', KeyTransform('baz', 'value'))), self.objs[7]),
            (Q(value__d__1__has_key='f'), self.objs[4]),
            (
                Q(value__has_key=KeyTransform('f', KeyTransform('1', KeyTransform('d', 'value')))),
                self.objs[4],
            ),
        ]
        for condition, expected in tests:
            with self.subTest(condition=condition):
                self.assertSequenceEqual(
                    NullableJSONModel.objects.filter(condition),
                    [expected],
                )

    def test_has_key_list(self):
        obj = NullableJSONModel.objects.create(value=[{'a': 1}, {'b': 'x'}])
        # The array index is given both as an int and as a string.
        tests = [
            Q(value__1__has_key='b'),
            Q(value__has_key=KeyTransform('b', KeyTransform(1, 'value'))),
            Q(value__has_key=KeyTransform('b', KeyTransform('1', 'value'))),
        ]
        for condition in tests:
            with self.subTest(condition=condition):
                self.assertSequenceEqual(
                    NullableJSONModel.objects.filter(condition),
                    [obj],
                )

    def test_has_keys(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__has_keys=['a', 'c', 'h']),
            [self.objs[4]],
        )

    def test_has_any_keys(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__has_any_keys=['c', 'l']),
            [self.objs[3], self.objs[4], self.objs[6]],
        )

    def test_contains(self):
        tests = [
            ({}, self.objs[2:5] + self.objs[6:8]),
            ({'baz': {'a': 'b', 'c': 'd'}}, [self.objs[7]]),
            ({'k': True, 'l': False}, [self.objs[6]]),
            ({'d': ['e', {'f': 'g'}]}, [self.objs[4]]),
            ([1, [2]], [self.objs[5]]),
            ({'n': [None]}, [self.objs[4]]),
            ({'j': None}, [self.objs[4]]),
        ]
        for value, expected in tests:
            with self.subTest(value=value):
                qs = NullableJSONModel.objects.filter(value__contains=value)
                self.assertSequenceEqual(qs, expected)

    @skipUnlessDBFeature('supports_primitives_in_json_field')
    def test_contains_primitives(self):
        for value in self.primitives:
            with self.subTest(value=value):
                qs = NullableJSONModel.objects.filter(value__contains=value)
                self.assertIs(qs.exists(), True)

    @skipIf(
        connection.vendor == 'oracle',
        "Oracle doesn't support contained_by lookup.",
    )
    def test_contained_by(self):
        qs = NullableJSONModel.objects.filter(value__contained_by={'a': 'b', 'c': 14, 'h': True})
        self.assertSequenceEqual(qs, self.objs[2:4])

    # NOTE: this test runs *only* on Oracle. The skip reason below is the same
    # text as on test_contained_by(), although here it is reported on every
    # other backend.
    @skipUnless(
        connection.vendor == 'oracle',
        "Oracle doesn't support contained_by lookup.",
    )
    def test_contained_by_unsupported(self):
        msg = 'contained_by lookup is not supported on Oracle.'
        with self.assertRaisesMessage(NotSupportedError, msg):
            NullableJSONModel.objects.filter(value__contained_by={'a': 'b'}).get()

    def test_deep_values(self):
        qs = NullableJSONModel.objects.values_list('value__k__l')
        # Only objs[4] has a value at k -> l; every other row yields None.
        expected_objs = [(None,)] * len(self.objs)
        expected_objs[4] = ('m',)
        self.assertSequenceEqual(qs, expected_objs)

    @skipUnlessDBFeature('can_distinct_on_fields')
    def test_deep_distinct(self):
        query = NullableJSONModel.objects.distinct('value__k__l').values_list('value__k__l')
        self.assertSequenceEqual(query, [('m',), (None,)])

    def test_isnull_key(self):
        # key__isnull=False works the same as has_key='key'.
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__a__isnull=True),
            self.objs[:3] + self.objs[5:],
        )
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__a__isnull=False),
            [self.objs[3], self.objs[4]],
        )
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__j__isnull=False),
            [self.objs[4]],
        )

    def test_isnull_key_or_none(self):
        obj = NullableJSONModel.objects.create(value={'a': None})
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(Q(value__a__isnull=True) | Q(value__a=None)),
            self.objs[:3] + self.objs[5:] + [obj],
        )

    def test_none_key(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__j=None),
            [self.objs[4]],
        )

    def test_none_key_exclude(self):
        obj = NullableJSONModel.objects.create(value={'j': 1})
        if connection.vendor == 'oracle':
            # Oracle supports filtering JSON objects with NULL keys, but the
            # current implementation doesn't support it.
            self.assertSequenceEqual(
                NullableJSONModel.objects.exclude(value__j=None),
                self.objs[1:4] + self.objs[5:] + [obj],
            )
        else:
            self.assertSequenceEqual(NullableJSONModel.objects.exclude(value__j=None), [obj])

    def test_shallow_list_lookup(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__0=1),
            [self.objs[5]],
        )

    def test_shallow_obj_lookup(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__a='b'),
            [self.objs[3], self.objs[4]],
        )

    def test_obj_subquery_lookup(self):
        qs = NullableJSONModel.objects.annotate(
            field=Subquery(NullableJSONModel.objects.filter(pk=OuterRef('pk')).values('value')),
        ).filter(field__a='b')
        self.assertSequenceEqual(qs, [self.objs[3], self.objs[4]])

    def test_deep_lookup_objs(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__k__l='m'),
            [self.objs[4]],
        )

    def test_shallow_lookup_obj_target(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__k={'l': 'm'}),
            [self.objs[4]],
        )

    def test_deep_lookup_array(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__1__0=2),
            [self.objs[5]],
        )

    def test_deep_lookup_mixed(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__d__1__f='g'),
            [self.objs[4]],
        )

    def test_deep_lookup_transform(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__c__gt=2),
            [self.objs[3], self.objs[4]],
        )
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__c__gt=2.33),
            [self.objs[3], self.objs[4]],
        )
        self.assertIs(NullableJSONModel.objects.filter(value__c__lt=5).exists(), False)

    @skipIf(
        connection.vendor == 'oracle',
        'Raises ORA-00600: internal error code on Oracle 18.',
    )
    def test_usage_in_subquery(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(
                id__in=NullableJSONModel.objects.filter(value__c=14),
            ),
            self.objs[3:5],
        )

    def test_key_iexact(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__iexact='BaR').exists(), True)
        self.assertIs(NullableJSONModel.objects.filter(value__foo__iexact='"BaR"').exists(), False)

    def test_key_contains(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__contains='ar').exists(), True)

    def test_key_icontains(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__icontains='Ar').exists(), True)

    def test_key_startswith(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__startswith='b').exists(), True)

    def test_key_istartswith(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__istartswith='B').exists(), True)

    def test_key_endswith(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__endswith='r').exists(), True)

    def test_key_iendswith(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__iendswith='R').exists(), True)

    def test_key_regex(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__regex=r'^bar$').exists(), True)

    def test_key_iregex(self):
        self.assertIs(NullableJSONModel.objects.filter(value__foo__iregex=r'^bAr$').exists(), True)

    @skipUnless(connection.vendor == 'postgresql', 'kwargs are crafted for PostgreSQL.')
    def test_key_sql_injection(self):
        # The key name embeds quotes and an "OR 1 = 1" clause; the query must
        # match nothing and the captured SQL must show the quotes doubled.
        with CaptureQueriesContext(connection) as queries:
            self.assertIs(
                NullableJSONModel.objects.filter(**{
                    """value__test' = '"a"') OR 1 = 1 OR ('d""": 'x',
                }).exists(),
                False,
            )
        self.assertIn(
            """."value" -> 'test'' = ''"a"'') OR 1 = 1 OR (''d') = '"x"' """,
            queries[0]['sql'],
        )

    @skipIf(connection.vendor == 'postgresql', 'PostgreSQL uses operators not functions.')
    def test_key_sql_injection_escape(self):
        # Only the compiled query text is inspected here; the double quotes
        # in the key name must appear backslash-escaped.
        query = str(JSONModel.objects.filter(**{
            """value__test") = '"a"' OR 1 = 1 OR ("d""": 'x',
        }).query)
        self.assertIn('"test\\"', query)
        self.assertIn('\\"d', query)

    def test_key_escape(self):
        obj = NullableJSONModel.objects.create(value={'%total': 10})
        self.assertEqual(NullableJSONModel.objects.filter(**{'value__%total': 10}).get(), obj)

    def test_none_key_and_exact_lookup(self):
        self.assertSequenceEqual(
            NullableJSONModel.objects.filter(value__a='b', value__j=None),
            [self.objs[4]],
        )

    def test_lookups_with_key_transform(self):
        tests = (
            ('value__d__contains', 'e'),
            ('value__baz__has_key', 'c'),
            ('value__baz__has_keys', ['a', 'c']),
            ('value__baz__has_any_keys', ['a', 'x']),
            ('value__contains', KeyTransform('bax', 'value')),
            ('value__has_key', KeyTextTransform('foo', 'value')),
        )
        # contained_by lookup is not supported on Oracle.
        if connection.vendor != 'oracle':
            tests += (
                ('value__baz__contained_by', {'a': 'b', 'c': 'd', 'e': 'f'}),
                (
                    'value__contained_by',
                    KeyTransform('x', RawSQL(
                        self.raw_sql,
                        ['{"x": {"a": "b", "c": 1, "d": "e"}}'],
                    )),
                ),
            )
        for lookup, value in tests:
            with self.subTest(lookup=lookup):
                self.assertIs(NullableJSONModel.objects.filter(
                    **{lookup: value},
                ).exists(), True)