Merge pull request #2330 from guardicore/2260-fix-mypy-issues

2260 fix mypy issues
This commit is contained in:
Kekoa Kaaikala 2022-09-22 09:49:42 -04:00 committed by GitHub
commit 9c0ea39b6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 95 additions and 104 deletions

View File

@ -5,7 +5,7 @@ from typing import Optional, Union
from pydantic import SecretBytes, SecretStr from pydantic import SecretBytes, SecretStr
def get_plaintext(secret: Union[SecretStr, SecretBytes, None, str]) -> Optional[str]: def get_plaintext(secret: Union[SecretStr, SecretBytes, None, str]) -> Optional[Union[str, bytes]]:
if isinstance(secret, (SecretStr, SecretBytes)): if isinstance(secret, (SecretStr, SecretBytes)):
return secret.get_secret_value() return secret.get_secret_value()
else: else:

View File

@ -1,5 +1,5 @@
import logging import logging
from typing import Any, Iterable from typing import Any, Dict, Iterable, Sequence
from common.credentials import Credentials, LMHash, NTHash, Password, SSHKeypair, Username from common.credentials import Credentials, LMHash, NTHash, Password, SSHKeypair, Username
from common.credentials.credentials import Identity, Secret from common.credentials.credentials import Identity, Secret
@ -21,7 +21,7 @@ class AggregatingPropagationCredentialsRepository(IPropagationCredentialsReposit
""" """
def __init__(self, control_channel: IControlChannel): def __init__(self, control_channel: IControlChannel):
self._stored_credentials = { self._stored_credentials: Dict = {
"exploit_user_list": set(), "exploit_user_list": set(),
"exploit_password_list": set(), "exploit_password_list": set(),
"exploit_lm_hash_list": set(), "exploit_lm_hash_list": set(),
@ -72,7 +72,7 @@ class AggregatingPropagationCredentialsRepository(IPropagationCredentialsReposit
return self._stored_credentials return self._stored_credentials
def _set_attribute(self, attribute_to_be_set: str, credentials_values: Iterable[Any]): def _set_attribute(self, attribute_to_be_set: str, credentials_values: Sequence[Any]):
if not credentials_values: if not credentials_values:
return return

View File

@ -7,6 +7,7 @@ from typing import Dict, Sequence
from common.event_queue import IAgentEventQueue from common.event_queue import IAgentEventQueue
from common.utils.exceptions import FailedExploitationError from common.utils.exceptions import FailedExploitationError
from infection_monkey.i_puppet import ExploiterResultData from infection_monkey.i_puppet import ExploiterResultData
from infection_monkey.model import VictimHost
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
from . import IAgentBinaryRepository from . import IAgentBinaryRepository
@ -55,10 +56,9 @@ class HostExploiter:
} }
) )
# TODO: host should be VictimHost, at the moment it can't because of circular dependency
def exploit_host( def exploit_host(
self, self,
host, host: VictimHost,
servers: Sequence[str], servers: Sequence[str],
current_depth: int, current_depth: int,
telemetry_messenger: ITelemetryMessenger, telemetry_messenger: ITelemetryMessenger,

View File

@ -15,12 +15,13 @@ AGENT_BINARY_PATH_WIN64 = PureWindowsPath(r"C:\Windows\temp\monkey64.exe")
def get_agent_dst_path(host: VictimHost) -> PurePath: def get_agent_dst_path(host: VictimHost) -> PurePath:
if host.is_windows(): return _add_random_suffix(_get_agent_path(host))
path = PureWindowsPath(AGENT_BINARY_PATH_WIN64)
else:
path = PurePosixPath(AGENT_BINARY_PATH_LINUX)
return _add_random_suffix(path)
def _get_agent_path(host: VictimHost) -> PurePath:
if host.is_windows():
return PureWindowsPath(AGENT_BINARY_PATH_WIN64)
return PurePosixPath(AGENT_BINARY_PATH_LINUX)
def get_random_file_suffix() -> str: def get_random_file_suffix() -> str:

View File

@ -1,17 +1,18 @@
import logging import logging
from typing import Optional from typing import Optional, Tuple
import nmb.NetBIOS import nmb.NetBIOS
from impacket.dcerpc.v5 import nrpc, rpcrt from impacket.dcerpc.v5 import nrpc, rpcrt
from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT
from common.utils.exceptions import DomainControllerNameFetchError from common.utils.exceptions import DomainControllerNameFetchError
from infection_monkey.model import VictimHost
from infection_monkey.utils.threading import interruptible_iter from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_dc_details(host: object) -> (str, str, str): def get_dc_details(host: VictimHost) -> Tuple[str, str, str]:
dc_ip = host.ip_addr dc_ip = host.ip_addr
dc_name = _get_dc_name(dc_ip=dc_ip) dc_name = _get_dc_name(dc_ip=dc_ip)
dc_handle = "\\\\" + dc_name dc_handle = "\\\\" + dc_name
@ -35,7 +36,7 @@ def _get_dc_name(dc_ip: str) -> str:
) )
def is_exploitable(zerologon_exploiter_object) -> (bool, Optional[rpcrt.DCERPC_v5]): def is_exploitable(zerologon_exploiter_object) -> Tuple[bool, Optional[rpcrt.DCERPC_v5]]:
# Connect to the DC's Netlogon service. # Connect to the DC's Netlogon service.
try: try:
rpc_con = zerologon_exploiter_object.connect_to_dc(zerologon_exploiter_object.dc_ip) rpc_con = zerologon_exploiter_object.connect_to_dc(zerologon_exploiter_object.dc_ip)

View File

@ -113,7 +113,7 @@ class HTTPIslandAPIClient(IIslandAPIClient):
return response.content return response.content
@handle_island_errors @handle_island_errors
def get_agent_binary(self, operating_system: OperatingSystem): def get_agent_binary(self, operating_system: OperatingSystem) -> bytes:
os_name = operating_system.value os_name = operating_system.value
response = requests.get( # noqa: DUO123 response = requests.get( # noqa: DUO123
f"{self._api_url}/agent-binaries/{os_name}", f"{self._api_url}/agent-binaries/{os_name}",
@ -125,7 +125,7 @@ class HTTPIslandAPIClient(IIslandAPIClient):
return response.content return response.content
@handle_island_errors @handle_island_errors
def send_events(self, events: Sequence[JSONSerializable]): def send_events(self, events: Sequence[AbstractAgentEvent]):
response = requests.post( # noqa: DUO123 response = requests.post( # noqa: DUO123
f"{self._api_url}/agent-events", f"{self._api_url}/agent-events",
json=self._serialize_events(events), json=self._serialize_events(events),
@ -203,9 +203,9 @@ class HTTPIslandAPIClient(IIslandAPIClient):
class HTTPIslandAPIClientFactory(AbstractIslandAPIClientFactory): class HTTPIslandAPIClientFactory(AbstractIslandAPIClientFactory):
def __init__( def __init__(
self, self,
agent_event_serializer_registry: AgentEventSerializerRegistry = None, agent_event_serializer_registry: AgentEventSerializerRegistry,
): ):
self._agent_event_serializer_registry = agent_event_serializer_registry self._agent_event_serializer_registry = agent_event_serializer_registry
def create_island_api_client(self): def create_island_api_client(self) -> IIslandAPIClient:
return HTTPIslandAPIClient(self._agent_event_serializer_registry) return HTTPIslandAPIClient(self._agent_event_serializer_registry)

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional, Sequence from typing import Sequence
from common import AgentRegistrationData, OperatingSystem from common import AgentRegistrationData, OperatingSystem
from common.agent_configuration import AgentConfiguration from common.agent_configuration import AgentConfiguration
@ -62,7 +62,7 @@ class IIslandAPIClient(ABC):
""" """
@abstractmethod @abstractmethod
def get_agent_binary(self, operating_system: OperatingSystem) -> Optional[bytes]: def get_agent_binary(self, operating_system: OperatingSystem) -> bytes:
""" """
Get an agent binary for the given OS from the island Get an agent binary for the given OS from the island

View File

@ -146,7 +146,7 @@ class InfectionMonkey:
return opts return opts
# TODO: By the time we finish 2292, _connect_to_island_api() may not need to return `server` # TODO: By the time we finish 2292, _connect_to_island_api() may not need to return `server`
def _connect_to_island_api(self) -> Tuple[str, IIslandAPIClient]: def _connect_to_island_api(self) -> Tuple[Optional[str], Optional[IIslandAPIClient]]:
logger.debug(f"Trying to wake up with servers: {', '.join(self._opts.servers)}") logger.debug(f"Trying to wake up with servers: {', '.join(self._opts.servers)}")
server_clients = find_available_island_apis( server_clients = find_available_island_apis(
self._opts.servers, HTTPIslandAPIClientFactory(self._agent_event_serializer_registry) self._opts.servers, HTTPIslandAPIClientFactory(self._agent_event_serializer_registry)
@ -182,7 +182,7 @@ class InfectionMonkey:
self._island_api_client.register_agent(agent_registration_data) self._island_api_client.register_agent(agent_registration_data)
def _select_server( def _select_server(
self, server_clients: Mapping[str, IIslandAPIClient] self, server_clients: Mapping[str, Optional[IIslandAPIClient]]
) -> Tuple[Optional[str], Optional[IIslandAPIClient]]: ) -> Tuple[Optional[str], Optional[IIslandAPIClient]]:
for server in self._opts.servers: for server in self._opts.servers:
if server_clients[server]: if server_clients[server]:

View File

@ -1,4 +1,6 @@
import logging import logging
from pathlib import Path
from typing import Optional
from common.utils.file_utils import InvalidPath, expand_path from common.utils.file_utils import InvalidPath, expand_path
from infection_monkey.utils.environment import is_windows_os from infection_monkey.utils.environment import is_windows_os
@ -12,7 +14,7 @@ class RansomwareOptions:
self.file_extension = options["encryption"]["file_extension"] self.file_extension = options["encryption"]["file_extension"]
self.readme_enabled = options["other_behaviors"]["readme"] self.readme_enabled = options["other_behaviors"]["readme"]
self.target_directory = None self.target_directory: Optional[Path] = None
self._set_target_directory(options["encryption"]["directories"]) self._set_target_directory(options["encryption"]["directories"])
def _set_target_directory(self, os_target_directories: dict): def _set_target_directory(self, os_target_directories: dict):

View File

@ -1,6 +1,6 @@
import logging import logging
import subprocess import subprocess
from typing import Dict, Iterable from typing import Dict, Iterable, List, Tuple
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -33,7 +33,7 @@ class PBA:
""" """
self.command = PBA.choose_command(linux_cmd, windows_cmd) self.command = PBA.choose_command(linux_cmd, windows_cmd)
self.name = name self.name = name
self.pba_data = [] self.pba_data: List[PostBreachData] = []
self.telemetry_messenger = telemetry_messenger self.telemetry_messenger = telemetry_messenger
self.timeout = timeout self.timeout = timeout
@ -73,7 +73,7 @@ class PBA:
pba_execution_succeeded = pba_execution_result[1] pba_execution_succeeded = pba_execution_result[1]
return pba_execution_succeeded and self.is_script() return pba_execution_succeeded and self.is_script()
def _execute_default(self): def _execute_default(self) -> Tuple[str, bool]:
""" """
Default post breach command execution routine Default post breach command execution routine
:return: Tuple of command's output string and boolean, indicating if it succeeded :return: Tuple of command's output string and boolean, indicating if it succeeded
@ -84,7 +84,7 @@ class PBA:
).decode() ).decode()
return output, True return output, True
except subprocess.CalledProcessError as err: except subprocess.CalledProcessError as err:
return err.output.decode(), False return bytes(err.output).decode(), False
except subprocess.TimeoutExpired as err: except subprocess.TimeoutExpired as err:
return str(err), False return str(err), False

View File

@ -1,9 +1,11 @@
from __future__ import annotations
import io import io
import sys import sys
class StdoutCapture: class StdoutCapture:
def __enter__(self) -> None: def __enter__(self) -> StdoutCapture:
self._orig_stdout = sys.stdout self._orig_stdout = sys.stdout
self._new_stdout = io.StringIO() self._new_stdout = io.StringIO()
sys.stdout = self._new_stdout sys.stdout = self._new_stdout

View File

@ -50,6 +50,7 @@ tqdm = "*" # Used in BB tests
sphinx = "*" # Used in documentation sphinx = "*" # Used in documentation
sphinx_rtd_theme = "*" sphinx_rtd_theme = "*"
sphinx_autodoc_typehints = "*" sphinx_autodoc_typehints = "*"
types-python-dateutil = "*"
[requires] [requires]
python_version = "3.7" python_version = "3.7"

View File

@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "7a3d0acc52ca38402412bd299c9cef2f387b7abe8803a3a4c839cbe8e3091195" "sha256": "0cacab24c5242c8a27fa9ee3c7e91117f26234dd952d3e25bc7d5b1cf34cffff"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
@ -847,7 +847,7 @@
"sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02", "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02",
"sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6" "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"
], ],
"markers": "python_version < '3.8'", "markers": "python_version >= '3.7'",
"version": "==4.3.0" "version": "==4.3.0"
}, },
"urllib3": { "urllib3": {
@ -1505,12 +1505,20 @@
"markers": "python_version < '3.8' and implementation_name == 'cpython'", "markers": "python_version < '3.8' and implementation_name == 'cpython'",
"version": "==1.5.4" "version": "==1.5.4"
}, },
"types-python-dateutil": {
"hashes": [
"sha256:6284df1e4783d8fc6e587f0317a81333856b872a6669a282f8a325342bce7fa8",
"sha256:bfd3eb39c7253aea4ba23b10f69b017d30b013662bb4be4ab48b20bbd763f309"
],
"index": "pypi",
"version": "==2.8.19"
},
"typing-extensions": { "typing-extensions": {
"hashes": [ "hashes": [
"sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02", "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02",
"sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6" "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"
], ],
"markers": "python_version < '3.8'", "markers": "python_version >= '3.7'",
"version": "==4.3.0" "version": "==4.3.0"
}, },
"urllib3": { "urllib3": {

View File

@ -2,7 +2,7 @@ import os
import re import re
import uuid import uuid
from datetime import timedelta from datetime import timedelta
from typing import Iterable, Type from typing import Iterable, Set, Type
import flask_restful import flask_restful
from flask import Flask, Response, send_from_directory from flask import Flask, Response, send_from_directory
@ -120,7 +120,7 @@ class FlaskDIWrapper:
def __init__(self, api: flask_restful.Api, container: DIContainer): def __init__(self, api: flask_restful.Api, container: DIContainer):
self._api = api self._api = api
self._container = container self._container = container
self._reserved_urls = set() self._reserved_urls: Set[str] = set()
def add_resource(self, resource: Type[AbstractResource]): def add_resource(self, resource: Type[AbstractResource]):
if len(resource.urls) == 0: if len(resource.urls) == 0:

View File

@ -1,17 +0,0 @@
from dataclasses import dataclass
from typing import Mapping, Sequence
# This is the most concise way to represent a graph:
# Machine id as key, Arch list as a value
# Not sure how compatible this will be with ORM objects though,
# might require more complex casting logic
@dataclass
class NetworkMap:
nodes: Mapping[str, Sequence[Arc]] # noqa: F821
@dataclass
class Arc:
dst_machine: Machine # noqa: F821
status: str

View File

@ -1,14 +0,0 @@
from abc import ABC
from typing import Optional, Sequence
class ILogRepository(ABC):
# Define log object
def get_logs(self, agent_id: Optional[str] = None) -> Sequence[Log]: # noqa: F821
pass
def save_log(self, log: Log): # noqa: F821
pass
def delete_log(self, agent_id: str):
pass

View File

@ -1,6 +1,8 @@
from typing import List
import flask_restful import flask_restful
# The purpose of this class is to decouple resources from flask # The purpose of this class is to decouple resources from flask
class AbstractResource(flask_restful.Resource): class AbstractResource(flask_restful.Resource):
urls = [] urls: List[str] = []

View File

@ -1,6 +1,6 @@
import logging import logging
from monkey_island.cc import Version from monkey_island.cc import Version as IslandVersion
from monkey_island.cc.resources.AbstractResource import AbstractResource from monkey_island.cc.resources.AbstractResource import AbstractResource
from monkey_island.cc.resources.request_authentication import jwt_required from monkey_island.cc.resources.request_authentication import jwt_required
@ -10,7 +10,7 @@ logger = logging.getLogger(__name__)
class Version(AbstractResource): class Version(AbstractResource):
urls = ["/api/island/version"] urls = ["/api/island/version"]
def __init__(self, version: Version): def __init__(self, version: IslandVersion):
self._version = version self._version = version
@jwt_required @jwt_required

View File

@ -4,7 +4,7 @@ import logging
import sys import sys
from pathlib import Path from pathlib import Path
from threading import Thread from threading import Thread
from typing import Sequence, Tuple from typing import Optional, Sequence, Tuple
import gevent.hub import gevent.hub
import requests import requests
@ -151,7 +151,7 @@ def _start_mongodb(data_dir: Path) -> MongoDbProcess:
return mongo_db_process return mongo_db_process
def _connect_to_mongodb(mongo_db_process: MongoDbProcess): def _connect_to_mongodb(mongo_db_process: Optional[MongoDbProcess]):
try: try:
mongo_setup.connect_to_mongodb(MONGO_CONNECTION_TIMEOUT) mongo_setup.connect_to_mongodb(MONGO_CONNECTION_TIMEOUT)
except mongo_setup.MongoDBTimeOutError as err: except mongo_setup.MongoDBTimeOutError as err:

View File

@ -1,7 +1,7 @@
import os import os
import secrets import secrets
from pathlib import Path from pathlib import Path
from typing import Union from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
@ -12,7 +12,7 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
_KEY_FILE_NAME = "mongo_key.bin" _KEY_FILE_NAME = "mongo_key.bin"
_encryptor: Union[None, IEncryptor] = None _encryptor: Optional[IEncryptor] = None
# NOTE: This class is being replaced by RepositoryEncryptor # NOTE: This class is being replaced by RepositoryEncryptor
@ -73,5 +73,5 @@ def _initialize_datastore_encryptor(key_file: Path, secret: str):
_encryptor = DataStoreEncryptor(secret, key_file) _encryptor = DataStoreEncryptor(secret, key_file)
def get_datastore_encryptor() -> IEncryptor: def get_datastore_encryptor() -> Optional[IEncryptor]:
return _encryptor return _encryptor

View File

@ -1,10 +1,11 @@
import secrets import secrets
from pathlib import Path from pathlib import Path
from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
from . import ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError from . import IEncryptor, ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError
from .key_based_encryptor import KeyBasedEncryptor from .key_based_encryptor import KeyBasedEncryptor
from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
@ -12,33 +13,32 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
class RepositoryEncryptor(ILockableEncryptor): class RepositoryEncryptor(ILockableEncryptor):
def __init__(self, key_file: Path): def __init__(self, key_file: Path):
self._key_file = key_file self._key_file = key_file
self._password_based_encryptor = None self._key_based_encryptor: Optional[IEncryptor] = None
self._key_based_encryptor = None
def unlock(self, secret: bytes): def unlock(self, secret: bytes):
try: try:
self._password_based_encryptor = PasswordBasedBytesEncryptor(secret.decode()) encryptor = PasswordBasedBytesEncryptor(secret.decode())
self._key_based_encryptor = self._initialize_key_based_encryptor() self._key_based_encryptor = self._initialize_key_based_encryptor(encryptor)
except Exception as err: except Exception as err:
raise UnlockError(err) raise UnlockError(err)
def _initialize_key_based_encryptor(self): def _initialize_key_based_encryptor(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
if self._key_file.is_file(): if self._key_file.is_file():
return self._load_key() return self._load_key(encryptor)
return self._create_key() return self._create_key(encryptor)
def _load_key(self) -> KeyBasedEncryptor: def _load_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
with open(self._key_file, "rb") as f: with open(self._key_file, "rb") as f:
encrypted_key = f.read() encrypted_key = f.read()
plaintext_key = EncryptionKey32Bytes(self._password_based_encryptor.decrypt(encrypted_key)) plaintext_key = EncryptionKey32Bytes(encryptor.decrypt(encrypted_key))
return KeyBasedEncryptor(plaintext_key) return KeyBasedEncryptor(plaintext_key)
def _create_key(self) -> KeyBasedEncryptor: def _create_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
plaintext_key = EncryptionKey32Bytes(secrets.token_bytes(32)) plaintext_key = EncryptionKey32Bytes(secrets.token_bytes(32))
encrypted_key = self._password_based_encryptor.encrypt(plaintext_key) encrypted_key = encryptor.encrypt(plaintext_key)
with open_new_securely_permissioned_file(str(self._key_file), "wb") as f: with open_new_securely_permissioned_file(str(self._key_file), "wb") as f:
f.write(encrypted_key) f.write(encrypted_key)
@ -54,7 +54,6 @@ class RepositoryEncryptor(ILockableEncryptor):
except Exception as err: except Exception as err:
raise ResetKeyError(err) raise ResetKeyError(err)
self._password_based_encryptor = None
self._key_based_encryptor = None self._key_based_encryptor = None
def encrypt(self, plaintext: bytes) -> bytes: def encrypt(self, plaintext: bytes) -> bytes:

View File

@ -1,4 +1,4 @@
from typing import Sequence from typing import Iterable
from common.network.network_utils import address_to_ip_port from common.network.network_utils import address_to_ip_port
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -17,12 +17,11 @@ class T1065(AttackTechnique):
@staticmethod @staticmethod
def get_report_data(): def get_report_data():
tunneling_ports = T1065.get_tunnel_ports() non_standard_ports = [*T1065.get_tunnel_ports(), str(ISLAND_PORT)]
non_standard_ports = [*tunneling_ports, str(ISLAND_PORT)]
T1065.used_msg = T1065.message % ", ".join(non_standard_ports) T1065.used_msg = T1065.message % ", ".join(non_standard_ports)
return T1065.get_base_data_by_status(ScanStatus.USED.value) return T1065.get_base_data_by_status(ScanStatus.USED.value)
@staticmethod @staticmethod
def get_tunnel_ports() -> Sequence[str]: def get_tunnel_ports() -> Iterable[str]:
telems = Telemetry.objects(telem_category="tunnel", data__proxy__ne=None) telems = Telemetry.objects(telem_category="tunnel", data__proxy__ne=None)
return [address_to_ip_port(telem["data"]["proxy"])[1] for telem in telems] return filter(None, [address_to_ip_port(telem["data"]["proxy"])[1] for telem in telems])

View File

@ -18,8 +18,8 @@ def get_propagation_stats() -> Dict:
} }
def _get_exploit_counts(exploited: List[MonkeyExploitation]) -> Dict: def _get_exploit_counts(exploited: List[MonkeyExploitation]) -> Dict[str, int]:
exploit_counts = {} exploit_counts: Dict[str, int] = {}
for node in exploited: for node in exploited:
for exploit in node.exploits: for exploit in node.exploits:

View File

@ -15,7 +15,7 @@ class ExploiterReportInfo:
ip_address: str ip_address: str
type: str type: str
username: Union[str, None] = None username: Union[str, None] = None
credential_type: Union[CredentialType, None] = None credential_type: Union[str, None] = None
ssh_key: Union[str, None] = None ssh_key: Union[str, None] = None
password: Union[str, None] = None password: Union[str, None] = None
port: Union[str, None] = None port: Union[str, None] = None

View File

@ -67,6 +67,8 @@ def is_segmentation_violation(
return cross_segment_ip is not None return cross_segment_ip is not None
return False
def get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet): def get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet):
return Event.create_event( return Event.create_event(

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Union from typing import Dict, List, Union, cast
from bson import SON from bson import SON
@ -30,21 +30,26 @@ class FindingService:
@staticmethod @staticmethod
def get_all_findings_for_ui() -> List[EnrichedFinding]: def get_all_findings_for_ui() -> List[EnrichedFinding]:
findings = FindingService.get_all_findings_from_db() findings = FindingService.get_all_findings_from_db()
for i in range(len(findings)): enriched_findings: List[EnrichedFinding] = []
details = FindingService._get_finding_details(findings[i]) for finding in findings:
findings[i] = findings[i].to_mongo() finding_data = finding.to_mongo()
findings[i] = FindingService._get_enriched_finding(findings[i]) enriched_finding = FindingService._get_enriched_finding(finding_data)
findings[i].details = details details = FindingService._get_finding_details(finding)
return findings enriched_finding.details = details
enriched_findings.append(enriched_finding)
return enriched_findings
@staticmethod @staticmethod
def _get_enriched_finding(finding: Finding) -> EnrichedFinding: def _get_enriched_finding(finding: SON) -> EnrichedFinding:
test_info = zero_trust_consts.TESTS_MAP[finding["test"]] test_info = zero_trust_consts.TESTS_MAP[finding["test"]]
enriched_finding = EnrichedFinding( enriched_finding = EnrichedFinding(
finding_id=str(finding["_id"]), finding_id=str(finding["_id"]),
test=test_info[zero_trust_consts.FINDING_EXPLANATION_BY_STATUS_KEY][finding["status"]], test=cast(
Dict[str, str], test_info[zero_trust_consts.FINDING_EXPLANATION_BY_STATUS_KEY]
)[finding["status"]],
test_key=finding["test"], test_key=finding["test"],
pillars=test_info[zero_trust_consts.PILLARS_KEY], pillars=cast(List[str], test_info[zero_trust_consts.PILLARS_KEY]),
status=finding["status"], status=finding["status"],
details=None, details=None,
) )

View File

@ -49,7 +49,7 @@ class MongoDbProcess:
self._process.kill() self._process.kill()
def is_running(self) -> bool: def is_running(self) -> bool:
if self._process.poll() is None: if self._process and self._process.poll() is None:
return True return True
return False return False