Added ability to export test telemetries to directory
This commit is contained in:
parent
7d397da245
commit
a400da276a
|
@ -6,13 +6,7 @@ from mongoengine import Document, StringField
|
||||||
|
|
||||||
class TestTelem(Document):
|
class TestTelem(Document):
|
||||||
# SCHEMA
|
# SCHEMA
|
||||||
|
name = StringField(required=True)
|
||||||
method = StringField(required=True)
|
method = StringField(required=True)
|
||||||
endpoint = StringField(required=True)
|
endpoint = StringField(required=True)
|
||||||
content = StringField(required=True)
|
content = StringField(required=True)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def try_drop_collection():
|
|
||||||
try:
|
|
||||||
TestTelem.drop_collection()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ from datetime import datetime
|
||||||
|
|
||||||
import dateutil.parser
|
import dateutil.parser
|
||||||
import flask_restful
|
import flask_restful
|
||||||
from monkey_island.cc.resources.test.utils.telem_store import store_test_telem
|
from monkey_island.cc.resources.test.utils.telem_store import TestTelemStore
|
||||||
from flask import request
|
from flask import request
|
||||||
|
|
||||||
from monkey_island.cc.consts import DEFAULT_MONKEY_TTL_EXPIRY_DURATION_IN_SECONDS
|
from monkey_island.cc.consts import DEFAULT_MONKEY_TTL_EXPIRY_DURATION_IN_SECONDS
|
||||||
|
@ -34,7 +34,7 @@ class Monkey(flask_restful.Resource):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
# Used by monkey. can't secure.
|
# Used by monkey. can't secure.
|
||||||
@store_test_telem
|
@TestTelemStore.store_test_telem
|
||||||
def patch(self, guid):
|
def patch(self, guid):
|
||||||
monkey_json = json.loads(request.data)
|
monkey_json = json.loads(request.data)
|
||||||
update = {"$set": {'modifytime': datetime.now()}}
|
update = {"$set": {'modifytime': datetime.now()}}
|
||||||
|
@ -58,7 +58,7 @@ class Monkey(flask_restful.Resource):
|
||||||
return mongo.db.monkey.update({"_id": monkey["_id"]}, update, upsert=False)
|
return mongo.db.monkey.update({"_id": monkey["_id"]}, update, upsert=False)
|
||||||
|
|
||||||
# Used by monkey. can't secure.
|
# Used by monkey. can't secure.
|
||||||
@store_test_telem
|
@TestTelemStore.store_test_telem
|
||||||
def post(self, **kw):
|
def post(self, **kw):
|
||||||
monkey_json = json.loads(request.data)
|
monkey_json = json.loads(request.data)
|
||||||
monkey_json['creds'] = []
|
monkey_json['creds'] = []
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
from os import mkdir, path
|
||||||
|
|
||||||
from flask import request
|
from flask import request
|
||||||
|
|
||||||
|
@ -7,13 +8,47 @@ from monkey_island.cc.models.test_telem import TestTelem
|
||||||
MONKEY_TELEM_COLLECTION_NAME = "monkey_telems_for_tests"
|
MONKEY_TELEM_COLLECTION_NAME = "monkey_telems_for_tests"
|
||||||
|
|
||||||
|
|
||||||
def store_test_telem(f):
|
class TestTelemStore:
|
||||||
@wraps(f)
|
|
||||||
def decorated_function(*args, **kwargs):
|
|
||||||
method = request.method
|
|
||||||
content = request.data.decode()
|
|
||||||
endpoint = str(request.url_rule)
|
|
||||||
TestTelem(method=method, endpoint=endpoint, content=content).save()
|
|
||||||
return f(*args, **kwargs)
|
|
||||||
|
|
||||||
return decorated_function
|
@staticmethod
|
||||||
|
def store_test_telem(f):
|
||||||
|
@wraps(f)
|
||||||
|
def decorated_function(*args, **kwargs):
|
||||||
|
method = request.method
|
||||||
|
content = request.data.decode()
|
||||||
|
endpoint = request.path
|
||||||
|
name = str(request.url_rule).replace('/', '_').replace('<', '_').replace('>', '_').replace(':', '_')
|
||||||
|
TestTelem(name=name, method=method, endpoint=endpoint, content=content).save()
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
|
||||||
|
return decorated_function
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def export_test_telems():
|
||||||
|
telem_dir = "./test_telems"
|
||||||
|
try:
|
||||||
|
mkdir(telem_dir)
|
||||||
|
except FileExistsError:
|
||||||
|
pass
|
||||||
|
for test_telem in TestTelem.objects():
|
||||||
|
with open(TestTelemStore.get_unique_file_path_for_test_telem(telem_dir, test_telem), 'w') as file:
|
||||||
|
file.write(test_telem.to_json())
|
||||||
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_unique_file_path_for_test_telem(target_dir: str, test_telem: TestTelem):
|
||||||
|
telem_filename = TestTelemStore._get_filename_by_test_telem(test_telem)
|
||||||
|
for i in range(100):
|
||||||
|
potential_filepath = path.join(target_dir, (telem_filename + str(i)))
|
||||||
|
if path.exists(potential_filepath):
|
||||||
|
continue
|
||||||
|
return potential_filepath
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_filename_by_test_telem(test_telem: TestTelem):
|
||||||
|
endpoint_part = test_telem.name
|
||||||
|
return endpoint_part + '_' + test_telem.method
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
TestTelemStore.export_test_telems()
|
||||||
|
|
Loading…
Reference in New Issue