优化示例用例,AutoPEP8 规范格式化代码

This commit is contained in:
zy7y 2021-04-19 13:03:55 +08:00
parent 927b479102
commit 9d3eeaa792
11 changed files with 129 additions and 54 deletions

View File

@ -6,4 +6,5 @@
@file: __init__.py.py @file: __init__.py.py
@ide: PyCharm @ide: PyCharm
@time: 2020/7/31 @time: 2020/7/31
""" """
from .base_requests import BaseRequest

View File

@ -9,7 +9,7 @@
""" """
import requests import requests
from tools import allure_step, allure_title, logger, extractor, rep_expr from tools import allure_step, allure_title, logger, extractor
from tools.data_process import DataProcess from tools.data_process import DataProcess
from tools.read_file import ReadFile from tools.read_file import ReadFile
@ -31,11 +31,13 @@ class BaseRequest(object):
return: 响应结果 预期结果 return: 响应结果 预期结果
""" """
case_number, case_title, path, token, method, parametric_key, file_obj, data, sql, expect, is_save = case case_number, case_title, path, token, method, parametric_key, file_obj, data, sql, expect, is_save = case
logger.debug(f"用例进行处理前数据: \n 接口路径: {path} \n 请求参数: {data} \n 后置sql: {sql} \n 预期结果: {expect} \n 保存响应: {is_save}") logger.debug(
f"用例进行处理前数据: \n 接口路径: {path} \n 请求参数: {data} \n 后置sql: {sql} \n 预期结果: {expect} \n 保存响应: {is_save}")
# allure报告 用例标题 # allure报告 用例标题
allure_title(case_title) allure_title(case_title)
# 处理url、header、data、file、的前置方法 # 处理url、header、data、file、的前置方法
url = ReadFile.read_config(f'$.server.{env}') + DataProcess.handle_path(path) url = ReadFile.read_config(
f'$.server.{env}') + DataProcess.handle_path(path)
allure_step('请求地址', url) allure_step('请求地址', url)
header = DataProcess.handle_header(token) header = DataProcess.handle_header(token)
allure_step('请求头', header) allure_step('请求头', header)
@ -49,16 +51,24 @@ class BaseRequest(object):
allure_step('响应内容', res.json()) allure_step('响应内容', res.json())
# 响应后操作 # 响应后操作
if token == '': if token == '':
DataProcess.have_token['Authorization'] = extractor(res.json(), ReadFile.read_config('$.expr.token')) DataProcess.have_token['Authorization'] = extractor(
res.json(), ReadFile.read_config('$.expr.token'))
allure_step('请求头中添加Token', DataProcess.have_token) allure_step('请求头中添加Token', DataProcess.have_token)
# 保存用例的实际响应 # 保存用例的实际响应
if is_save == "": if is_save == "":
DataProcess.save_response(case_number, res.json()) DataProcess.save_response(case_number, res.json())
allure_step('存储实际响应', DataProcess.response_dict) allure_step('存储实际响应', DataProcess.response_dict)
return res.json(), expect, sql return res.json(), expect, sql
@classmethod @classmethod
def send_api(cls, url, method, parametric_key, header=None, data=None, file=None) -> object: def send_api(
cls,
url,
method,
parametric_key,
header=None,
data=None,
file=None) -> object:
""" """
:param method: 请求方法 :param method: 请求方法
:param url: 请求url :param url: 请求url
@ -72,13 +82,28 @@ class BaseRequest(object):
session = cls.get_session() session = cls.get_session()
if parametric_key == 'params': if parametric_key == 'params':
res = session.request(method=method, url=url, params=data, headers=header) res = session.request(
method=method,
url=url,
params=data,
headers=header)
elif parametric_key == 'data': elif parametric_key == 'data':
res = session.request(method=method, url=url, data=data, files=file, headers=header) res = session.request(
method=method,
url=url,
data=data,
files=file,
headers=header)
elif parametric_key == 'json': elif parametric_key == 'json':
res = session.request(method=method, url=url, json=data, files=file, headers=header) res = session.request(
method=method,
url=url,
json=data,
files=file,
headers=header)
else: else:
raise ValueError( raise ValueError(
'可选关键字为params, json, data') '可选关键字为params, json, data')
logger.info(f'\n最终请求地址:{res.url}\n请求方法:{method}\n请求头:{header}\n请求参数:{data}\n上传文件:{file}\n响应数据:{res.json()}') logger.info(
return res f'\n最终请求地址:{res.url}\n请求方法:{method}\n请求头:{header}\n请求参数:{data}\n上传文件:{file}\n响应数据:{res.json()}')
return res

View File

@ -17,7 +17,7 @@ expr:
token: $.data.token token: $.data.token
file_path: file_path:
test_case: data/case_data.xlsx test_case: data/case_data.xls
report: report/ report: report/
log: log/run{time}.log log: log/run{time}.log

30
run.py
View File

@ -27,27 +27,31 @@ def run():
shutil.rmtree(path='report/') shutil.rmtree(path='report/')
logger.add(file_path['log'], enqueue=True, encoding='utf-8') logger.add(file_path['log'], enqueue=True, encoding='utf-8')
logger.info(""" logger.info("""
_ _ _ _____ _ _ _ _ _____ _
__ _ _ __ (_) / \ _ _| |_ __|_ _|__ ___| |_ __ _ _ __ (_) / \\ _ _| |_ __|_ _|__ ___| |_
/ _` | '_ \| | / _ \| | | | __/ _ \| |/ _ \/ __| __| / _` | '_ \\| | / _ \\| | | | __/ _ \\| |/ _ \\/ __| __|
| (_| | |_) | |/ ___ \ |_| | || (_) | | __/\__ \ |_ | (_| | |_) | |/ ___ \\ |_| | || (_) | | __/\\__ \\ |_
\__,_| .__/|_/_/ \_\__,_|\__\___/|_|\___||___/\__| \\__,_| .__/|_/_/ \\_\\__,_|\\__\\___/|_|\\___||___/\\__|
|_| |_|
Starting ... ... ... Starting ... ... ...
""") """)
pytest.main(args=['test/test_api.py', f'--alluredir={file_path["report"]}/data']) pytest.main(
args=[
'test/test_api.py',
f'--alluredir={file_path["report"]}/data'])
# 自动以服务形式打开报告 # 自动以服务形式打开报告
# os.system(f'allure serve {report}/data') # os.system(f'allure serve {report}/data')
# 本地生成报告 # 本地生成报告
os.system(f'allure generate {file_path["report"]}/data -o {file_path["report"]}/html --clean') os.system(
f'allure generate {file_path["report"]}/data -o {file_path["report"]}/html --clean')
logger.success('报告已生成') logger.success('报告已生成')
# 发送邮件带附件报告 # # 发送邮件带附件报告
EmailServe.send_email(email, file_path['report']) # EmailServe.send_email(email, file_path['report'])
#
# 删除本地附件 # # 删除本地附件
os.remove(email['enclosures']) # os.remove(email['enclosures'])
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -10,7 +10,7 @@
""" """
from .conftest import pytest from .conftest import pytest
from api.base_requests import BaseRequest from api import BaseRequest
from tools.data_process import DataProcess from tools.data_process import DataProcess

View File

@ -42,7 +42,7 @@ def rep_expr(content: str, data: dict, expr: str = '&(.*?)&') -> str:
""" """
for ctt in re.findall(expr, content): for ctt in re.findall(expr, content):
content = content.replace(f'&{ctt}&', str(extractor(data, ctt))) content = content.replace(f'&{ctt}&', str(extractor(data, ctt)))
# 增加自定义函数得的调用函数写在tools/hooks.py中 # 增加自定义函数得的调用函数写在tools/hooks.py中
for func in re.findall('@(.*?)@', content): for func in re.findall('@(.*?)@', content):
try: try:
@ -89,4 +89,10 @@ def allure_step(step: str, var: str) -> None:
:param var: 附件内容 :param var: 附件内容
""" """
with allure.step(step): with allure.step(step):
allure.attach(json.dumps(var, ensure_ascii=False, indent=4), step, allure.attachment_type.TEXT) allure.attach(
json.dumps(
var,
ensure_ascii=False,
indent=4),
step,
allure.attachment_type.TEXT)

View File

@ -19,13 +19,23 @@ from tools import logger
class ServerTools: class ServerTools:
def __init__(self, host: str, port: int = 22, username: str = "root", password: str = None, def __init__(
private_key_file: str = None, privat_passowrd: str = None): self,
host: str,
port: int = 22,
username: str = "root",
password: str = None,
private_key_file: str = None,
privat_passowrd: str = None):
# 进行SSH连接 # 进行SSH连接
self.trans = paramiko.Transport((host, port)) self.trans = paramiko.Transport((host, port))
self.host = host self.host = host
if password is None: if password is None:
self.trans.connect(username=username, pkey=paramiko.RSAKey.from_private_key_file(private_key_file, privat_passowrd)) self.trans.connect(
username=username,
pkey=paramiko.RSAKey.from_private_key_file(
private_key_file,
privat_passowrd))
else: else:
self.trans.connect(username=username, password=password) self.trans.connect(username=username, password=password)
# 将sshclient的对象的transport指定为以上的trans # 将sshclient的对象的transport指定为以上的trans
@ -46,15 +56,22 @@ class ServerTools:
logger.error(f"异常信息: {error}") logger.error(f"异常信息: {error}")
return error return error
def files_action(self, post: bool, local_path: str = os.getcwd(), remote_path: str = "/root"): def files_action(
self,
post: bool,
local_path: str = os.getcwd(),
remote_path: str = "/root"):
""" """
:param post: 动作 True 就是上传 False就是下载 :param post: 动作 True 就是上传 False就是下载
:param local_path: 本地的文件路径 默认当前脚本所在的工作目录 :param local_path: 本地的文件路径 默认当前脚本所在的工作目录
:param remote_path: 服务器上的文件路径默认在/root目录下 :param remote_path: 服务器上的文件路径默认在/root目录下
""" """
if post: # 上传文件 if post: # 上传文件
self.ftp_client.put(localpath=local_path, remotepath=f"{remote_path}{os.path.split(local_path)[1]}") self.ftp_client.put(
logger.info(f"文件上传成功: {local_path} -> {self.host}:{remote_path}{os.path.split(local_path)[1]}") localpath=local_path,
remotepath=f"{remote_path}{os.path.split(local_path)[1]}")
logger.info(
f"文件上传成功: {local_path} -> {self.host}:{remote_path}{os.path.split(local_path)[1]}")
else: # 下载文件 else: # 下载文件
file_path = local_path + os.path.split(remote_path)[1] file_path = local_path + os.path.split(remote_path)[1]
self.ftp_client.get(remotepath=remote_path, localpath=file_path) self.ftp_client.get(remotepath=remote_path, localpath=file_path)
@ -76,11 +93,13 @@ class DataClearing:
@classmethod @classmethod
def server_init(cls, settings=settings, server_settings=server_settings): def server_init(cls, settings=settings, server_settings=server_settings):
cls.server = ServerTools(host=settings.get('host'), port=server_settings.get('port'), cls.server = ServerTools(
username=server_settings.get('username'), host=settings.get('host'),
password=server_settings.get('password'), port=server_settings.get('port'),
private_key_file=server_settings.get('private_key_file'), username=server_settings.get('username'),
privat_passowrd=server_settings.get('privat_passowrd')) password=server_settings.get('password'),
private_key_file=server_settings.get('private_key_file'),
privat_passowrd=server_settings.get('privat_passowrd'))
# 新建backup_sql文件夹在服务器上存放导出的sql文件 # 新建backup_sql文件夹在服务器上存放导出的sql文件
cls.server.execute_cmd("mkdir backup_sql") cls.server.execute_cmd("mkdir backup_sql")
@ -93,14 +112,19 @@ class DataClearing:
if cls.server_settings.get('mysql_container') is None: if cls.server_settings.get('mysql_container') is None:
cmd = f"mysqldump -h127.0.0.1 -u{cls.settings.get('username')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > {cls.file_name}" cmd = f"mysqldump -h127.0.0.1 -u{cls.settings.get('username')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > {cls.file_name}"
else: else:
# 将mysql服务的容器中的指定数据库导出 参考文章 https://www.cnblogs.com/wangsongbai/p/12666368.html # 将mysql服务的容器中的指定数据库导出 参考文章
# https://www.cnblogs.com/wangsongbai/p/12666368.html
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysqldump -h127.0.0.1 -u{cls.settings.get('user')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > /root/backup_sql/{cls.file_name}" cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysqldump -h127.0.0.1 -u{cls.settings.get('user')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > /root/backup_sql/{cls.file_name}"
cls.server.execute_cmd(cmd) cls.server.execute_cmd(cmd)
cls.server.files_action(0, f"{cls.server_settings.get('sql_data_file')}", f"/root/backup_sql/{cls.file_name}") cls.server.files_action(0,
f"{cls.server_settings.get('sql_data_file')}",
f"/root/backup_sql/{cls.file_name}")
@classmethod @classmethod
def recovery_mysql(cls, sql_file: str = file_name, database: str = settings.get('db_name')): def recovery_mysql(
cls,
sql_file: str = file_name,
database: str = settings.get('db_name')):
""" """
恢复数据库, 从服务器位置(/root/backup_sql/) 或者本地(../backup_sqls)上传, 传入的需要是.sql文件 恢复数据库, 从服务器位置(/root/backup_sql/) 或者本地(../backup_sqls)上传, 传入的需要是.sql文件
:param sql_file: .sql数据库备份文件, 默认就是导出的sql文件名称 默认文件名称是导出的sql文件 :param sql_file: .sql数据库备份文件, 默认就是导出的sql文件名称 默认文件名称是导出的sql文件
@ -109,7 +133,8 @@ class DataClearing:
result = cls.server.execute_cmd(f"ls -l /root/backup_sql/{sql_file}") result = cls.server.execute_cmd(f"ls -l /root/backup_sql/{sql_file}")
if "No such file or directory" in result: if "No such file or directory" in result:
# 本地上传 # 本地上传
cls.server.files_action(1, f"../backup_sqls/{sql_file}", "/root/backup_sql/") cls.server.files_action(
1, f"../backup_sqls/{sql_file}", "/root/backup_sql/")
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysql -u{cls.settings.get('user')} -p{cls.settings.get('password')} {database} < /root/backup_sql/{sql_file}" cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysql -u{cls.settings.get('user')} -p{cls.settings.get('password')} {database} < /root/backup_sql/{sql_file}"
cls.server.execute_cmd(cmd) cls.server.execute_cmd(cmd)

View File

@ -112,10 +112,11 @@ class DataProcess:
# 获取需要断言的实际结果部分 # 获取需要断言的实际结果部分
actual = extractor(response, k) actual = extractor(response, k)
index += 1 index += 1
logger.info(f'{index}个断言,实际结果:{actual} | 预期结果:{v} \n断言结果 {actual == v}') logger.info(
allure_step(f'{index}个断言', f'实际结果:{actual} = 预期结果:{v}') f'{index}个断言,实际结果:{actual} | 预期结果:{v} \n断言结果 {actual == v}')
allure_step(f'{index}个断言', f'实际结果:{actual} = 预期结果:{v}')
try: try:
assert actual == v assert actual == v
except AssertionError: except AssertionError:
raise AssertionError(f'{index}个断言失败 -|- 实际结果:{actual} || 预期结果: {v}') raise AssertionError(
f'{index}个断言失败 -|- 实际结果:{actual} || 预期结果: {v}')

View File

@ -25,7 +25,8 @@ class ReadFile:
if cls.config_dict is None: if cls.config_dict is None:
# 指定编码格式解决win下跑代码抛出错误 # 指定编码格式解决win下跑代码抛出错误
with open(config_path, 'r', encoding='utf-8') as file: with open(config_path, 'r', encoding='utf-8') as file:
cls.config_dict = yaml.load(file.read(), Loader=yaml.FullLoader) cls.config_dict = yaml.load(
file.read(), Loader=yaml.FullLoader)
return cls.config_dict return cls.config_dict
@classmethod @classmethod

View File

@ -30,7 +30,10 @@ class EmailServe:
fpath = path.replace(file_path, '') fpath = path.replace(file_path, '')
for filename in filenames: for filename in filenames:
zip.write(os.path.join(path, filename), os.path.join(fpath, filename)) zip.write(
os.path.join(
path, filename), os.path.join(
fpath, filename))
zip.close() zip.close()
@staticmethod @staticmethod
@ -47,10 +50,19 @@ class EmailServe:
:param file_path: 需要压缩的文件夹 :param file_path: 需要压缩的文件夹
:return: :return:
""" """
EmailServe.zip_report(file_path=file_path, out_path=setting['enclosures']) EmailServe.zip_report(
yag = yagmail.SMTP(setting['user'], setting['password'], setting['host']) file_path=file_path,
out_path=setting['enclosures'])
yag = yagmail.SMTP(
setting['user'],
setting['password'],
setting['host'])
# 发送邮件 # 发送邮件
yag.send(setting['addressees'], setting['title'], setting['contents'], setting['enclosures']) yag.send(
setting['addressees'],
setting['title'],
setting['contents'],
setting['enclosures'])
# 关闭服务 # 关闭服务
yag.close() yag.close()
logger.info("邮件发送成功!") logger.info("邮件发送成功!")