island: Create class for reverse schema generation to avoid output arguments

This commit is contained in:
Shreya Malviya 2021-10-14 14:15:32 +05:30
parent 74095b6fc6
commit faa4c18cab
2 changed files with 71 additions and 76 deletions

View File

@ -9,7 +9,7 @@ from monkey_island.cc.models.attack.attack_mitigations import AttackMitigations
from monkey_island.cc.services.attack.attack_config import AttackConfig from monkey_island.cc.services.attack.attack_config import AttackConfig
from monkey_island.cc.services.config_schema.config_schema import SCHEMA from monkey_island.cc.services.config_schema.config_schema import SCHEMA
from monkey_island.cc.services.config_schema.config_schema_per_attack_technique import ( from monkey_island.cc.services.config_schema.config_schema_per_attack_technique import (
get_config_schema_per_attack_technique, ConfigSchemaPerAttackTechnique,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -122,8 +122,8 @@ class AttackTechnique(object, metaclass=abc.ABCMeta):
return disabled_msg return disabled_msg
if status == ScanStatus.UNSCANNED.value: if status == ScanStatus.UNSCANNED.value:
if not cls.config_schema_per_attack_technique: if not cls.config_schema_per_attack_technique:
cls.config_schema_per_attack_technique = get_config_schema_per_attack_technique( cls.config_schema_per_attack_technique = (
SCHEMA ConfigSchemaPerAttackTechnique().get_config_schema_per_attack_technique(SCHEMA)
) )
unscanned_msg = cls._get_unscanned_msg_with_reasons( unscanned_msg = cls._get_unscanned_msg_with_reasons(
cls.unscanned_msg, cls.config_schema_per_attack_technique cls.unscanned_msg, cls.config_schema_per_attack_technique

View File

@ -1,86 +1,81 @@
from typing import Dict, List from typing import Dict, List
def get_config_schema_per_attack_technique(schema: Dict) -> Dict[str, Dict[str, List[str]]]: class ConfigSchemaPerAttackTechnique:
""" def __init__(self) -> None:
:return: dictionary mapping each attack technique to relevant config fields; example - self.reverse_schema = {}
{
"T1003": { def get_config_schema_per_attack_technique(
"System Info Collectors": [ self, schema: Dict
"Mimikatz collector", ) -> Dict[str, Dict[str, List[str]]]:
"Azure credential collector" """
] :return: dictionary mapping each attack technique to relevant config fields; example -
{
"T1003": {
"System Info Collectors": [
"Mimikatz collector",
"Azure credential collector"
]
}
} }
} """
""" self._crawl_config_schema_definitions_for_reverse_schema(schema)
reverse_schema = {} self._crawl_config_schema_properties_for_reverse_schema(schema)
_crawl_config_schema_definitions_for_reverse_schema(schema, reverse_schema) return self.reverse_schema
_crawl_config_schema_properties_for_reverse_schema(schema, reverse_schema)
return reverse_schema def _crawl_config_schema_definitions_for_reverse_schema(self, schema: Dict):
definitions = schema["definitions"]
for definition in definitions:
definition_type = definitions[definition]["title"]
for field in definitions[definition].get("anyOf", []):
config_field = field["title"]
for attack_technique in field.get("attack_techniques", []):
self._add_config_field_to_reverse_schema(
definition_type, config_field, attack_technique
)
def _crawl_config_schema_properties_for_reverse_schema(self, schema: Dict):
def _crawl_config_schema_definitions_for_reverse_schema(schema: Dict, reverse_schema: Dict): properties = schema["properties"]
definitions = schema["definitions"] for prop in properties:
for definition in definitions: property_type = properties[prop]["title"]
definition_type = definitions[definition]["title"] for category_name in properties[prop].get("properties", []):
for field in definitions[definition].get("anyOf", []): category = properties[prop]["properties"][category_name]
config_field = field["title"] self._crawl_properties(
for attack_technique in field.get("attack_techniques", []): config_option_path=property_type,
_add_config_field_to_reverse_schema( config_option=category,
definition_type, config_field, attack_technique, reverse_schema
) )
def _crawl_properties(self, config_option_path: str, config_option: Dict):
def _crawl_config_schema_properties_for_reverse_schema(schema: Dict, reverse_schema: Dict): config_option_path = (
properties = schema["properties"] f"{config_option_path} -> {config_option['title']}"
for prop in properties: if "title" in config_option
property_type = properties[prop]["title"] else config_option_path
for category_name in properties[prop].get("properties", []): )
category = properties[prop]["properties"][category_name] for config_option_name in config_option.get("properties", []):
_crawl_properties( new_config_option = config_option["properties"][config_option_name]
config_option_path=property_type, self._check_related_attack_techniques(
config_option=category, config_option_path=config_option_path,
reverse_schema=reverse_schema, config_option=new_config_option,
) )
# check for "properties" and each property's related techniques recursively;
# the levels of nesting and where related techniques are declared won't
# always be fixed in the config schema
self._crawl_properties(config_option_path, new_config_option)
def _crawl_properties(config_option_path: str, config_option: Dict, reverse_schema: Dict): def _check_related_attack_techniques(self, config_option_path: str, config_option: Dict):
config_option_path = ( for attack_technique in config_option.get("related_attack_techniques", []):
f"{config_option_path} -> {config_option['title']}" # No config values could be a reason that related attack techniques are left
if "title" in config_option # unscanned. See https://github.com/guardicore/monkey/issues/1518 for more.
else config_option_path config_field = config_option["title"]
) self._add_config_field_to_reverse_schema(
for config_option_name in config_option.get("properties", []): config_option_path, config_field, attack_technique
new_config_option = config_option["properties"][config_option_name] )
_check_related_attack_techniques(
config_option_path=config_option_path,
config_option=new_config_option,
reverse_schema=reverse_schema,
)
# check for "properties" and each property's related techniques recursively; def _add_config_field_to_reverse_schema(
# the levels of nesting and where related techniques are declared won't self, definition_type: str, config_field: str, attack_technique: str
# always be fixed in the config schema ) -> None:
_crawl_properties(config_option_path, new_config_option, reverse_schema) self.reverse_schema.setdefault(attack_technique, {})
self.reverse_schema[attack_technique].setdefault(definition_type, [])
self.reverse_schema[attack_technique][definition_type].append(config_field)
def _check_related_attack_techniques(
config_option_path: str, config_option: Dict, reverse_schema: Dict
):
for attack_technique in config_option.get("related_attack_techniques", []):
# No config values could be a reason that related attack techniques are left
# unscanned. See https://github.com/guardicore/monkey/issues/1518 for more.
config_field = config_option["title"]
_add_config_field_to_reverse_schema(
config_option_path, config_field, attack_technique, reverse_schema
)
def _add_config_field_to_reverse_schema(
definition_type: str, config_field: str, attack_technique: str, reverse_schema: Dict
) -> None:
reverse_schema.setdefault(attack_technique, {})
reverse_schema[attack_technique].setdefault(definition_type, [])
reverse_schema[attack_technique][definition_type].append(config_field)