Compare commits

...

19 Commits

Author SHA1 Message Date
Kekoa Kaaikala e40d061091 Agent: Fix PortScanData mypy issues 2022-09-21 20:30:41 +00:00
Kekoa Kaaikala d4f6c83f56 Agent: Fix VictimHost mypy issues 2022-09-21 20:19:20 +00:00
Kekoa Kaaikala d440d51f53 Island: Fix mypy issues in segmentation.py 2022-09-21 18:29:34 +00:00
Kekoa Kaaikala 218b341006 Island: Fix mypy issues in networkmap.py 2022-09-21 18:28:58 +00:00
Kekoa Kaaikala 24848e62df Island: Fix mypy issues in version.py 2022-09-21 18:28:17 +00:00
Kekoa Kaaikala a9e101fd04 Island: Fix mypy issues in T1065.py 2022-09-21 18:25:38 +00:00
Kekoa Kaaikala 1a6f48614e Island: Fix mypy issues in mongo_db_process.py 2022-09-21 18:24:47 +00:00
Kekoa Kaaikala a5b5449f73 Island: Fix mypy issues in finding_service.py 2022-09-21 18:23:33 +00:00
Kekoa Kaaikala 7013963d59 Island: Fix mypy issues in cred_exploit.py 2022-09-21 18:22:20 +00:00
Kekoa Kaaikala ed5773878e Island: Fix mypy issues in ransomware_report.py 2022-09-21 18:21:42 +00:00
Kekoa Kaaikala c870fde3cc Island: Fix mypy issues in exploit.py 2022-09-21 18:21:03 +00:00
Kekoa Kaaikala e595a70019 Island: Fix mypy issues for encryptors 2022-09-21 18:19:58 +00:00
Kekoa Kaaikala e8aa231f92 Island: Fix mypy issues in AbstractResource.py 2022-09-21 18:09:18 +00:00
Kekoa Kaaikala c0b2981150 Island: Fix mypy issues in i_log_repository.py 2022-09-21 18:08:25 +00:00
Kekoa Kaaikala 7801a98a15 Island: Fix mypy issues in server_setup.py 2022-09-21 18:06:53 +00:00
Kekoa Kaaikala 6209ec47cd Island: Fix mypy issues in app.py 2022-09-21 18:06:19 +00:00
Kekoa Kaaikala 795d9fe201 Agent: Fix mypy issues in ransomware_options.py 2022-09-21 17:58:41 +00:00
Kekoa Kaaikala ea67c07d70 Agent: Fix mypy issues in pba.py 2022-09-21 17:57:44 +00:00
Kekoa Kaaikala 70c74b87a9 Agent: Fix mypy issues in capture_output.py 2022-09-21 17:55:41 +00:00
28 changed files with 135 additions and 91 deletions

View File

@ -1,10 +1,10 @@
import abc
import threading
from collections import namedtuple
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Mapping, Sequence
from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Tuple
from common import OperatingSystem
from common.credentials import Credentials
from infection_monkey.model import VictimHost
@ -26,15 +26,37 @@ class ExploiterResultData:
propagation_success: bool = False
interrupted: bool = False
os: str = ""
info: Mapping = None
attempts: Iterable = None
info: Mapping = field(default_factory=lambda: {})
attempts: Iterable = field(default_factory=lambda: [])
error_message: str = ""
PingScanData = namedtuple("PingScanData", ["response_received", "os"])
PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"])
FingerprintData = namedtuple("FingerprintData", ["os_type", "os_version", "services"])
PostBreachData = namedtuple("PostBreachData", ["display_name", "command", "result"])
@dataclass(frozen=True)
class FingerprintData:
os_type: Optional[OperatingSystem]
os_version: str
services: Mapping = field(default_factory=lambda: {})
@dataclass(frozen=True)
class PingScanData:
response_received: bool
os: Optional[OperatingSystem]
@dataclass(frozen=True)
class PortScanData:
port: int
status: PortStatus
banner: str
service: str
@dataclass(frozen=True)
class PostBreachData:
display_name: str
command: str
result: Tuple[str, bool]
class IPuppet(metaclass=abc.ABCMeta):
@ -84,7 +106,7 @@ class IPuppet(metaclass=abc.ABCMeta):
@abc.abstractmethod
def scan_tcp_ports(
self, host: str, ports: List[int], timeout: float = 3
) -> Dict[int, PortScanData]:
) -> Mapping[int, PortScanData]:
"""
Scans a list of TCP ports on a remote host
@ -92,7 +114,7 @@ class IPuppet(metaclass=abc.ABCMeta):
:param int ports: List of TCP port numbers to scan
:param float timeout: The maximum amount of time (in seconds) to wait for a response
:return: The data collected by scanning the provided host:ports combination
:rtype: Dict[int, PortScanData]
:rtype: Mapping[int, PortScanData]
"""
@abc.abstractmethod
@ -125,6 +147,7 @@ class IPuppet(metaclass=abc.ABCMeta):
name: str,
host: VictimHost,
current_depth: int,
servers: Sequence[str],
options: Dict,
interrupt: threading.Event,
) -> ExploiterResultData:

View File

@ -10,7 +10,7 @@ class VictimHost(object):
self.os: Dict[str, Any] = {}
self.services: Dict[str, Any] = {}
self.icmp = False
self.default_server = None
self.default_server = ""
def as_dict(self):
return self.__dict__

View File

@ -12,7 +12,7 @@ from infection_monkey.network.tools import BANNER_READ, DEFAULT_TIMEOUT, tcp_por
logger = logging.getLogger(__name__)
POLL_INTERVAL = 0.5
EMPTY_PORT_SCAN = {-1: PortScanData(-1, PortStatus.CLOSED, None, None)}
EMPTY_PORT_SCAN = {-1: PortScanData(-1, PortStatus.CLOSED, "", "")}
def scan_tcp_ports(
@ -48,7 +48,7 @@ def _build_port_scan_data(
def _get_closed_port_data(port: int) -> PortScanData:
return PortScanData(port, PortStatus.CLOSED, None, None)
return PortScanData(port, PortStatus.CLOSED, "", "")
def _check_tcp_ports(

View File

@ -1,4 +1,6 @@
import logging
from pathlib import Path
from typing import Optional
from common.utils.file_utils import InvalidPath, expand_path
from infection_monkey.utils.environment import is_windows_os
@ -12,7 +14,7 @@ class RansomwareOptions:
self.file_extension = options["encryption"]["file_extension"]
self.readme_enabled = options["other_behaviors"]["readme"]
self.target_directory = None
self.target_directory: Optional[Path] = None
self._set_target_directory(options["encryption"]["directories"])
def _set_target_directory(self, os_target_directories: dict):

View File

@ -1,6 +1,6 @@
import logging
import subprocess
from typing import Dict, Iterable
from typing import Dict, Iterable, List, Tuple
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.utils.attack_utils import ScanStatus
@ -33,7 +33,7 @@ class PBA:
"""
self.command = PBA.choose_command(linux_cmd, windows_cmd)
self.name = name
self.pba_data = []
self.pba_data: List[PostBreachData] = []
self.telemetry_messenger = telemetry_messenger
self.timeout = timeout
@ -73,7 +73,7 @@ class PBA:
pba_execution_succeeded = pba_execution_result[1]
return pba_execution_succeeded and self.is_script()
def _execute_default(self):
def _execute_default(self) -> Tuple[str, bool]:
"""
Default post breach command execution routine
:return: Tuple of command's output string and boolean, indicating if it succeeded
@ -84,7 +84,7 @@ class PBA:
).decode()
return output, True
except subprocess.CalledProcessError as err:
return err.output.decode(), False
return bytes(err.output).decode(), False
except subprocess.TimeoutExpired as err:
return str(err), False

View File

@ -7,7 +7,6 @@ class StdoutCapture:
self._orig_stdout = sys.stdout
self._new_stdout = io.StringIO()
sys.stdout = self._new_stdout
return self
def get_captured_stdout_output(self) -> str:
self._new_stdout.seek(0)

View File

@ -2,7 +2,7 @@ import os
import re
import uuid
from datetime import timedelta
from typing import Iterable, Type
from typing import Iterable, Set, Type
import flask_restful
from flask import Flask, Response, send_from_directory
@ -120,7 +120,7 @@ class FlaskDIWrapper:
def __init__(self, api: flask_restful.Api, container: DIContainer):
self._api = api
self._container = container
self._reserved_urls = set()
self._reserved_urls: Set[str] = set()
def add_resource(self, resource: Type[AbstractResource]):
if len(resource.urls) == 0:

View File

@ -1,6 +1,14 @@
from dataclasses import dataclass
from typing import Mapping, Sequence
from monkey_island.cc.models import Machine
@dataclass
class Arc:
dst_machine: Machine # noqa: F821
status: str
# This is the most concise way to represent a graph:
# Machine id as key, Arch list as a value
@ -9,9 +17,3 @@ from typing import Mapping, Sequence
@dataclass
class NetworkMap:
nodes: Mapping[str, Sequence[Arc]] # noqa: F821
@dataclass
class Arc:
dst_machine: Machine # noqa: F821
status: str

View File

@ -2,8 +2,12 @@ from abc import ABC
from typing import Optional, Sequence
# TODO: Actually define the Log class
class Log:
pass
class ILogRepository(ABC):
# Define log object
def get_logs(self, agent_id: Optional[str] = None) -> Sequence[Log]: # noqa: F821
pass

View File

@ -1,6 +1,8 @@
from typing import List
import flask_restful
# The purpose of this class is to decouple resources from flask
class AbstractResource(flask_restful.Resource):
urls = []
urls: List[str] = []

View File

@ -1,6 +1,6 @@
import logging
from monkey_island.cc import Version
from monkey_island.cc import Version as IslandVersion
from monkey_island.cc.resources.AbstractResource import AbstractResource
from monkey_island.cc.resources.request_authentication import jwt_required
@ -10,7 +10,7 @@ logger = logging.getLogger(__name__)
class Version(AbstractResource):
urls = ["/api/island/version"]
def __init__(self, version: Version):
def __init__(self, version: IslandVersion):
self._version = version
@jwt_required

View File

@ -4,7 +4,7 @@ import logging
import sys
from pathlib import Path
from threading import Thread
from typing import Sequence, Tuple
from typing import Optional, Sequence, Tuple
import gevent.hub
import requests
@ -151,7 +151,7 @@ def _start_mongodb(data_dir: Path) -> MongoDbProcess:
return mongo_db_process
def _connect_to_mongodb(mongo_db_process: MongoDbProcess):
def _connect_to_mongodb(mongo_db_process: Optional[MongoDbProcess]):
try:
mongo_setup.connect_to_mongodb(MONGO_CONNECTION_TIMEOUT)
except mongo_setup.MongoDBTimeOutError as err:

View File

@ -1,7 +1,7 @@
import os
import secrets
from pathlib import Path
from typing import Union
from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
@ -12,7 +12,7 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
_KEY_FILE_NAME = "mongo_key.bin"
_encryptor: Union[None, IEncryptor] = None
_encryptor: Optional[IEncryptor] = None
# NOTE: This class is being replaced by RepositoryEncryptor
@ -73,5 +73,5 @@ def _initialize_datastore_encryptor(key_file: Path, secret: str):
_encryptor = DataStoreEncryptor(secret, key_file)
def get_datastore_encryptor() -> IEncryptor:
def get_datastore_encryptor() -> Optional[IEncryptor]:
return _encryptor

View File

@ -1,10 +1,11 @@
import secrets
from pathlib import Path
from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
from . import ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError
from . import IEncryptor, ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError
from .key_based_encryptor import KeyBasedEncryptor
from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
@ -12,33 +13,32 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
class RepositoryEncryptor(ILockableEncryptor):
def __init__(self, key_file: Path):
self._key_file = key_file
self._password_based_encryptor = None
self._key_based_encryptor = None
self._key_based_encryptor: Optional[IEncryptor] = None
def unlock(self, secret: bytes):
try:
self._password_based_encryptor = PasswordBasedBytesEncryptor(secret.decode())
self._key_based_encryptor = self._initialize_key_based_encryptor()
encryptor = PasswordBasedBytesEncryptor(secret.decode())
self._key_based_encryptor = self._initialize_key_based_encryptor(encryptor)
except Exception as err:
raise UnlockError(err)
def _initialize_key_based_encryptor(self):
def _initialize_key_based_encryptor(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
if self._key_file.is_file():
return self._load_key()
return self._load_key(encryptor)
return self._create_key()
return self._create_key(encryptor)
def _load_key(self) -> KeyBasedEncryptor:
def _load_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
with open(self._key_file, "rb") as f:
encrypted_key = f.read()
plaintext_key = EncryptionKey32Bytes(self._password_based_encryptor.decrypt(encrypted_key))
plaintext_key = EncryptionKey32Bytes(encryptor.decrypt(encrypted_key))
return KeyBasedEncryptor(plaintext_key)
def _create_key(self) -> KeyBasedEncryptor:
def _create_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
plaintext_key = EncryptionKey32Bytes(secrets.token_bytes(32))
encrypted_key = self._password_based_encryptor.encrypt(plaintext_key)
encrypted_key = encryptor.encrypt(plaintext_key)
with open_new_securely_permissioned_file(str(self._key_file), "wb") as f:
f.write(encrypted_key)
@ -54,7 +54,6 @@ class RepositoryEncryptor(ILockableEncryptor):
except Exception as err:
raise ResetKeyError(err)
self._password_based_encryptor = None
self._key_based_encryptor = None
def encrypt(self, plaintext: bytes) -> bytes:

View File

@ -25,4 +25,8 @@ class T1065(AttackTechnique):
@staticmethod
def get_tunnel_ports() -> Sequence[str]:
telems = Telemetry.objects(telem_category="tunnel", data__proxy__ne=None)
return [address_to_ip_port(telem["data"]["proxy"])[1] for telem in telems]
return [
p
for p in (address_to_ip_port(telem["data"]["proxy"])[1] for telem in telems)
if p is not None
]

View File

@ -18,8 +18,8 @@ def get_propagation_stats() -> Dict:
}
def _get_exploit_counts(exploited: List[MonkeyExploitation]) -> Dict:
exploit_counts = {}
def _get_exploit_counts(exploited: List[MonkeyExploitation]) -> Dict[str, int]:
exploit_counts: Dict[str, int] = {}
for node in exploited:
for exploit in node.exploits:

View File

@ -16,12 +16,12 @@ class CredExploitProcessor:
if attempt["result"]:
exploit_info.username = attempt["user"]
if attempt["password"]:
exploit_info.credential_type = CredentialType.PASSWORD.value
exploit_info.credential_type = CredentialType.PASSWORD
exploit_info.password = attempt["password"]
elif attempt["ssh_key"]:
exploit_info.credential_type = CredentialType.KEY.value
exploit_info.credential_type = CredentialType.KEY
exploit_info.ssh_key = attempt["ssh_key"]
else:
exploit_info.credential_type = CredentialType.HASH.value
exploit_info.credential_type = CredentialType.HASH
return exploit_info
return exploit_info

View File

@ -1,6 +1,6 @@
import copy
import dateutil
from dateutil import parser as dateutil_parser
from monkey_island.cc.models import Monkey
from monkey_island.cc.server_utils.encryption import get_datastore_encryptor
@ -29,10 +29,10 @@ def process_exploit_telemetry(telemetry_json, _):
def update_network_with_exploit(edge: EdgeService, telemetry_json):
telemetry_json["data"]["info"]["started"] = dateutil.parser.parse(
telemetry_json["data"]["info"]["started"] = dateutil_parser.parse(
telemetry_json["data"]["info"]["started"]
)
telemetry_json["data"]["info"]["finished"] = dateutil.parser.parse(
telemetry_json["data"]["info"]["finished"] = dateutil_parser.parse(
telemetry_json["data"]["info"]["finished"]
)
new_exploit = copy.deepcopy(telemetry_json["data"])

View File

@ -67,6 +67,8 @@ def is_segmentation_violation(
return cross_segment_ip is not None
return False
def get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet):
return Event.create_event(

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Union
from typing import Dict, List, Union, cast
from bson import SON
@ -30,21 +30,26 @@ class FindingService:
@staticmethod
def get_all_findings_for_ui() -> List[EnrichedFinding]:
findings = FindingService.get_all_findings_from_db()
for i in range(len(findings)):
details = FindingService._get_finding_details(findings[i])
findings[i] = findings[i].to_mongo()
findings[i] = FindingService._get_enriched_finding(findings[i])
findings[i].details = details
return findings
enriched_findings: List[EnrichedFinding] = []
for finding in findings:
finding_data = finding.to_mongo()
enriched_finding = FindingService._get_enriched_finding(finding_data)
details = FindingService._get_finding_details(finding)
enriched_finding.details = details
enriched_findings.append(enriched_finding)
return enriched_findings
@staticmethod
def _get_enriched_finding(finding: Finding) -> EnrichedFinding:
def _get_enriched_finding(finding: SON) -> EnrichedFinding:
test_info = zero_trust_consts.TESTS_MAP[finding["test"]]
enriched_finding = EnrichedFinding(
finding_id=str(finding["_id"]),
test=test_info[zero_trust_consts.FINDING_EXPLANATION_BY_STATUS_KEY][finding["status"]],
test=cast(
Dict[str, str], test_info[zero_trust_consts.FINDING_EXPLANATION_BY_STATUS_KEY]
)[finding["status"]],
test_key=finding["test"],
pillars=test_info[zero_trust_consts.PILLARS_KEY],
pillars=cast(List[str], test_info[zero_trust_consts.PILLARS_KEY]),
status=finding["status"],
details=None,
)

View File

@ -49,7 +49,7 @@ class MongoDbProcess:
self._process.kill()
def is_running(self) -> bool:
if self._process.poll() is None:
if self._process and self._process.poll() is None:
return True
return False

View File

@ -77,14 +77,14 @@ class MockPuppet(IPuppet):
) -> Dict[int, PortScanData]:
logger.debug(f"run_scan_tcp_port({host}, {ports}, {timeout})")
dot_1_results = {
22: PortScanData(22, PortStatus.CLOSED, None, None),
22: PortScanData(22, PortStatus.CLOSED, "", ""),
445: PortScanData(445, PortStatus.OPEN, "SMB BANNER", "tcp-445"),
3389: PortScanData(3389, PortStatus.OPEN, "", "tcp-3389"),
}
dot_3_results = {
22: PortScanData(22, PortStatus.OPEN, "SSH BANNER", "tcp-22"),
443: PortScanData(443, PortStatus.OPEN, "HTTPS BANNER", "tcp-443"),
3389: PortScanData(3389, PortStatus.CLOSED, "", None),
3389: PortScanData(3389, PortStatus.CLOSED, "", ""),
}
if host == DOT_1:
@ -104,7 +104,7 @@ class MockPuppet(IPuppet):
options: Dict,
) -> FingerprintData:
logger.debug(f"fingerprint({name}, {host})")
empty_fingerprint_data = FingerprintData(None, None, {})
empty_fingerprint_data = FingerprintData(None, "", {})
dot_1_results = {
"SMBFinger": FingerprintData(
@ -118,7 +118,7 @@ class MockPuppet(IPuppet):
),
"HTTPFinger": FingerprintData(
None,
None,
"",
{
"tcp-80": {"name": "http", "data": ("SERVER_HEADERS", False)},
"tcp-443": {"name": "http", "data": ("SERVER_HEADERS_2", True)},
@ -248,4 +248,4 @@ class MockPuppet(IPuppet):
def _get_empty_results(port: int):
return PortScanData(port, PortStatus.CLOSED, None, None)
return PortScanData(port, PortStatus.CLOSED, "", "")

View File

@ -35,12 +35,12 @@ def mock_victim_host_factory():
return MockVictimHostFactory()
empty_fingerprint_data = FingerprintData(None, None, {})
empty_fingerprint_data = FingerprintData(None, "", {})
dot_1_scan_results = IPScanResults(
PingScanData(True, "windows"),
{
22: PortScanData(22, PortStatus.CLOSED, None, None),
22: PortScanData(22, PortStatus.CLOSED, "", ""),
445: PortScanData(445, PortStatus.OPEN, "SMB BANNER", "tcp-445"),
3389: PortScanData(3389, PortStatus.OPEN, "", "tcp-3389"),
},
@ -56,7 +56,7 @@ dot_3_scan_results = IPScanResults(
{
22: PortScanData(22, PortStatus.OPEN, "SSH BANNER", "tcp-22"),
443: PortScanData(443, PortStatus.OPEN, "HTTPS BANNER", "tcp-443"),
3389: PortScanData(3389, PortStatus.CLOSED, "", None),
3389: PortScanData(3389, PortStatus.CLOSED, "", ""),
},
{
"SSHFinger": FingerprintData(
@ -64,7 +64,7 @@ dot_3_scan_results = IPScanResults(
),
"HTTPFinger": FingerprintData(
None,
None,
"",
{
"tcp-80": {"name": "http", "data": ("SERVER_HEADERS", False)},
"tcp-443": {"name": "http", "data": ("SERVER_HEADERS_2", True)},
@ -77,9 +77,9 @@ dot_3_scan_results = IPScanResults(
dead_host_scan_results = IPScanResults(
PingScanData(False, None),
{
22: PortScanData(22, PortStatus.CLOSED, None, None),
443: PortScanData(443, PortStatus.CLOSED, None, None),
3389: PortScanData(3389, PortStatus.CLOSED, "", None),
22: PortScanData(22, PortStatus.CLOSED, "", ""),
443: PortScanData(443, PortStatus.CLOSED, "", ""),
3389: PortScanData(3389, PortStatus.CLOSED, "", ""),
},
{},
)

View File

@ -13,7 +13,7 @@ def mock_get_interface_to_target(monkeypatch):
def test_factory_no_tunnel():
factory = VictimHostFactory(island_ip="192.168.56.1", island_port="5000", on_island=False)
network_address = NetworkAddress("192.168.56.2", None)
network_address = NetworkAddress("192.168.56.2", "")
victim = factory.build_victim_host(network_address)
@ -49,4 +49,4 @@ def test_factory_no_default_server():
victim = factory.build_victim_host(network_address)
assert victim.default_server is None
assert not victim.default_server

View File

@ -34,7 +34,7 @@ def test_tcp_successful(monkeypatch, patch_check_tcp_ports, open_ports_data):
for port in closed_ports:
assert port_scan_data[port].port == port
assert port_scan_data[port].status == PortStatus.CLOSED
assert port_scan_data[port].banner is None
assert not port_scan_data[port].banner
@pytest.mark.parametrize("open_ports_data", [{}])

View File

@ -1,11 +1,12 @@
import json
from typing import Iterable
import pytest
from infection_monkey.exploit.sshexec import SSHExploiter
from infection_monkey.i_puppet.i_puppet import ExploiterResultData
from infection_monkey.model.host import VictimHost
from infection_monkey.telemetry.exploit_telem import ExploitTelem
from monkey.infection_monkey.i_puppet.i_puppet import ExploiterResultData
DOMAIN_NAME = "domain-name"
IP = "0.0.0.0"
@ -16,7 +17,7 @@ HOST_AS_DICT = {
"os": {},
"services": {},
"icmp": False,
"default_server": None,
"default_server": "",
}
EXPLOITER_NAME = "SSHExploiter"
EXPLOITER_INFO = {
@ -27,7 +28,7 @@ EXPLOITER_INFO = {
"vulnerable_ports": [],
"executed_cmds": [],
}
EXPLOITER_ATTEMPTS = []
EXPLOITER_ATTEMPTS: Iterable = []
RESULT = False
OS_LINUX = "linux"
ERROR_MSG = "failed because yolo"

View File

@ -1,4 +1,5 @@
import json
from typing import Any, Dict
import pytest
@ -14,9 +15,9 @@ HOST_AS_DICT = {
"os": {},
"services": {},
"icmp": False,
"default_server": None,
"default_server": "",
}
HOST_SERVICES = {}
HOST_SERVICES: Dict[str, Any] = {}
@pytest.fixture

View File

@ -18,7 +18,7 @@ SCAN_DATA_MOCK = [
},
},
"monkey_exe": None,
"default_server": None,
"default_server": "",
},
}
]