Merge pull request #2076 from guardicore/fix-json-representations

Fix json representations
This commit is contained in:
Mike Salvatore 2022-07-11 12:22:50 -04:00 committed by GitHub
commit 1dba0639c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 73 additions and 52 deletions

View File

@ -6,35 +6,42 @@ from bson.json_util import dumps
from flask import make_response from flask import make_response
def normalize_obj(obj): def _normalize_obj(obj):
if ("_id" in obj) and ("id" not in obj): if ("_id" in obj) and ("id" not in obj):
obj["id"] = obj["_id"] obj["id"] = obj["_id"]
del obj["_id"] del obj["_id"]
for key, value in list(obj.items()): for key, value in list(obj.items()):
if isinstance(value, list):
for i in range(0, len(value)):
obj[key][i] = _normalize_value(value[i])
else:
obj[key] = _normalize_value(value) obj[key] = _normalize_value(value)
return obj return obj
def _normalize_value(value): def _normalize_value(value):
if type(value) == dict: # ObjectId is serializible by default, but returns a dict
return normalize_obj(value) # So serialize it first into a plain string
if isinstance(value, bson.objectid.ObjectId): if isinstance(value, bson.objectid.ObjectId):
return str(value) return str(value)
if isinstance(value, list):
return [_normalize_value(_value) for _value in value]
if isinstance(value, tuple):
return tuple((_normalize_value(_value) for _value in value))
if type(value) == dict:
return _normalize_obj(value)
if isinstance(value, datetime): if isinstance(value, datetime):
return str(value) return str(value)
if issubclass(type(value), Enum): if issubclass(type(value), Enum):
return value.name return value.name
else:
try:
dumps(value)
return value return value
except TypeError:
return value.__dict__
def output_json(obj, code, headers=None): def output_json(value, code, headers=None):
obj = normalize_obj(obj) value = _normalize_value(value)
resp = make_response(dumps(obj), code) resp = make_response(dumps(value), code)
resp.headers.extend(headers or {}) resp.headers.extend(headers or {})
return resp return resp

View File

@ -1,55 +1,69 @@
from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from unittest import TestCase
import bson import bson
from monkey_island.cc.services.representations import normalize_obj from monkey_island.cc.services.representations import _normalize_value
class TestRepresentations(TestCase): @dataclass
def test_normalize_obj(self): class MockClass:
# empty a: int
self.assertEqual({}, normalize_obj({}))
# no special content
self.assertEqual({"a": "a"}, normalize_obj({"a": "a"}))
# _id field -> id field obj_id_str = "123456789012345678901234"
self.assertEqual({"id": 12345}, normalize_obj({"_id": 12345})) bogus_object1 = MockClass(1)
bogus_object2 = MockClass(2)
# obj id field -> str
obj_id_str = "123456789012345678901234"
self.assertEqual(
{"id": obj_id_str}, normalize_obj({"_id": bson.objectid.ObjectId(obj_id_str)})
)
# datetime -> str def test_normalize_dicts():
assert {} == _normalize_value({})
assert {"a": "a"} == _normalize_value({"a": "a"})
assert {"id": 12345} == _normalize_value({"id": 12345})
assert {"id": obj_id_str} == _normalize_value({"id": bson.objectid.ObjectId(obj_id_str)})
dt = datetime.now() dt = datetime.now()
expected = {"a": str(dt)} expected = {"a": str(dt)}
result = normalize_obj({"a": dt}) result = _normalize_value({"a": dt})
self.assertEqual(expected, result) assert expected == result
# dicts and lists
self.assertEqual( def test_normalize_complex():
{"a": [{"ba": obj_id_str, "bb": obj_id_str}], "b": {"id": obj_id_str}}, bogus_dict = {
normalize_obj(
{
"a": [ "a": [
{ {
"ba": bson.objectid.ObjectId(obj_id_str), "ba": bson.objectid.ObjectId(obj_id_str),
"bb": bson.objectid.ObjectId(obj_id_str), "bb": bson.objectid.ObjectId(obj_id_str),
} }
], ],
"b": {"_id": bson.objectid.ObjectId(obj_id_str)}, "b": {"id": bson.objectid.ObjectId(obj_id_str)},
} }
),
)
def test_normalize__enum(self): expected_dict = {"a": [{"ba": obj_id_str, "bb": obj_id_str}], "b": {"id": obj_id_str}}
assert expected_dict == _normalize_value(bogus_dict)
def test_normalize_list():
bogus_list = [bson.objectid.ObjectId(obj_id_str), {"a": "b"}, {"object": [bogus_object1]}]
expected_list = [obj_id_str, {"a": "b"}, {"object": [{"a": 1}]}]
assert expected_list == _normalize_value(bogus_list)
def test_normalize_enum():
class BogusEnum(Enum): class BogusEnum(Enum):
bogus_val = "Bogus" bogus_val = "Bogus"
my_obj = {"something": "something", "my_enum": BogusEnum.bogus_val} my_obj = {"something": "something", "my_enum": BogusEnum.bogus_val}
assert {"something": "something", "my_enum": "bogus_val"} == normalize_obj(my_obj) assert {"something": "something", "my_enum": "bogus_val"} == _normalize_value(my_obj)
def test_normalize_tuple():
bogus_tuple = [{"my_tuple": (bogus_object1, bogus_object2, b"one_two")}]
expected_tuple = [{"my_tuple": ({"a": 1}, {"a": 2}, b"one_two")}]
assert expected_tuple == _normalize_value(bogus_tuple)