diff --git a/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/system_info_telemetry_dispatcher.py b/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/system_info_telemetry_dispatcher.py index 6a3890491..e20231d8b 100644 --- a/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/system_info_telemetry_dispatcher.py +++ b/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/system_info_telemetry_dispatcher.py @@ -1,39 +1,61 @@ import logging +import typing -from common.data.system_info_collectors_names import AWS_COLLECTOR, ENVIRONMENT_COLLECTOR, HOSTNAME_COLLECTOR +from common.data.system_info_collectors_names import * from monkey_island.cc.services.telemetry.processing.system_info_collectors.aws import process_aws_telemetry from monkey_island.cc.services.telemetry.processing.system_info_collectors.environment import process_environment_telemetry from monkey_island.cc.services.telemetry.processing.system_info_collectors.hostname import process_hostname_telemetry logger = logging.getLogger(__name__) -SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSOR = { - AWS_COLLECTOR: process_aws_telemetry, - ENVIRONMENT_COLLECTOR: process_environment_telemetry, - HOSTNAME_COLLECTOR: process_hostname_telemetry, +SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSORS = { + AWS_COLLECTOR: [process_aws_telemetry], + ENVIRONMENT_COLLECTOR: [process_environment_telemetry], + HOSTNAME_COLLECTOR: [process_hostname_telemetry], } class SystemInfoTelemetryDispatcher(object): - def __init__(self, collector_to_parsing_function=None): - if collector_to_parsing_function is None: - collector_to_parsing_function = SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSOR - self.collector_to_parsing_function = collector_to_parsing_function + def __init__(self, collector_to_parsing_functions: typing.Mapping[str, typing.List[typing.Callable]] = None): + """ + :param collector_to_parsing_functions: Map between collector names and a list of functions that process the output of + that collector. If `None` is supplied, uses the default one; This should be the normal flow, overriding the + collector->functions mapping is useful mostly for testing. + """ + if collector_to_parsing_functions is None: + collector_to_parsing_functions = SYSTEM_INFO_COLLECTOR_TO_TELEMETRY_PROCESSORS + self.collector_to_processing_functions = collector_to_parsing_functions - def dispatch_to_relevant_collectors(self, telemetry_json): + def dispatch_collector_results_to_relevant_processors(self, telemetry_json): + """ + If the telemetry has collectors' results, dispatches the results to the relevant processing functions. + :param telemetry_json: Telemetry sent from the Monkey + """ if "collectors" in telemetry_json["data"]: - self.send_each_result_to_relevant_processor(telemetry_json) + self.dispatch_each_result_to_relevant_processors(telemetry_json) - def send_each_result_to_relevant_processor(self, telemetry_json): + def dispatch_each_result_to_relevant_processors(self, telemetry_json): relevant_monkey_guid = telemetry_json['monkey_guid'] + for collector_name, collector_results in telemetry_json["data"]["collectors"].items(): - if collector_name in self.collector_to_parsing_function: + self.dispatch_result_of_single_collector_to_processing_functions( + collector_name, + collector_results, + relevant_monkey_guid) + + def dispatch_result_of_single_collector_to_processing_functions( + self, + collector_name, + collector_results, + relevant_monkey_guid): + if collector_name in self.collector_to_processing_functions: + for processing_function in self.collector_to_processing_functions[collector_name]: # noinspection PyBroadException try: - self.collector_to_parsing_function[collector_name](collector_results, relevant_monkey_guid) + processing_function(collector_results, relevant_monkey_guid) except Exception as e: logger.error( "Error {} while processing {} system info telemetry".format(str(e), collector_name), exc_info=True) - else: - logger.warning("Unknown system info collector name: {}".format(collector_name)) + else: + logger.warning("Unknown system info collector name: {}".format(collector_name)) diff --git a/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/test_system_info_telemetry_dispatcher.py b/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/test_system_info_telemetry_dispatcher.py index 4db5352c8..c5cc7aca2 100644 --- a/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/test_system_info_telemetry_dispatcher.py +++ b/monkey/monkey_island/cc/services/telemetry/processing/system_info_collectors/test_system_info_telemetry_dispatcher.py @@ -8,7 +8,7 @@ from monkey_island.cc.services.telemetry.processing.system_info_collectors.syste from monkey_island.cc.testing.IslandTestCase import IslandTestCase TEST_SYS_INFO_TO_PROCESSING = { - "AwsCollector": process_aws_telemetry, + "AwsCollector": [process_aws_telemetry], } @@ -20,18 +20,18 @@ class SystemInfoTelemetryDispatcherTest(IslandTestCase): # Bad format telem JSONs - throws bad_empty_telem_json = {} - self.assertRaises(KeyError, dispatcher.dispatch_to_relevant_collectors, bad_empty_telem_json) + self.assertRaises(KeyError, dispatcher.dispatch_collector_results_to_relevant_processors, bad_empty_telem_json) bad_no_data_telem_json = {"monkey_guid": "bla"} - self.assertRaises(KeyError, dispatcher.dispatch_to_relevant_collectors, bad_no_data_telem_json) + self.assertRaises(KeyError, dispatcher.dispatch_collector_results_to_relevant_processors, bad_no_data_telem_json) bad_no_monkey_telem_json = {"data": {"collectors": {"AwsCollector": "Bla"}}} - self.assertRaises(KeyError, dispatcher.dispatch_to_relevant_collectors, bad_no_monkey_telem_json) + self.assertRaises(KeyError, dispatcher.dispatch_collector_results_to_relevant_processors, bad_no_monkey_telem_json) # Telem JSON with no collectors - nothing gets dispatched good_telem_no_collectors = {"monkey_guid": "bla", "data": {"bla": "bla"}} good_telem_empty_collectors = {"monkey_guid": "bla", "data": {"bla": "bla", "collectors": {}}} - dispatcher.dispatch_to_relevant_collectors(good_telem_no_collectors) - dispatcher.dispatch_to_relevant_collectors(good_telem_empty_collectors) + dispatcher.dispatch_collector_results_to_relevant_processors(good_telem_no_collectors) + dispatcher.dispatch_collector_results_to_relevant_processors(good_telem_empty_collectors) def test_dispatch_to_relevant_collector(self): self.fail_if_not_testing_env() @@ -52,6 +52,6 @@ class SystemInfoTelemetryDispatcherTest(IslandTestCase): }, "monkey_guid": a_monkey.guid } - dispatcher.dispatch_to_relevant_collectors(telem_json) + dispatcher.dispatch_collector_results_to_relevant_processors(telem_json) self.assertEquals(Monkey.get_single_monkey_by_guid(a_monkey.guid).aws_instance_id, instance_id)