forked from DxvLwRYF/apiAutoTest
优化执行sql方法,先支持多条sql查询,select 只支持获取第一条查询结果
This commit is contained in:
parent
4a5b6f807b
commit
2f97664f84
|
@ -95,4 +95,14 @@ def allure_step(step: str, var: str) -> None:
|
|||
ensure_ascii=False,
|
||||
indent=4),
|
||||
step,
|
||||
allure.attachment_type.TEXT)
|
||||
allure.attachment_type.JSON)
|
||||
|
||||
|
||||
def allure_step_no(step: str):
|
||||
"""
|
||||
无附件的操作步骤
|
||||
:param step: 步骤名称
|
||||
:return:
|
||||
"""
|
||||
with allure.step(step):
|
||||
pass
|
||||
|
|
|
@ -48,7 +48,7 @@ class ServerTools:
|
|||
|
||||
def execute_cmd(self, cmd: str):
|
||||
"""
|
||||
:param cmd: 服务器下对应的命令, 可以是list,或者str
|
||||
:param cmd: 服务器下对应的命令
|
||||
"""
|
||||
stdin, stdout, stderr = self.ssh.exec_command(cmd)
|
||||
error = stderr.read().decode()
|
||||
|
|
|
@ -7,7 +7,8 @@
|
|||
@ide: PyCharm
|
||||
@time: 2020/11/18
|
||||
"""
|
||||
from tools import logger, extractor, convert_json, rep_expr, allure_step
|
||||
from tools import logger, extractor, convert_json, rep_expr, allure_step, allure_step_no
|
||||
from tools.db import DB
|
||||
from tools.read_file import ReadFile
|
||||
|
||||
|
||||
|
@ -24,16 +25,21 @@ class DataProcess:
|
|||
"""
|
||||
cls.response_dict[key] = value
|
||||
logger.info(f'添加key: {key}, 对应value: {value}')
|
||||
allure_step('存储实际响应', cls.response_dict)
|
||||
|
||||
@classmethod
|
||||
def handle_path(cls, path_str: str) -> str:
|
||||
def handle_path(cls, path_str: str, env: str) -> str:
|
||||
"""路径参数处理
|
||||
:param path_str: 带提取表达式的字符串 /&$.case_005.data.id&/state/&$.case_005.data.create_time&
|
||||
:param env: 环境名称, 对应的是环境基准地址
|
||||
上述内容表示,从响应字典中提取到case_005字典里data字典里id的值,假设是500,后面&$.case_005.data.create_time& 类似,最终提取结果
|
||||
return /511/state/1605711095
|
||||
"""
|
||||
# /&$.case.data.id&/state/&$.case_005.data.create_time&
|
||||
return rep_expr(path_str, cls.response_dict)
|
||||
url = ReadFile.read_config(
|
||||
f'$.server.{env}') + rep_expr(path_str, cls.response_dict)
|
||||
allure_step_no(f'请求地址: {url}')
|
||||
return url
|
||||
|
||||
@classmethod
|
||||
def handle_header(cls, header_str: str) -> dict:
|
||||
|
@ -44,6 +50,7 @@ class DataProcess:
|
|||
if header_str == '':
|
||||
header_str = '{}'
|
||||
cls.header.update(cls.handle_data(header_str))
|
||||
allure_step('请求头', cls.header)
|
||||
return cls.header
|
||||
|
||||
@classmethod
|
||||
|
@ -52,18 +59,18 @@ class DataProcess:
|
|||
:param file_obj: 上传文件使用,格式:接口中文件参数的名称:"文件路径地址"/["文件地址1", "文件地址2"]
|
||||
实例- 单个文件: &file&D:
|
||||
"""
|
||||
if file_obj == '':
|
||||
return
|
||||
for k, v in convert_json(file_obj).items():
|
||||
# 多文件上传
|
||||
if isinstance(v, list):
|
||||
files = []
|
||||
for path in v:
|
||||
files.append((k, (open(path, 'rb'))))
|
||||
else:
|
||||
# 单文件上传
|
||||
files = {k: open(v, 'rb')}
|
||||
return files
|
||||
if file_obj != '':
|
||||
for k, v in convert_json(file_obj).items():
|
||||
# 多文件上传
|
||||
if isinstance(v, list):
|
||||
files = []
|
||||
for path in v:
|
||||
files.append((k, (open(path, 'rb'))))
|
||||
else:
|
||||
# 单文件上传
|
||||
files = {k: open(v, 'rb')}
|
||||
allure_step('上传文件', file_obj)
|
||||
return files
|
||||
|
||||
@classmethod
|
||||
def handle_data(cls, variable: str) -> dict:
|
||||
|
@ -71,35 +78,35 @@ class DataProcess:
|
|||
:param variable: 请求数据,传入的是可转换字典/json的字符串,其中可以包含变量表达式
|
||||
return 处理之后的json/dict类型的字典数据
|
||||
"""
|
||||
if variable == '':
|
||||
return
|
||||
data = rep_expr(variable, cls.response_dict)
|
||||
variable = convert_json(data)
|
||||
return variable
|
||||
if variable != '':
|
||||
data = rep_expr(variable, cls.response_dict)
|
||||
variable = convert_json(data)
|
||||
return variable
|
||||
|
||||
@classmethod
|
||||
def handle_sql(cls, sql: str, db: object):
|
||||
"""处理sql,并将结果写到响应字典中"""
|
||||
if sql not in ['no', '']:
|
||||
sql = rep_expr(sql, DataProcess.response_dict)
|
||||
else:
|
||||
sql = None
|
||||
allure_step('运行sql', sql)
|
||||
logger.info(sql)
|
||||
if sql is not None:
|
||||
# 多条sql,在用例中填写方式如下select * from user; select * from goods 每条sql语句之间需要使用 ; 来分割
|
||||
for sql in sql.split(";"):
|
||||
sql = sql.strip()
|
||||
if sql == '':
|
||||
continue
|
||||
# 查后置sql
|
||||
result = db.fetch_one(sql)
|
||||
allure_step('sql执行结果', result)
|
||||
logger.info(f'结果:{result}')
|
||||
if result is not None:
|
||||
# 将查询结果添加到响应字典里面,作用在,接口响应的内容某个字段 直接和数据库某个字段比对,在预期结果中
|
||||
# 使用同样的语法提取即可
|
||||
DataProcess.response_dict.update(result)
|
||||
def handle_sql(cls, sql: str, db: DB):
|
||||
"""
|
||||
处理sql,如果sql执行的结果不会空,执行sql的结果和响应结果字典合并
|
||||
:param sql: 支持单条或者多条sql,其中多条sql使用 ; 进行分割
|
||||
多条sql,在用例中填写方式如下select * from user; select * from goods 每条sql语句之间需要使用 ; 来分割
|
||||
单条sql,select * from user 或者 select * from user;
|
||||
:param db: 数据库连接对象
|
||||
:return:
|
||||
"""
|
||||
sql = rep_expr(sql, DataProcess.response_dict)
|
||||
|
||||
for sql in sql.split(";"):
|
||||
sql = sql.strip()
|
||||
if sql == '':
|
||||
continue
|
||||
# 查后置sql
|
||||
result = db.execute_sql(sql)
|
||||
allure_step(f'执行sql: {sql}', result)
|
||||
logger.info(f'执行sql: {sql} \n 结果: {result}')
|
||||
if result is not None:
|
||||
# 将查询结果添加到响应字典里面,作用在,接口响应的内容某个字段 直接和数据库某个字段比对,在预期结果中
|
||||
# 使用同样的语法提取即可
|
||||
DataProcess.response_dict.update(result)
|
||||
|
||||
@classmethod
|
||||
def assert_result(cls, response: dict, expect_str: str):
|
||||
|
|
44
tools/db.py
44
tools/db.py
|
@ -8,6 +8,9 @@
|
|||
@time: 2020/12/4
|
||||
@desc: 数据库连接,目前只支持mysql ,且个人认为用到最多的操作应该是查询所以其他todo
|
||||
"""
|
||||
import json
|
||||
from datetime import datetime
|
||||
from typing import Union
|
||||
|
||||
import pymysql
|
||||
|
||||
|
@ -18,31 +21,46 @@ class DB:
|
|||
mysql = ReadFile.read_config('$.database')
|
||||
|
||||
def __init__(self):
|
||||
"""初始化连接Mysql"""
|
||||
"""
|
||||
初始化数据库连接,并指定查询的结果集以字典形式返回
|
||||
"""
|
||||
self.connection = pymysql.connect(
|
||||
host=self.mysql.get('host', 'localhost'),
|
||||
port=self.mysql.get('port', 3306),
|
||||
user=self.mysql.get('user', 'root'),
|
||||
password=self.mysql.get('password', '123456'),
|
||||
db=self.mysql.get('db_name', 'test'),
|
||||
host=self.mysql['host'],
|
||||
port=self.mysql['port'],
|
||||
user=self.mysql['user'],
|
||||
password=self.mysql['password'],
|
||||
db=self.mysql['db_name'],
|
||||
charset=self.mysql.get('charset', 'utf8mb4'),
|
||||
cursorclass=pymysql.cursors.DictCursor
|
||||
)
|
||||
|
||||
def fetch_one(self, sql: str) -> object:
|
||||
"""查询数据,查一条"""
|
||||
def execute_sql(self, sql: str) -> Union[dict, None]:
|
||||
"""
|
||||
执行sql语句方法,查询所有结果的sql只会返回一条结果(
|
||||
比如说: 使用select * from cases , 结果将只会返回第一条数据 {'id': 1, 'name': 'updatehahaha', 'path': None, 'body': None, 'expected': '{"msg": "你好"}', 'api_id': 1, 'create_at': '2021-05-17 17:23:54', 'update_at': '2021-05-17 17:23:54'}
|
||||
|
||||
),支持select, delete, insert, update
|
||||
:param sql: sql语句
|
||||
:return: select 语句 如果有结果则会返回 对应结果字典,delete,insert,update 将返回None
|
||||
"""
|
||||
with self.connection.cursor() as cursor:
|
||||
cursor.execute(sql)
|
||||
result = cursor.fetchone()
|
||||
# 使用commit解决查询数据出现概率查错问题
|
||||
self.connection.commit()
|
||||
return self.verify(result)
|
||||
|
||||
def verify(self, result: dict) -> Union[dict, None]:
|
||||
"""验证结果能否被json.dumps序列化"""
|
||||
# 尝试变成字符串,解决datetime 无法被json 序列化问题
|
||||
try:
|
||||
json.dumps(result)
|
||||
except TypeError: # TypeError: Object of type datetime is not JSON serializable
|
||||
for k, v in result.items():
|
||||
if isinstance(v, datetime):
|
||||
result[k] = str(v)
|
||||
return result
|
||||
|
||||
def close(self):
|
||||
"""关闭数据库连接"""
|
||||
self.connection.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(ReadFile.read_config('$.database'))
|
||||
DB()
|
||||
|
|
Loading…
Reference in New Issue