diff --git a/tools/__init__.py b/tools/__init__.py index 6d6964b..d4728e1 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -95,4 +95,14 @@ def allure_step(step: str, var: str) -> None: ensure_ascii=False, indent=4), step, - allure.attachment_type.TEXT) + allure.attachment_type.JSON) + + +def allure_step_no(step: str): + """ + 无附件的操作步骤 + :param step: 步骤名称 + :return: + """ + with allure.step(step): + pass diff --git a/tools/data_clearing.py b/tools/data_clearing.py index fc843f1..6274aa3 100644 --- a/tools/data_clearing.py +++ b/tools/data_clearing.py @@ -48,7 +48,7 @@ class ServerTools: def execute_cmd(self, cmd: str): """ - :param cmd: 服务器下对应的命令, 可以是list,或者str + :param cmd: 服务器下对应的命令 """ stdin, stdout, stderr = self.ssh.exec_command(cmd) error = stderr.read().decode() diff --git a/tools/data_process.py b/tools/data_process.py index 5842380..e7468da 100644 --- a/tools/data_process.py +++ b/tools/data_process.py @@ -7,7 +7,8 @@ @ide: PyCharm @time: 2020/11/18 """ -from tools import logger, extractor, convert_json, rep_expr, allure_step +from tools import logger, extractor, convert_json, rep_expr, allure_step, allure_step_no +from tools.db import DB from tools.read_file import ReadFile @@ -24,16 +25,21 @@ class DataProcess: """ cls.response_dict[key] = value logger.info(f'添加key: {key}, 对应value: {value}') + allure_step('存储实际响应', cls.response_dict) @classmethod - def handle_path(cls, path_str: str) -> str: + def handle_path(cls, path_str: str, env: str) -> str: """路径参数处理 :param path_str: 带提取表达式的字符串 /&$.case_005.data.id&/state/&$.case_005.data.create_time& + :param env: 环境名称, 对应的是环境基准地址 上述内容表示,从响应字典中提取到case_005字典里data字典里id的值,假设是500,后面&$.case_005.data.create_time& 类似,最终提取结果 return /511/state/1605711095 """ # /&$.case.data.id&/state/&$.case_005.data.create_time& - return rep_expr(path_str, cls.response_dict) + url = ReadFile.read_config( + f'$.server.{env}') + rep_expr(path_str, cls.response_dict) + allure_step_no(f'请求地址: {url}') + return url @classmethod def handle_header(cls, header_str: str) -> dict: @@ -44,6 +50,7 @@ class DataProcess: if header_str == '': header_str = '{}' cls.header.update(cls.handle_data(header_str)) + allure_step('请求头', cls.header) return cls.header @classmethod @@ -52,18 +59,18 @@ class DataProcess: :param file_obj: 上传文件使用,格式:接口中文件参数的名称:"文件路径地址"/["文件地址1", "文件地址2"] 实例- 单个文件: &file&D: """ - if file_obj == '': - return - for k, v in convert_json(file_obj).items(): - # 多文件上传 - if isinstance(v, list): - files = [] - for path in v: - files.append((k, (open(path, 'rb')))) - else: - # 单文件上传 - files = {k: open(v, 'rb')} - return files + if file_obj != '': + for k, v in convert_json(file_obj).items(): + # 多文件上传 + if isinstance(v, list): + files = [] + for path in v: + files.append((k, (open(path, 'rb')))) + else: + # 单文件上传 + files = {k: open(v, 'rb')} + allure_step('上传文件', file_obj) + return files @classmethod def handle_data(cls, variable: str) -> dict: @@ -71,35 +78,35 @@ class DataProcess: :param variable: 请求数据,传入的是可转换字典/json的字符串,其中可以包含变量表达式 return 处理之后的json/dict类型的字典数据 """ - if variable == '': - return - data = rep_expr(variable, cls.response_dict) - variable = convert_json(data) - return variable + if variable != '': + data = rep_expr(variable, cls.response_dict) + variable = convert_json(data) + return variable @classmethod - def handle_sql(cls, sql: str, db: object): - """处理sql,并将结果写到响应字典中""" - if sql not in ['no', '']: - sql = rep_expr(sql, DataProcess.response_dict) - else: - sql = None - allure_step('运行sql', sql) - logger.info(sql) - if sql is not None: - # 多条sql,在用例中填写方式如下select * from user; select * from goods 每条sql语句之间需要使用 ; 来分割 - for sql in sql.split(";"): - sql = sql.strip() - if sql == '': - continue - # 查后置sql - result = db.fetch_one(sql) - allure_step('sql执行结果', result) - logger.info(f'结果:{result}') - if result is not None: - # 将查询结果添加到响应字典里面,作用在,接口响应的内容某个字段 直接和数据库某个字段比对,在预期结果中 - # 使用同样的语法提取即可 - DataProcess.response_dict.update(result) + def handle_sql(cls, sql: str, db: DB): + """ + 处理sql,如果sql执行的结果不会空,执行sql的结果和响应结果字典合并 + :param sql: 支持单条或者多条sql,其中多条sql使用 ; 进行分割 + 多条sql,在用例中填写方式如下select * from user; select * from goods 每条sql语句之间需要使用 ; 来分割 + 单条sql,select * from user 或者 select * from user; + :param db: 数据库连接对象 + :return: + """ + sql = rep_expr(sql, DataProcess.response_dict) + + for sql in sql.split(";"): + sql = sql.strip() + if sql == '': + continue + # 查后置sql + result = db.execute_sql(sql) + allure_step(f'执行sql: {sql}', result) + logger.info(f'执行sql: {sql} \n 结果: {result}') + if result is not None: + # 将查询结果添加到响应字典里面,作用在,接口响应的内容某个字段 直接和数据库某个字段比对,在预期结果中 + # 使用同样的语法提取即可 + DataProcess.response_dict.update(result) @classmethod def assert_result(cls, response: dict, expect_str: str): diff --git a/tools/db.py b/tools/db.py index 654f7e8..649510b 100644 --- a/tools/db.py +++ b/tools/db.py @@ -8,6 +8,9 @@ @time: 2020/12/4 @desc: 数据库连接,目前只支持mysql ,且个人认为用到最多的操作应该是查询所以其他todo """ +import json +from datetime import datetime +from typing import Union import pymysql @@ -18,31 +21,46 @@ class DB: mysql = ReadFile.read_config('$.database') def __init__(self): - """初始化连接Mysql""" + """ + 初始化数据库连接,并指定查询的结果集以字典形式返回 + """ self.connection = pymysql.connect( - host=self.mysql.get('host', 'localhost'), - port=self.mysql.get('port', 3306), - user=self.mysql.get('user', 'root'), - password=self.mysql.get('password', '123456'), - db=self.mysql.get('db_name', 'test'), + host=self.mysql['host'], + port=self.mysql['port'], + user=self.mysql['user'], + password=self.mysql['password'], + db=self.mysql['db_name'], charset=self.mysql.get('charset', 'utf8mb4'), cursorclass=pymysql.cursors.DictCursor ) - def fetch_one(self, sql: str) -> object: - """查询数据,查一条""" + def execute_sql(self, sql: str) -> Union[dict, None]: + """ + 执行sql语句方法,查询所有结果的sql只会返回一条结果( + 比如说: 使用select * from cases , 结果将只会返回第一条数据 {'id': 1, 'name': 'updatehahaha', 'path': None, 'body': None, 'expected': '{"msg": "你好"}', 'api_id': 1, 'create_at': '2021-05-17 17:23:54', 'update_at': '2021-05-17 17:23:54'} + + ),支持select, delete, insert, update + :param sql: sql语句 + :return: select 语句 如果有结果则会返回 对应结果字典,delete,insert,update 将返回None + """ with self.connection.cursor() as cursor: cursor.execute(sql) result = cursor.fetchone() # 使用commit解决查询数据出现概率查错问题 self.connection.commit() + return self.verify(result) + + def verify(self, result: dict) -> Union[dict, None]: + """验证结果能否被json.dumps序列化""" + # 尝试变成字符串,解决datetime 无法被json 序列化问题 + try: + json.dumps(result) + except TypeError: # TypeError: Object of type datetime is not JSON serializable + for k, v in result.items(): + if isinstance(v, datetime): + result[k] = str(v) return result def close(self): """关闭数据库连接""" self.connection.close() - - -if __name__ == '__main__': - print(ReadFile.read_config('$.database')) - DB()