diff --git a/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py b/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py index 34aa11691..8a02deb02 100644 --- a/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py +++ b/monkey/monkey_island/cc/services/attack/technique_reports/__init__.py @@ -6,20 +6,21 @@ from common.utils.attack_utils import ScanStatus from common.utils.code_utils import abstractstatic from monkey_island.cc.database import mongo from monkey_island.cc.models.attack.attack_mitigations import AttackMitigations -from monkey_island.cc.services.config_schema.config_schema import SCHEMA from monkey_island.cc.services.attack.attack_schema import SCHEMA as ATTACK_SCHEMA -from monkey_island.cc.services.config_schema.config_schema_per_attack_technique import ( - ConfigSchemaPerAttackTechnique, -) + logger = logging.getLogger(__name__) +UNSCANNED_MESSAGE = ( + "The configuration options corresponding to this ATT&CK technique were not " + "enabled in the configuration." +) + + class AttackTechnique(object, metaclass=abc.ABCMeta): """Abstract class for ATT&CK report components""" - config_schema_per_attack_technique = None - @property @abc.abstractmethod def unscanned_msg(self): @@ -120,52 +121,13 @@ class AttackTechnique(object, metaclass=abc.ABCMeta): :return: message string """ if status == ScanStatus.UNSCANNED.value: - if not cls.config_schema_per_attack_technique: - cls.config_schema_per_attack_technique = ( - ConfigSchemaPerAttackTechnique().get_config_schema_per_attack_technique(SCHEMA) - ) - unscanned_msg = cls._get_unscanned_msg_with_reasons( - cls.unscanned_msg, cls.config_schema_per_attack_technique - ) + unscanned_msg = UNSCANNED_MESSAGE return unscanned_msg elif status == ScanStatus.SCANNED.value: return cls.scanned_msg else: return cls.used_msg - @classmethod - def _get_unscanned_msg_with_reasons( - cls, unscanned_msg: str, config_schema_per_attack_technique: Dict - ): - reasons = [] - if len(cls.relevant_systems) == 1: - reasons.append(f"- Monkey did not run on any {cls.relevant_systems[0]} systems.") - if cls.tech_id in config_schema_per_attack_technique: - reasons.append( - "- The following configuration options were disabled or empty:
" - f"{cls._get_relevant_config_values(config_schema_per_attack_technique)}" - ) - - if reasons: - unscanned_msg = ( - unscanned_msg.strip(".") - + " due to one of the following reasons:\n" - + "\n".join(reasons) - ) - - return unscanned_msg - - @classmethod - def _get_relevant_config_values(cls, config_schema_per_attack_technique: Dict): - config_options = "" - for config_type in config_schema_per_attack_technique[cls.tech_id]: - config_options += ( - f"- {config_type} — " - f"{', '.join(config_schema_per_attack_technique[cls.tech_id][config_type])}
" - ) - - return config_options - @classmethod def technique_title(cls): """ diff --git a/monkey/monkey_island/cc/services/config_schema/config_schema_per_attack_technique.py b/monkey/monkey_island/cc/services/config_schema/config_schema_per_attack_technique.py deleted file mode 100644 index a760c8d00..000000000 --- a/monkey/monkey_island/cc/services/config_schema/config_schema_per_attack_technique.py +++ /dev/null @@ -1,83 +0,0 @@ -from typing import Dict, List - - -class ConfigSchemaPerAttackTechnique: - def __init__(self) -> None: - self.reverse_schema = {} - - def get_config_schema_per_attack_technique( - self, schema: Dict - ) -> Dict[str, Dict[str, List[str]]]: - """ - example: \ - { \ - "T1003": { \ - "System Info Collectors": [ \ - "Mimikatz collector", \ - ] \ - } \ - } \ - - :return: dictionary mapping each attack technique to relevant config \ - fields - """ - self._crawl_config_schema_definitions_for_reverse_schema(schema) - self._crawl_config_schema_properties_for_reverse_schema(schema) - - return self.reverse_schema - - def _crawl_config_schema_definitions_for_reverse_schema(self, schema: Dict): - definitions = schema["definitions"] - for definition in definitions: - definition_type = definitions[definition]["title"] - for field in definitions[definition].get("anyOf", []): - config_field = field["title"] - for attack_technique in field.get("attack_techniques", []): - self._add_config_field_to_reverse_schema( - definition_type, config_field, attack_technique - ) - - def _crawl_config_schema_properties_for_reverse_schema(self, schema: Dict): - properties = schema["properties"] - for prop in properties: - property_type = properties[prop]["title"] - for category_name in properties[prop].get("properties", []): - category = properties[prop]["properties"][category_name] - self._crawl_properties( - config_option_path=property_type, - config_option=category, - ) - - def _crawl_properties(self, config_option_path: str, config_option: Dict): - config_option_path = ( - f"{config_option_path} -> {config_option['title']}" - if "title" in config_option - else config_option_path - ) - for config_option_name in config_option.get("properties", []): - new_config_option = config_option["properties"][config_option_name] - self._check_related_attack_techniques( - config_option_path=config_option_path, - config_option=new_config_option, - ) - - # check for "properties" and each property's related techniques recursively; - # the levels of nesting and where related techniques are declared won't - # always be fixed in the config schema - self._crawl_properties(config_option_path, new_config_option) - - def _check_related_attack_techniques(self, config_option_path: str, config_option: Dict): - for attack_technique in config_option.get("related_attack_techniques", []): - # No config values could be a reason that related attack techniques are left - # unscanned. See https://github.com/guardicore/monkey/issues/1518 for more. - config_field = config_option["title"] - self._add_config_field_to_reverse_schema( - config_option_path, config_field, attack_technique - ) - - def _add_config_field_to_reverse_schema( - self, definition_type: str, config_field: str, attack_technique: str - ) -> None: - self.reverse_schema.setdefault(attack_technique, {}) - self.reverse_schema[attack_technique].setdefault(definition_type, []) - self.reverse_schema[attack_technique][definition_type].append(config_field)