From 978daf973b103b80e8b9cc17d087b9dc2c6bab2a Mon Sep 17 00:00:00 2001 From: vakarisz Date: Fri, 23 Sep 2022 10:45:23 +0300 Subject: [PATCH] Agent: Fix mypy errors related to puppet --- monkey/infection_monkey/i_puppet/i_puppet.py | 10 ++++++---- .../infection_monkey/network_scanning/tcp_scanner.py | 4 ++-- monkey/infection_monkey/puppet/puppet.py | 6 +++--- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/monkey/infection_monkey/i_puppet/i_puppet.py b/monkey/infection_monkey/i_puppet/i_puppet.py index e949ac373..88e790520 100644 --- a/monkey/infection_monkey/i_puppet/i_puppet.py +++ b/monkey/infection_monkey/i_puppet/i_puppet.py @@ -3,7 +3,7 @@ import threading from collections import namedtuple from dataclasses import dataclass from enum import Enum -from typing import Dict, Iterable, List, Mapping, Sequence +from typing import Dict, Iterable, Mapping, Optional, Sequence from common.credentials import Credentials from infection_monkey.model import VictimHost @@ -26,8 +26,8 @@ class ExploiterResultData: propagation_success: bool = False interrupted: bool = False os: str = "" - info: Mapping = None - attempts: Iterable = None + info: Optional[Mapping] = None + attempts: Optional[Iterable] = None error_message: str = "" @@ -83,7 +83,7 @@ class IPuppet(metaclass=abc.ABCMeta): @abc.abstractmethod def scan_tcp_ports( - self, host: str, ports: List[int], timeout: float = 3 + self, host: str, ports: Sequence[int], timeout: float = 3 ) -> Dict[int, PortScanData]: """ Scans a list of TCP ports on a remote host @@ -125,6 +125,7 @@ class IPuppet(metaclass=abc.ABCMeta): name: str, host: VictimHost, current_depth: int, + servers: Sequence[str], options: Dict, interrupt: threading.Event, ) -> ExploiterResultData: @@ -134,6 +135,7 @@ class IPuppet(metaclass=abc.ABCMeta): :param str name: The name of the exploiter to run :param VictimHost host: A VictimHost object representing the target to exploit :param int current_depth: The current propagation depth + :param servers: List of socket addresses for victim to connect back to :param Dict options: A dictionary containing options that modify the behavior of the exploiter :param threading.Event interrupt: A threading.Event object that signals the exploit to stop diff --git a/monkey/infection_monkey/network_scanning/tcp_scanner.py b/monkey/infection_monkey/network_scanning/tcp_scanner.py index cfb90026b..f103f1556 100644 --- a/monkey/infection_monkey/network_scanning/tcp_scanner.py +++ b/monkey/infection_monkey/network_scanning/tcp_scanner.py @@ -3,7 +3,7 @@ import select import socket import time from pprint import pformat -from typing import Collection, Iterable, Mapping, Tuple +from typing import Collection, Dict, Iterable, Mapping, Tuple from common.utils import Timer from infection_monkey.i_puppet import PortScanData, PortStatus @@ -17,7 +17,7 @@ EMPTY_PORT_SCAN = {-1: PortScanData(-1, PortStatus.CLOSED, None, None)} def scan_tcp_ports( host: str, ports_to_scan: Collection[int], timeout: float -) -> Mapping[int, PortScanData]: +) -> Dict[int, PortScanData]: try: return _scan_tcp_ports(host, ports_to_scan, timeout) except Exception: diff --git a/monkey/infection_monkey/puppet/puppet.py b/monkey/infection_monkey/puppet/puppet.py index 5f522ffe9..151783653 100644 --- a/monkey/infection_monkey/puppet/puppet.py +++ b/monkey/infection_monkey/puppet/puppet.py @@ -1,6 +1,6 @@ import logging import threading -from typing import Dict, Iterable, List, Sequence +from typing import Dict, Iterable, Sequence from common.common_consts.timeouts import CONNECTION_TIMEOUT from common.credentials import Credentials @@ -18,7 +18,7 @@ from infection_monkey.model import VictimHost from .plugin_registry import PluginRegistry -EMPTY_FINGERPRINT = PingScanData(False, None) +EMPTY_FINGERPRINT = FingerprintData(None, None, []) logger = logging.getLogger() @@ -44,7 +44,7 @@ class Puppet(IPuppet): return network_scanning.ping(host, timeout) def scan_tcp_ports( - self, host: str, ports: List[int], timeout: float = CONNECTION_TIMEOUT + self, host: str, ports: Sequence[int], timeout: float = CONNECTION_TIMEOUT ) -> Dict[int, PortScanData]: return network_scanning.scan_tcp_ports(host, ports, timeout)