Agent: Refactor the SMB fingerprinter to implement IFingerprinter

This commit is contained in:
Mike Salvatore 2022-02-09 09:55:00 -05:00
parent a7022011d9
commit ab3daeb2e8
1 changed files with 30 additions and 21 deletions

View File

@ -1,11 +1,19 @@
import logging import logging
import socket import socket
import struct import struct
from typing import Dict
from odict import odict from odict import odict
from infection_monkey.network.HostFinger import HostFinger from infection_monkey.i_puppet import (
FingerprintData,
IFingerprinter,
PingScanData,
PortScanData,
PortStatus,
)
SMB_DISPLAY_NAME = "SMB"
SMB_PORT = 445 SMB_PORT = 445
SMB_SERVICE = "tcp-445" SMB_SERVICE = "tcp-445"
@ -127,22 +135,25 @@ class SMBSessionFingerprintData(Packet):
self.fields["bcc1"] = struct.pack("<i", len(self.fields["Data"]))[:2] self.fields["bcc1"] = struct.pack("<i", len(self.fields["Data"]))[:2]
class SMBFingerprinter(HostFinger): class SMBFingerprinter(IFingerprinter):
_SCANNED_SERVICE = "SMB" def get_host_fingerprint(
self,
def __init__(self): host: str,
from infection_monkey.config import WormConfiguration _ping_scan_data: PingScanData,
port_scan_data: Dict[int, PortScanData],
self._config = WormConfiguration _options: Dict,
) -> FingerprintData:
def get_host_fingerprint(self, host): services = {}
smb_service = {
"display_name": SMB_DISPLAY_NAME,
"port": SMB_PORT,
}
os_type = None
os_version = None
try: try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.7) s.settimeout(0.7)
s.connect((host.ip_addr, SMB_PORT)) s.connect((host, SMB_PORT))
self.init_service(host.services, SMB_SERVICE, SMB_PORT)
h = SMBHeader(cmd=b"\x72", flag1=b"\x18", flag2=b"\x53\xc8") h = SMBHeader(cmd=b"\x72", flag1=b"\x18", flag2=b"\x53\xc8")
n = SMBNego(data=SMBNegoFingerprintData()) n = SMBNego(data=SMBNegoFingerprintData())
@ -174,16 +185,14 @@ class SMBFingerprinter(HostFinger):
) )
if os_version.lower() != "unix": if os_version.lower() != "unix":
host.os["type"] = "windows" os_type = "windows"
else: else:
host.os["type"] = "linux" os_type = "linux"
host.services[SMB_SERVICE]["name"] = service_client smb_service["name"] = service_client
if "version" not in host.os:
host.os["version"] = os_version
return True services[SMB_SERVICE] = smb_service
except Exception as exc: except Exception as exc:
logger.debug("Error getting smb fingerprint: %s", exc) logger.debug("Error getting smb fingerprint: %s", exc)
return False return FingerprintData(os_type, os_version, services)