Agent: Rework return value in _check_tcp_ports in tcp_scanner.py
This commit is contained in:
parent
0e7f171c4a
commit
eb1a322ff8
|
@ -14,8 +14,10 @@ logger = logging.getLogger(__name__)
|
||||||
def scan_tcp_ports(host: str, ports: List[int], timeout: float) -> Dict[int, PortScanData]:
|
def scan_tcp_ports(host: str, ports: List[int], timeout: float) -> Dict[int, PortScanData]:
|
||||||
ports_scan = {}
|
ports_scan = {}
|
||||||
|
|
||||||
open_ports, banners = _check_tcp_ports(host, ports, timeout)
|
open_ports_data = _check_tcp_ports(host, ports, timeout)
|
||||||
open_ports = set(open_ports)
|
|
||||||
|
open_ports = set(open_ports_data["open_ports"])
|
||||||
|
banners = open_ports_data["banners"]
|
||||||
|
|
||||||
for port, banner in zip_longest(ports, banners, fillvalue=None):
|
for port, banner in zip_longest(ports, banners, fillvalue=None):
|
||||||
ports_scan[port] = _build_port_scan_data(port, open_ports, banner)
|
ports_scan[port] = _build_port_scan_data(port, open_ports, banner)
|
||||||
|
@ -35,14 +37,18 @@ def _get_closed_port_data(port: int) -> PortScanData:
|
||||||
return PortScanData(port, PortStatus.CLOSED, None, None)
|
return PortScanData(port, PortStatus.CLOSED, None, None)
|
||||||
|
|
||||||
|
|
||||||
def _check_tcp_ports(ip: str, ports: List[int], timeout: float = DEFAULT_TIMEOUT):
|
def _check_tcp_ports(
|
||||||
|
ip: str, ports: List[int], timeout: float = DEFAULT_TIMEOUT
|
||||||
|
) -> Dict[str, List]:
|
||||||
"""
|
"""
|
||||||
Checks whether any of the given ports are open on a target IP.
|
Checks whether any of the given ports are open on a target IP.
|
||||||
:param ip: IP of host to attack
|
:param ip: IP of host to attack
|
||||||
:param ports: List of ports to attack. Must not be empty.
|
:param ports: List of ports to attack. Must not be empty.
|
||||||
:param timeout: Amount of time to wait for connection
|
:param timeout: Amount of time to wait for connection
|
||||||
:return: List of open ports.
|
:return: Dict with list of open ports and list of banners.
|
||||||
"""
|
"""
|
||||||
|
open_ports_data = {"open_ports": [], "banners": []}
|
||||||
|
|
||||||
sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for _ in range(len(ports))]
|
sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for _ in range(len(ports))]
|
||||||
for s in sockets:
|
for s in sockets:
|
||||||
s.setblocking(False)
|
s.setblocking(False)
|
||||||
|
@ -108,11 +114,11 @@ def _check_tcp_ports(ip: str, ports: List[int], timeout: float = DEFAULT_TIMEOUT
|
||||||
s[1].shutdown(socket.SHUT_RDWR)
|
s[1].shutdown(socket.SHUT_RDWR)
|
||||||
s[1].close()
|
s[1].close()
|
||||||
|
|
||||||
# TODO: Rework the return of this function. Consider using dictionary
|
open_ports_data["open_ports"] = [port for port, _ in connected_ports_sockets]
|
||||||
return [port for port, sock in connected_ports_sockets], banners
|
open_ports_data["banners"] = banners
|
||||||
else:
|
|
||||||
return [], []
|
|
||||||
|
|
||||||
except socket.error as exc:
|
except socket.error as exc:
|
||||||
logger.warning("Exception when checking ports on host %s, Exception: %s", str(ip), exc)
|
logger.warning("Exception when checking ports on host %s, Exception: %s", str(ip), exc)
|
||||||
return [], []
|
|
||||||
|
finally:
|
||||||
|
return open_ports_data
|
||||||
|
|
Loading…
Reference in New Issue