增加测试前数据库备份/测试后数据恢复(支持linux服务器部署mysql, linux下docker部署的mysql服务)

This commit is contained in:
zy7y 2021-01-19 18:55:59 +08:00
parent fdfa90b2fc
commit 7506721e0b
8 changed files with 259 additions and 14 deletions

View File

@ -17,6 +17,7 @@
| yagmail | 0.11.224 | 测试完成后发送邮件 |
| requests| 2.24.0 | 发送请求 |
|pymysql|0.10.1|连接mysql|
|paramiko|2.7.2|ssh连接linux服务器用于备份/删除数据库文件
#### 目录结构
>apiAutoTest
>
@ -63,7 +64,7 @@
>> >db.py : 封装连接mysql方法
>> >read_file.py 读取配置文件、读取excel用例文件
>> >
>> >~~read_data.py~~
>> >data_clearing.py: 数据清洗方法封装ssh2服务器连接数据库备份/恢复 2021/01/19日更新
>> >
>> >
>> >send_email.py 发送邮件
@ -134,6 +135,11 @@ https://www.bilibili.com/video/BV1EE411B7SU?p=10
2020/12/08 优化断言信息增加数据库支持mysql查询操作 使用`@pytest.fixture(scope="session")`来托管数据库对象用例新增sql栏
2020/12/16 使用conftest.py 初始化用例, 增加失败重跑机制, 增加运行文件run优化test_api.py冗余代码
2021/01/19 添加数据清洗功能(测试开始前进行数据库备份-分别在服务器和本地进行,测试结束后将备份用以恢复数据-将尝试从服务器和本地恢复到服务器数据库中docker部署的mysql服务已本地调试通过直接linux部署的mysql并未测试)
> 详细内容见代码注释`tools/data_clearing.py`
> 如不需要使用该功能请做如下处理,如也不使用数据库对象,只需参考 https://gitee.com/zy7y/apiAutoTest/issues/I2BAQL 修改即可
![](https://gitee.com/zy7y/blog_images/raw/master/img/20210119184856.png)
#### 博客园首发
https://www.cnblogs.com/zy7y/p/13426816.html

View File

@ -9,13 +9,24 @@
@desc: 上传文件接口服务,用于调试上传文件接口处理方法源码来自
FastAPI官网 https://fastapi.tiangolo.com/zh/tutorial/request-files/
"""
import random
from typing import List
from fastapi import FastAPI, File, UploadFile
from tools.db import DB
from faker import Faker
fake = Faker('zh_CN')
app = FastAPI()
# 连接数据库
db = DB()
# 创建游标
cursor = db.connection.cursor()
@app.post("/upload_file/", name='上传单文件接口')
async def create_upload_file(file_excel: UploadFile = File(...)):
@ -40,7 +51,40 @@ async def create_upload_files(files: List[UploadFile] = File(...)):
return {"filenames": [file.filename for file in files], "meta": {"msg": "ok"}}
@app.post("/users", summary="新增用户")
async def add_user():
sql = f"insert into user values ({random.randint(10,1000)},'{fake.name()}', '{fake.ean8()}')"
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.connection.commit()
return {"msg": "成功"}
except Exception as e:
# 如果发生错误则回滚
db.connection.rollback()
print(e)
@app.delete("/users", summary="删除用户")
async def delete_user(id: int):
sql = f"DELETE FROM user WHERE id = {id}"
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.connection.commit()
return {"msg": "成功"}
except Exception as e:
# 如果发生错误则回滚
db.connection.rollback()
print(e)
if __name__ == '__main__':
# 启动项目后 访问 http://127.0.0.1:8888/docs 可查看接口文档
import uvicorn
uvicorn.run('api:app', reload=True, port=8888)

View File

@ -0,0 +1,52 @@
-- MySQL dump 10.13 Distrib 8.0.22, for Linux (x86_64)
--
-- Host: 127.0.0.1 Database: apiAutoTest
-- ------------------------------------------------------
-- Server version 8.0.22
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!50503 SET NAMES utf8mb4 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--
-- Table structure for table `user`
--
DROP TABLE IF EXISTS `user`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `user` (
`id` int NOT NULL AUTO_INCREMENT,
`username` varchar(255) DEFAULT NULL,
`password` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=993 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `user`
--
LOCK TABLES `user` WRITE;
/*!40000 ALTER TABLE `user` DISABLE KEYS */;
INSERT INTO `user` VALUES (3,'所属','1231233'),(604,'薛淑珍','71255132'),(633,'方杰','11881865'),(881,'傅晨','45363849'),(992,'莫英','72334041');
/*!40000 ALTER TABLE `user` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2021-01-19 10:19:19

View File

@ -37,17 +37,23 @@ email:
database:
host: localhost
port: 3306
user: test
user: root
# 不用''会被解析成int类型数据
password: '123456'
db_name: test
charset: utf8mb4
# 响应存储/sql结果存储
cache:
host: localhost
port: 6379
db: 0
# 数据库所在的服务器配置
ssh_server:
port: 22
username: root
password: '123456'
# 私有密钥文件路径
private_key_file:
# 如果使用的docker容器部署mysql服务需要传入mysql的容器id/name
mysql_container: mysql8
# 数据库备份文件导出的本地路径, 需要保证存在该文件夹
sql_data_file: backup_sqls/

Binary file not shown.

View File

@ -11,12 +11,26 @@
import pytest
from tools.data_clearing import DataClearing
from tools.db import DB
from tools.read_file import ReadFile
@pytest.fixture(scope="session")
def get_db():
def data_clearing():
"""数据清洗"""
DataClearing.server_init()
# 1. 备份数据库
DataClearing.backup_mysql()
yield
# 2. 恢复数据库
DataClearing.recovery_mysql()
DataClearing.close_client()
# 若不需要数据清洗功能请把get_db()入参拿掉
@pytest.fixture(scope="session")
def get_db(data_clearing):
"""关于其作用域请移步查看官方文档"""
try:
db = DB()

View File

@ -18,12 +18,18 @@ from tools.data_process import DataProcess
# reruns 重试次数 reruns_delay 次数之间的延时设置(单位:秒)
# 失败重跑,会影响总测试时长,如不需要 将 @pytest.mark.flaky(reruns=3, reruns_delay=5) 注释即可
# @pytest.mark.flaky(reruns=2, reruns_delay=1)
def test_main(cases, get_db):
# 此处的cases入参来自与 conftest.py 文件中 cases函数与直接使用 @pytest.mark.parametrize
# 有着差不多的效果
# def test_main(cases, get_db): # 使用数据库功能(包含sql查询数据备份数据恢复)
# # 此处的cases入参来自与 conftest.py 文件中 cases函数与直接使用 @pytest.mark.parametrize
# # 有着差不多的效果
# # 发送请求
# response, expect, sql = BaseRequest.send_request(cases)
# # 执行sql
# DataProcess.handle_sql(sql, get_db)
# # 断言操作
# DataProcess.assert_result(response, expect)
def test_main(cases): # 不使用数据库功能
# 发送请求
response, expect, sql = BaseRequest.send_request(cases)
# 执行sql
DataProcess.handle_sql(sql, get_db)
# 断言操作
DataProcess.assert_result(response, expect)

117
tools/data_clearing.py Normal file
View File

@ -0,0 +1,117 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2021/1/19 11:44
@Author : zy7y
@ProjectName : apiAutoTest
@File : data_clearing.py
@Software : PyCharm
@Github : https://github.com/zy7y
@Blog : https://www.cnblogs.com/zy7y
"""
import os
from datetime import datetime
import paramiko
from tools.read_file import ReadFile
from tools import logger
class ServerTools:
def __init__(self, host: str, port: int = 22, username: str = "root", password: str = None,
private_key_file: str = None):
# 进行SSH连接
self.trans = paramiko.Transport((host, port))
self.host = host
if password is None:
self.trans.connect(username=username, pkey=paramiko.RSAKey.from_private_key_file(private_key_file))
else:
self.trans.connect(username=username, password=password)
# 将sshclient的对象的transport指定为以上的trans
self.ssh = paramiko.SSHClient()
logger.success("SSH客户端创建成功.")
self.ssh._transport = self.trans
# 创建SFTP客户端
self.ftp_client = paramiko.SFTPClient.from_transport(self.trans)
logger.success("SFTP客户端创建成功.")
def execute_cmd(self, cmd: str):
"""
:param cmd: 服务器下对应的命令, 可以是list或者str
"""
stdin, stdout, stderr = self.ssh.exec_command(cmd)
error = stderr.read().decode()
logger.info(f"输入命令: {cmd} -> 输出结果: {stdout.read().decode()}")
logger.error(f"异常信息: {error}")
return error
def files_action(self, post: bool, local_path: str = os.getcwd(), remote_path: str = "/root"):
"""
:param post: 动作 True 就是上传 False就是下载
:param local_path: 本地的文件路径 默认当前脚本所在的工作目录
:param remote_path: 服务器上的文件路径默认在/root目录下
"""
if post: # 上传文件
self.ftp_client.put(localpath=local_path, remotepath=f"{remote_path}{os.path.split(local_path)[1]}")
logger.info(f"文件上传成功: {local_path} -> {self.host}:{remote_path}{os.path.split(local_path)[1]}")
else: # 下载文件
file_path = local_path + os.path.split(remote_path)[1]
self.ftp_client.get(remotepath=remote_path, localpath=file_path)
logger.info(f"文件下载成功: {self.host}:{remote_path} -> {file_path}")
def ssh_close(self):
"""关闭连接"""
self.trans.close()
logger.info("已关闭SSH连接...")
class DataClearing:
settings = ReadFile.read_config('$.database')
server_settings = settings.get('ssh_server')
server = None
# 导出的sql文件名称及后缀
file_name = f"{settings.get('db_name')}_{datetime.now().strftime('%Y-%m-%dT%H_%M_%S')}.sql"
@classmethod
def server_init(cls, settings=settings, server_settings=server_settings):
cls.server = ServerTools(host=settings.get('host'), port=server_settings.get('port'),
username=server_settings.get('username'),
password=server_settings.get('password'),
private_key_file=server_settings.get('private_key_file'))
# 新建backup_sql文件夹在服务器上存放导出的sql文件
cls.server.execute_cmd("mkdir backup_sql")
@classmethod
def backup_mysql(cls):
"""
备份数据库, 会分别备份在数据库所在服务器的/root/backup_sql/目录下, 与当前项目文件目录下的 backup_sqls
每次备份生成一个数据库名_当前年_月_日T_时_分_秒, 支持linux 服务器上安装的mysql服务(本人未调试),以及linux中docker部署的mysql备份
"""
if cls.server_settings.get('mysql_container') is None:
cmd = f"mysqldump -h127.0.0.1 -u{cls.settings.get('username')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > {cls.file_name}"
else:
# 将mysql服务的容器中的指定数据库导出 参考文章 https://www.cnblogs.com/wangsongbai/p/12666368.html
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysqldump -h127.0.0.1 -u{cls.settings.get('user')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > /root/backup_sql/{cls.file_name}"
cls.server.execute_cmd(cmd)
cls.server.files_action(0, f"{cls.server_settings.get('sql_data_file')}", f"/root/backup_sql/{cls.file_name}")
@classmethod
def recovery_mysql(cls, sql_file: str = file_name, database: str = settings.get('db_name')):
"""
恢复数据库, 从服务器位置(/root/backup_sql/) 或者本地(../backup_sqls)上传, 传入的需要是.sql文件
:param sql_file: .sql数据库备份文件, 默认就是导出的sql文件名称 默认文件名称是导出的sql文件
:param database: 恢复的数据库名称默认是备份数据库(config.yaml中的db_name)
"""
result = cls.server.execute_cmd(f"ls -l /root/backup_sql/{sql_file}")
if "No such file or directory" in result:
# 本地上传
cls.server.files_action(1, f"../backup_sqls/{sql_file}", "/root/backup_sql/")
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysql -u{cls.settings.get('user')} -p{cls.settings.get('password')} {database} < /root/backup_sql/{sql_file}"
cls.server.execute_cmd(cmd)
@classmethod
def close_client(cls):
cls.server.ssh_close()