Changed dispatcher to use a list of processing functions to support multiple processing functions
This commit is contained in:
parent
476c6e7a4b
commit
f8aff44e8b
|
@ -1,36 +1,58 @@
|
||||||
import logging
|
import logging
|
||||||
|
import typing
|
||||||
|
|
||||||
from common.data.system_info_collectors_names import AWS_COLLECTOR, ENVIRONMENT_COLLECTOR, HOSTNAME_COLLECTOR
|
from common.data.system_info_collectors_names import *
|
||||||
from monkey_island.cc.services.telemetry.processing.system_info_collectors.aws import process_aws_telemetry
|
from monkey_island.cc.services.telemetry.processing.system_info_collectors.aws import process_aws_telemetry
|
||||||
from monkey_island.cc.services.telemetry.processing.system_info_collectors.environment import process_environment_telemetry
|
from monkey_island.cc.services.telemetry.processing.system_info_collectors.environment import process_environment_telemetry
|
||||||
from monkey_island.cc.services.telemetry.processing.system_info_collectors.hostname import process_hostname_telemetry
|
from monkey_island.cc.services.telemetry.processing.system_info_collectors.hostname import process_hostname_telemetry
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSOR = {
|
SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSORS = {
|
||||||
AWS_COLLECTOR: process_aws_telemetry,
|
AWS_COLLECTOR: [process_aws_telemetry],
|
||||||
ENVIRONMENT_COLLECTOR: process_environment_telemetry,
|
ENVIRONMENT_COLLECTOR: [process_environment_telemetry],
|
||||||
HOSTNAME_COLLECTOR: process_hostname_telemetry,
|
HOSTNAME_COLLECTOR: [process_hostname_telemetry],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class SystemInfoTelemetryDispatcher(object):
|
class SystemInfoTelemetryDispatcher(object):
|
||||||
def __init__(self, collector_to_parsing_function=None):
|
def __init__(self, collector_to_parsing_functions: typing.Mapping[str, typing.List[typing.Callable]] = None):
|
||||||
if collector_to_parsing_function is None:
|
"""
|
||||||
collector_to_parsing_function = SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSOR
|
:param collector_to_parsing_functions: Map between collector names and a list of functions that process the output of
|
||||||
self.collector_to_parsing_function = collector_to_parsing_function
|
that collector. If `None` is supplied, uses the default one; This should be the normal flow, overriding the
|
||||||
|
collector->functions mapping is useful mostly for testing.
|
||||||
|
"""
|
||||||
|
if collector_to_parsing_functions is None:
|
||||||
|
collector_to_parsing_functions = SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSORS
|
||||||
|
self.collector_to_processing_functions = collector_to_parsing_functions
|
||||||
|
|
||||||
def dispatch_to_relevant_collectors(self, telemetry_json):
|
def dispatch_collector_results_to_relevant_processors(self, telemetry_json):
|
||||||
|
"""
|
||||||
|
If the telemetry has collectors' results, dispatches the results to the relevant processing functions.
|
||||||
|
:param telemetry_json: Telemetry sent from the Monkey
|
||||||
|
"""
|
||||||
if "collectors" in telemetry_json["data"]:
|
if "collectors" in telemetry_json["data"]:
|
||||||
self.send_each_result_to_relevant_processor(telemetry_json)
|
self.dispatch_each_result_to_relevant_processors(telemetry_json)
|
||||||
|
|
||||||
def send_each_result_to_relevant_processor(self, telemetry_json):
|
def dispatch_each_result_to_relevant_processors(self, telemetry_json):
|
||||||
relevant_monkey_guid = telemetry_json['monkey_guid']
|
relevant_monkey_guid = telemetry_json['monkey_guid']
|
||||||
|
|
||||||
for collector_name, collector_results in telemetry_json["data"]["collectors"].items():
|
for collector_name, collector_results in telemetry_json["data"]["collectors"].items():
|
||||||
if collector_name in self.collector_to_parsing_function:
|
self.dispatch_result_of_single_collector_to_processing_functions(
|
||||||
|
collector_name,
|
||||||
|
collector_results,
|
||||||
|
relevant_monkey_guid)
|
||||||
|
|
||||||
|
def dispatch_result_of_single_collector_to_processing_functions(
|
||||||
|
self,
|
||||||
|
collector_name,
|
||||||
|
collector_results,
|
||||||
|
relevant_monkey_guid):
|
||||||
|
if collector_name in self.collector_to_processing_functions:
|
||||||
|
for processing_function in self.collector_to_processing_functions[collector_name]:
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
try:
|
try:
|
||||||
self.collector_to_parsing_function[collector_name](collector_results, relevant_monkey_guid)
|
processing_function(collector_results, relevant_monkey_guid)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Error {} while processing {} system info telemetry".format(str(e), collector_name),
|
"Error {} while processing {} system info telemetry".format(str(e), collector_name),
|
||||||
|
|
|
@ -8,7 +8,7 @@ from monkey_island.cc.services.telemetry.processing.system_info_collectors.syste
|
||||||
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
|
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
|
||||||
|
|
||||||
TEST_SYS_INFO_TO_PROCESSING = {
|
TEST_SYS_INFO_TO_PROCESSING = {
|
||||||
"AwsCollector": process_aws_telemetry,
|
"AwsCollector": [process_aws_telemetry],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -20,18 +20,18 @@ class SystemInfoTelemetryDispatcherTest(IslandTestCase):
|
||||||
|
|
||||||
# Bad format telem JSONs - throws
|
# Bad format telem JSONs - throws
|
||||||
bad_empty_telem_json = {}
|
bad_empty_telem_json = {}
|
||||||
self.assertRaises(KeyError, dispatcher.dispatch_to_relevant_collectors, bad_empty_telem_json)
|
self.assertRaises(KeyError, dispatcher.dispatch_collector_results_to_relevant_processors, bad_empty_telem_json)
|
||||||
bad_no_data_telem_json = {"monkey_guid": "bla"}
|
bad_no_data_telem_json = {"monkey_guid": "bla"}
|
||||||
self.assertRaises(KeyError, dispatcher.dispatch_to_relevant_collectors, bad_no_data_telem_json)
|
self.assertRaises(KeyError, dispatcher.dispatch_collector_results_to_relevant_processors, bad_no_data_telem_json)
|
||||||
bad_no_monkey_telem_json = {"data": {"collectors": {"AwsCollector": "Bla"}}}
|
bad_no_monkey_telem_json = {"data": {"collectors": {"AwsCollector": "Bla"}}}
|
||||||
self.assertRaises(KeyError, dispatcher.dispatch_to_relevant_collectors, bad_no_monkey_telem_json)
|
self.assertRaises(KeyError, dispatcher.dispatch_collector_results_to_relevant_processors, bad_no_monkey_telem_json)
|
||||||
|
|
||||||
# Telem JSON with no collectors - nothing gets dispatched
|
# Telem JSON with no collectors - nothing gets dispatched
|
||||||
good_telem_no_collectors = {"monkey_guid": "bla", "data": {"bla": "bla"}}
|
good_telem_no_collectors = {"monkey_guid": "bla", "data": {"bla": "bla"}}
|
||||||
good_telem_empty_collectors = {"monkey_guid": "bla", "data": {"bla": "bla", "collectors": {}}}
|
good_telem_empty_collectors = {"monkey_guid": "bla", "data": {"bla": "bla", "collectors": {}}}
|
||||||
|
|
||||||
dispatcher.dispatch_to_relevant_collectors(good_telem_no_collectors)
|
dispatcher.dispatch_collector_results_to_relevant_processors(good_telem_no_collectors)
|
||||||
dispatcher.dispatch_to_relevant_collectors(good_telem_empty_collectors)
|
dispatcher.dispatch_collector_results_to_relevant_processors(good_telem_empty_collectors)
|
||||||
|
|
||||||
def test_dispatch_to_relevant_collector(self):
|
def test_dispatch_to_relevant_collector(self):
|
||||||
self.fail_if_not_testing_env()
|
self.fail_if_not_testing_env()
|
||||||
|
@ -52,6 +52,6 @@ class SystemInfoTelemetryDispatcherTest(IslandTestCase):
|
||||||
},
|
},
|
||||||
"monkey_guid": a_monkey.guid
|
"monkey_guid": a_monkey.guid
|
||||||
}
|
}
|
||||||
dispatcher.dispatch_to_relevant_collectors(telem_json)
|
dispatcher.dispatch_collector_results_to_relevant_processors(telem_json)
|
||||||
|
|
||||||
self.assertEquals(Monkey.get_single_monkey_by_guid(a_monkey.guid).aws_instance_id, instance_id)
|
self.assertEquals(Monkey.get_single_monkey_by_guid(a_monkey.guid).aws_instance_id, instance_id)
|
||||||
|
|
Loading…
Reference in New Issue