On branch ticket_32519

Adds a JSONSet Function which can be used in a queryset.update(json_field=JSONSet(field="json_field", fields={"$.hello": Value("world")})
sqlite supported
postgresql partially supported
oracle untested
mysql/mariadb untested
This commit is contained in:
allen-munsch 2022-02-13 05:52:22 -06:00
parent 5d13cc540e
commit 400b30950d
No known key found for this signature in database
GPG Key ID: 0C977048B8BDD447
8 changed files with 337 additions and 2 deletions

View File

@ -317,6 +317,9 @@ class BaseDatabaseFeatures:
# Does the backend support JSONObject() database function?
has_json_object_function = True
# Does the backend support JSONSet() database function?
has_json_set_function = True
# Does the backend support column collations?
supports_collation_on_charfield = True
supports_collation_on_textfield = True

View File

@ -130,6 +130,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
can_introspect_json_field = property(operator.attrgetter("supports_json_field"))
has_json_object_function = property(operator.attrgetter("supports_json_field"))
has_json_set_function = property(operator.attrgetter("supports_json_field"))
@cached_property
def can_return_columns_from_insert(self):

View File

@ -1,4 +1,13 @@
from .comparison import Cast, Coalesce, Collate, Greatest, JSONObject, Least, NullIf
from .comparison import (
Cast,
Coalesce,
Collate,
Greatest,
JSONObject,
JSONSet,
Least,
NullIf,
)
from .datetime import (
Extract,
ExtractDay,
@ -98,6 +107,7 @@ __all__ = [
"Collate",
"Greatest",
"JSONObject",
"JSONSet",
"Least",
"NullIf",
# datetime

View File

@ -1,6 +1,8 @@
"""Database functions that do comparisons or type conversions."""
from collections.abc import Mapping
from django.db import NotSupportedError
from django.db.models.expressions import Func, Value
from django.db.models.expressions import Func, Value, F
from django.db.models.fields.json import JSONField
from django.utils.regex_helper import _lazy_re_compile
@ -180,6 +182,65 @@ class JSONObject(Func):
)
class JSONSet(Func):
function = "JSON_SET"
template = "%(function)s(%(expressions)s)"
output_field = JSONField()
def __init__(self, field=None, op_type="SET", create=True, fields=None):
self.field_ = field
self.op_type_ = op_type
self.create_ = create
from django.db.models import TextField
if list(fields.keys())[0].find("{") != -1:
expressions = [F(field)]
else:
expressions = [Cast(F(field), TextField())]
for key, value in fields.items():
expressions.extend((Value(key), value))
super().__init__(*expressions)
def as_sql(self, compiler, connection, **extra_context):
if not connection.features.has_json_object_function:
raise NotSupportedError(
"JSONSet() is not supported on this database backend."
)
return super().as_sql(compiler, connection, **extra_context)
def as_postgresql(self, compiler, connection, **extra_context):
return self.as_sql(
compiler,
connection,
function="jsonb_set",
create=", true" if self.create_ else ", false",
template="%(function)s(%(expressions)s%(create)s)",
**extra_context,
)
def as_oracle(self, compiler, connection, **extra_context):
op_types = {
"REMOVE": "REMOVE",
"INSERT": "INSERT",
"REPLACE": "REPLACE",
"APPEND": "APPEND",
"SET": "SET",
"RENAME": "RENAME",
"KEEP": "KEEP",
# "RHSEXPR": "RHSEXPR", # not sure, self.rhs some how?
}
assert self.op_type_ in op_types, f"op_type: {self.op_type_} doesn't exist"
return self.as_sql(
compiler,
connection,
function="JSON_TRANSFORM",
arg_joiner=" = ",
op_type=" " if not self.op_type_ else f" {op_types[self.op_type_]} ",
template="%(function)s(%(op_type)s%(expressions)s)",
**extra_context,
)
class Least(Func):
"""
Return the minimum expression.

View File

@ -172,6 +172,84 @@ Usage example::
>>> author.json_object
{'name': 'margaret smith', 'alias': 'msmith', 'age': 50}
``JSONSet``
--------------
.. class:: JSONSet(field="the_json_column_name",
fields: dict = {},
create: bool = True,
op_type: str = "SET"
)
.. versionadded:: 4.1
Takes a ``field`` and ``fields`` dictionary
- sqlite: can have any number of key, value in the ``fields`` dictionary (field,
fields)
- postgresql: only 1 key, value in the ``fields`` dictionary, optionally can
set ``create`` to ``False`` (field, fields, create)
- mariadb: can have any number of key, value in the ``fields`` dictionary (field,
fields)
- oracle: only 1 key, value in the ``fields`` dictionary, optionally pass in
``op_type`` default to "SET" ( field, fields, op_type )
Usage example::
cls.c1 = Flying.objects.create(
circus={
"id": 1,
"name": "Bingo Monty DuClownPort I",
"profession": {"active": False, "specialization": ["physical", "bits"]},
}
)
cls.c2 = Flying.objects.create(
circus={
"id": 2,
"name": "Bingo Monty DuClownPort II",
"profession": {"active": True, "specialization": ["tumbling"]},
}
)
cls.c3 = Flying.objects.create(
circus={
"id": 3,
"name": "Bingo Monty DuClownPort III",
"profession": {"active": False, "specialization": ["fire tumbling"]},
}
)
##########################################################
objs = Flying.objects.all()
name = "Ringo Monty DuClownTown I"
key = "$.name"
if connection.vendor == "postgresql":
key = "{name}"
name = '"Ringo Monty DuClownTown I"'
ready = JSONSet(field="circus", fields={key: Value(name)})
objs.update(circus=ready)
assertTrue(
all([obj.circus["name"] == name.replace('"', "") for obj in objs])
)
##########################################################
objs = Flying.objects.filter(circus__id=1)
key = "$.profession.specialization[1]"
value = Value("flips")
if connection.vendor == "postgresql":
key = "{profession,specialization,1}"
value = Value('"flips"')
ready = JSONSet(field="circus", fields={key: value})
objs.update(circus=ready)
self.assertEqual(
"physical flips",
" ".join(objs.first().circus["profession"]["specialization"]),
)
``Least``
---------

View File

@ -0,0 +1,169 @@
from django.db import NotSupportedError
from django.db.models import F, Value, TextField, JSONField, CharField
from django.db.models.expressions import RawSQL
from django.db.models.functions import JSONObject, JSONSet, Upper, Concat, Cast, Replace
from django.test import TestCase
from django.test.testcases import skipIfDBFeature, skipUnlessDBFeature
from django.db import connection
from ..models import Flying
class TestDataMixin:
@classmethod
def setUpTestData(cls):
cls.c1 = Flying.objects.create(
circus={
"id": 1,
"name": "Bingo Monty DuClownPort I",
"profession": {"active": False, "specialization": ["physical", "bits"]},
}
)
cls.c2 = Flying.objects.create(
circus={
"id": 2,
"name": "Bingo Monty DuClownPort II",
"profession": {"active": True, "specialization": ["tumbling"]},
}
)
cls.c3 = Flying.objects.create(
circus={
"id": 3,
"name": "Bingo Monty DuClownPort III",
"profession": {"active": False, "specialization": ["fire tumbling"]},
}
)
@skipUnlessDBFeature("has_json_set_function")
class JSONSetTests(TestDataMixin, TestCase):
def test_empty(self):
objs = Flying.objects.all()
with self.assertRaises(AttributeError):
objs.update(circus=JSONSet())
with self.assertRaises(AttributeError):
objs.update(circus=JSONSet({}))
def test_replace_all(self):
objs = Flying.objects.all()
name = "Ringo Monty DuClownTown I"
key = "$.name"
if connection.vendor == "postgresql":
key = "{name}"
name = '"Ringo Monty DuClownTown I"'
ready = JSONSet(field="circus", fields={key: Value(name)})
objs.update(circus=ready)
self.assertTrue(
all([obj.circus["name"] == name.replace('"', "") for obj in objs])
)
def test_replace_array_one(self):
objs = Flying.objects.filter(circus__id=1)
key = "$.profession.specialization[1]"
value = Value("flips")
if connection.vendor == "postgresql":
key = "{profession,specialization,1}"
value = Value('"flips"')
ready = JSONSet(field="circus", fields={key: value})
objs.update(circus=ready)
self.assertEqual(
"physical flips",
" ".join(objs.first().circus["profession"]["specialization"]),
)
def test_replace_array_one_insert(self):
objs = Flying.objects.filter(circus__id=2)
key = "$.profession.specialization[1]"
value = Value("flips")
if connection.vendor == "postgresql":
key = "{profession,specialization,1}"
value = Value('"flips"')
ready = JSONSet(field="circus", fields={key: value})
objs.update(circus=ready)
self.assertEqual(
"tumbling flips",
" ".join(objs.first().circus["profession"]["specialization"]),
)
def test_filter_and_replace(self):
if connection.vendor == "sqlite":
objs = Flying.objects.filter(circus__profession__active=False)
key = "$.profession.active"
ready = JSONSet(field="circus", fields={key: Value(True)})
objs.update(circus=ready)
self.assertTrue(all([obj.circus["profession"]["active"] for obj in objs]))
else:
pass
# objs = Flying.objects.filter(circus__profession__active=False)
# key = "{profession,active}"
# ready = JSONSet(field="circus", fields={key: Value(True)})
# objs.update(circus=ready)
# self.assertTrue(all([obj.circus["profession"]["active"] for obj in objs]))
def test_filter_and_replace_annotate_all(self):
if connection.vendor == "sqlite":
objs = Flying.objects.all()
upper = JSONSet(
field="circus",
fields={
"$.name": Upper(F("circus__name")),
"$.profession.specialization[1]": Value("flips"),
},
)
objs.update(circus=upper)
items = Flying.objects.annotate(
screaming_circus=JSONObject(
name=F("circus__name"),
s=Upper(
Replace(
Concat(
F("circus__profession__specialization__0"),
Value(" "),
F("circus__profession__specialization__1"),
),
Value('"'),
Value(""),
)
),
)
)
self.assertSetEqual(
{
" ".join(
[item.screaming_circus["name"], item.screaming_circus["s"]]
)
for item in items
},
{
"BINGO MONTY DUCLOWNPORT I PHYSICAL FLIPS",
"BINGO MONTY DUCLOWNPORT II TUMBLING FLIPS",
"BINGO MONTY DUCLOWNPORT III FIRE TUMBLING FLIPS",
},
)
def XXX_test_filter_and_replace_annotate_all_postgreql(self):
if connection.vendor == "postgresql":
objs = Flying.objects.all()
upper = JSONSet(
field="circus",
fields={
"{name}": Upper(RawSQL("(circus ->> '%s')::varchar")),
},
)
objs.update(circus=upper)
flips = JSONSet(
field="circus",
fields={
"{profession,specialization,1}": Value('"flips"'),
},
)
objs.update(circus=flips)
@skipIfDBFeature("has_json_object_function")
class JSONObjectNotSupportedTests(TestCase):
def test_not_supported(self):
msg = "JSONSet() is not supported on this database backend."
with self.assertRaisesMessage(NotSupportedError, msg):
Flying.objects.annotate(crying_circus=JSONSet())

View File

@ -89,4 +89,10 @@ class Migration(migrations.Migration):
("f2", models.FloatField(null=True, blank=True)),
],
),
migrations.CreateModel(
name="Flying",
fields=[
("circus", models.JSONField(null=True, blank=True)),
],
),
]

View File

@ -54,3 +54,10 @@ class IntegerModel(models.Model):
class FloatModel(models.Model):
f1 = models.FloatField(null=True, blank=True)
f2 = models.FloatField(null=True, blank=True)
class Flying(models.Model):
circus = models.JSONField(blank=True, null=True)
class Meta:
required_db_features = {"supports_json_field"}