From faa4c18cab95cb572b9e7c78fdd7b26ad845be75 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Thu, 14 Oct 2021 14:15:32 +0530 Subject: [PATCH] island: Create class for reverse schema generation to avoid output arguments --- .../attack/technique_reports/__init__.py | 6 +- .../config_schema_per_attack_technique.py | 141 +++++++++--------- 2 files changed, 71 insertions(+), 76 deletions(-) diff --git a/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py b/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py index 00df909f2..529cf7b95 100644 --- a/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py +++ b/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py @@ -9,7 +9,7 @@ from monkey_island.cc.models.attack.attack_mitigations import AttackMitigations from monkey_island.cc.services.attack.attack_config import AttackConfig from monkey_island.cc.services.config_schema.config_schema import SCHEMA from monkey_island.cc.services.config_schema.config_schema_per_attack_technique import ( - get_config_schema_per_attack_technique, + ConfigSchemaPerAttackTechnique, ) logger = logging.getLogger(__name__) @@ -122,8 +122,8 @@ class AttackTechnique(object, metaclass=abc.ABCMeta): return disabled_msg if status == ScanStatus.UNSCANNED.value: if not cls.config_schema_per_attack_technique: - cls.config_schema_per_attack_technique = get_config_schema_per_attack_technique( - SCHEMA + cls.config_schema_per_attack_technique = ( + ConfigSchemaPerAttackTechnique().get_config_schema_per_attack_technique(SCHEMA) ) unscanned_msg = cls._get_unscanned_msg_with_reasons( cls.unscanned_msg, cls.config_schema_per_attack_technique diff --git a/monkey/monkey_island/cc/services/config_schema/config_schema_per_attack_technique.py b/monkey/monkey_island/cc/services/config_schema/config_schema_per_attack_technique.py index b945adbc8..547161936 100644 --- a/monkey/monkey_island/cc/services/config_schema/config_schema_per_attack_technique.py +++ b/monkey/monkey_island/cc/services/config_schema/config_schema_per_attack_technique.py @@ -1,86 +1,81 @@ from typing import Dict, List -def get_config_schema_per_attack_technique(schema: Dict) -> Dict[str, Dict[str, List[str]]]: - """ - :return: dictionary mapping each attack technique to relevant config fields; example - - { - "T1003": { - "System Info Collectors": [ - "Mimikatz collector", - "Azure credential collector" - ] +class ConfigSchemaPerAttackTechnique: + def __init__(self) -> None: + self.reverse_schema = {} + + def get_config_schema_per_attack_technique( + self, schema: Dict + ) -> Dict[str, Dict[str, List[str]]]: + """ + :return: dictionary mapping each attack technique to relevant config fields; example - + { + "T1003": { + "System Info Collectors": [ + "Mimikatz collector", + "Azure credential collector" + ] + } } - } - """ - reverse_schema = {} + """ + self._crawl_config_schema_definitions_for_reverse_schema(schema) + self._crawl_config_schema_properties_for_reverse_schema(schema) - _crawl_config_schema_definitions_for_reverse_schema(schema, reverse_schema) - _crawl_config_schema_properties_for_reverse_schema(schema, reverse_schema) + return self.reverse_schema - return reverse_schema + def _crawl_config_schema_definitions_for_reverse_schema(self, schema: Dict): + definitions = schema["definitions"] + for definition in definitions: + definition_type = definitions[definition]["title"] + for field in definitions[definition].get("anyOf", []): + config_field = field["title"] + for attack_technique in field.get("attack_techniques", []): + self._add_config_field_to_reverse_schema( + definition_type, config_field, attack_technique + ) - -def _crawl_config_schema_definitions_for_reverse_schema(schema: Dict, reverse_schema: Dict): - definitions = schema["definitions"] - for definition in definitions: - definition_type = definitions[definition]["title"] - for field in definitions[definition].get("anyOf", []): - config_field = field["title"] - for attack_technique in field.get("attack_techniques", []): - _add_config_field_to_reverse_schema( - definition_type, config_field, attack_technique, reverse_schema + def _crawl_config_schema_properties_for_reverse_schema(self, schema: Dict): + properties = schema["properties"] + for prop in properties: + property_type = properties[prop]["title"] + for category_name in properties[prop].get("properties", []): + category = properties[prop]["properties"][category_name] + self._crawl_properties( + config_option_path=property_type, + config_option=category, ) - -def _crawl_config_schema_properties_for_reverse_schema(schema: Dict, reverse_schema: Dict): - properties = schema["properties"] - for prop in properties: - property_type = properties[prop]["title"] - for category_name in properties[prop].get("properties", []): - category = properties[prop]["properties"][category_name] - _crawl_properties( - config_option_path=property_type, - config_option=category, - reverse_schema=reverse_schema, + def _crawl_properties(self, config_option_path: str, config_option: Dict): + config_option_path = ( + f"{config_option_path} -> {config_option['title']}" + if "title" in config_option + else config_option_path + ) + for config_option_name in config_option.get("properties", []): + new_config_option = config_option["properties"][config_option_name] + self._check_related_attack_techniques( + config_option_path=config_option_path, + config_option=new_config_option, ) + # check for "properties" and each property's related techniques recursively; + # the levels of nesting and where related techniques are declared won't + # always be fixed in the config schema + self._crawl_properties(config_option_path, new_config_option) -def _crawl_properties(config_option_path: str, config_option: Dict, reverse_schema: Dict): - config_option_path = ( - f"{config_option_path} -> {config_option['title']}" - if "title" in config_option - else config_option_path - ) - for config_option_name in config_option.get("properties", []): - new_config_option = config_option["properties"][config_option_name] - _check_related_attack_techniques( - config_option_path=config_option_path, - config_option=new_config_option, - reverse_schema=reverse_schema, - ) + def _check_related_attack_techniques(self, config_option_path: str, config_option: Dict): + for attack_technique in config_option.get("related_attack_techniques", []): + # No config values could be a reason that related attack techniques are left + # unscanned. See https://github.com/guardicore/monkey/issues/1518 for more. + config_field = config_option["title"] + self._add_config_field_to_reverse_schema( + config_option_path, config_field, attack_technique + ) - # check for "properties" and each property's related techniques recursively; - # the levels of nesting and where related techniques are declared won't - # always be fixed in the config schema - _crawl_properties(config_option_path, new_config_option, reverse_schema) - - -def _check_related_attack_techniques( - config_option_path: str, config_option: Dict, reverse_schema: Dict -): - for attack_technique in config_option.get("related_attack_techniques", []): - # No config values could be a reason that related attack techniques are left - # unscanned. See https://github.com/guardicore/monkey/issues/1518 for more. - config_field = config_option["title"] - _add_config_field_to_reverse_schema( - config_option_path, config_field, attack_technique, reverse_schema - ) - - -def _add_config_field_to_reverse_schema( - definition_type: str, config_field: str, attack_technique: str, reverse_schema: Dict -) -> None: - reverse_schema.setdefault(attack_technique, {}) - reverse_schema[attack_technique].setdefault(definition_type, []) - reverse_schema[attack_technique][definition_type].append(config_field) + def _add_config_field_to_reverse_schema( + self, definition_type: str, config_field: str, attack_technique: str + ) -> None: + self.reverse_schema.setdefault(attack_technique, {}) + self.reverse_schema[attack_technique].setdefault(definition_type, []) + self.reverse_schema[attack_technique][definition_type].append(config_field)