diff --git a/README.md b/README.md index 12d2309..426a89c 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ | yagmail | 0.11.224 | 测试完成后发送邮件 | | requests| 2.24.0 | 发送请求 | |pymysql|0.10.1|连接mysql| +|paramiko|2.7.2|ssh连接linux服务器,用于备份/删除数据库文件 #### 目录结构 >apiAutoTest > @@ -63,7 +64,7 @@ >> >db.py : 封装连接mysql方法 >> >read_file.py: 读取配置文件、读取excel用例文件 >> > ->> >~~read_data.py:~~ +>> >data_clearing.py: 数据清洗方法封装,ssh2服务器连接,数据库备份/恢复 2021/01/19日更新 >> > >> > >> >send_email.py : 发送邮件 @@ -134,6 +135,11 @@ https://www.bilibili.com/video/BV1EE411B7SU?p=10 2020/12/08 优化断言信息,增加数据库(支持mysql)查询操作, 使用`@pytest.fixture(scope="session")`来托管数据库对象,用例新增sql栏 2020/12/16 使用conftest.py 初始化用例, 增加失败重跑机制, 增加运行文件run,优化test_api.py冗余代码 + +2021/01/19 添加数据清洗功能(测试开始前进行数据库备份-分别在服务器和本地进行,测试结束后将备份用以恢复数据-将尝试从服务器和本地恢复到服务器数据库中,docker部署的mysql服务已本地调试通过,直接linux部署的mysql并未测试) +> 详细内容见代码注释`tools/data_clearing.py` +> 如不需要使用该功能请做如下处理,如也不使用数据库对象,只需参考 https://gitee.com/zy7y/apiAutoTest/issues/I2BAQL 修改即可 +![](https://gitee.com/zy7y/blog_images/raw/master/img/20210119184856.png) #### 博客园首发 https://www.cnblogs.com/zy7y/p/13426816.html diff --git a/api_server/api.py b/api_server/api.py index 8dcac25..b261097 100644 --- a/api_server/api.py +++ b/api_server/api.py @@ -9,13 +9,24 @@ @desc: 上传文件接口服务,用于调试上传文件接口处理方法,源码来自 FastAPI官网 https://fastapi.tiangolo.com/zh/tutorial/request-files/ """ - +import random from typing import List from fastapi import FastAPI, File, UploadFile +from tools.db import DB + +from faker import Faker + +fake = Faker('zh_CN') + app = FastAPI() +# 连接数据库 +db = DB() +# 创建游标 +cursor = db.connection.cursor() + @app.post("/upload_file/", name='上传单文件接口') async def create_upload_file(file_excel: UploadFile = File(...)): @@ -40,7 +51,40 @@ async def create_upload_files(files: List[UploadFile] = File(...)): return {"filenames": [file.filename for file in files], "meta": {"msg": "ok"}} +@app.post("/users", summary="新增用户") +async def add_user(): + sql = f"insert into user values ({random.randint(10,1000)},'{fake.name()}', '{fake.ean8()}')" + try: + # 执行sql语句 + cursor.execute(sql) + # 提交到数据库执行 + db.connection.commit() + return {"msg": "成功"} + except Exception as e: + # 如果发生错误则回滚 + db.connection.rollback() + print(e) + + + +@app.delete("/users", summary="删除用户") +async def delete_user(id: int): + sql = f"DELETE FROM user WHERE id = {id}" + try: + # 执行sql语句 + cursor.execute(sql) + # 提交到数据库执行 + db.connection.commit() + return {"msg": "成功"} + except Exception as e: + # 如果发生错误则回滚 + db.connection.rollback() + print(e) + + + if __name__ == '__main__': # 启动项目后 访问 http://127.0.0.1:8888/docs 可查看接口文档 import uvicorn + uvicorn.run('api:app', reload=True, port=8888) diff --git a/backup_sqls/apiAutoTest_2021-01-19T18_14_52.sql b/backup_sqls/apiAutoTest_2021-01-19T18_14_52.sql new file mode 100644 index 0000000..548f2c4 --- /dev/null +++ b/backup_sqls/apiAutoTest_2021-01-19T18_14_52.sql @@ -0,0 +1,52 @@ +-- MySQL dump 10.13 Distrib 8.0.22, for Linux (x86_64) +-- +-- Host: 127.0.0.1 Database: apiAutoTest +-- ------------------------------------------------------ +-- Server version 8.0.22 + +/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; +/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; +/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; +/*!50503 SET NAMES utf8mb4 */; +/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; +/*!40103 SET TIME_ZONE='+00:00' */; +/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; +/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; +/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; +/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; + +-- +-- Table structure for table `user` +-- + +DROP TABLE IF EXISTS `user`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!50503 SET character_set_client = utf8mb4 */; +CREATE TABLE `user` ( + `id` int NOT NULL AUTO_INCREMENT, + `username` varchar(255) DEFAULT NULL, + `password` varchar(255) DEFAULT NULL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=993 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Dumping data for table `user` +-- + +LOCK TABLES `user` WRITE; +/*!40000 ALTER TABLE `user` DISABLE KEYS */; +INSERT INTO `user` VALUES (3,'所属','1231233'),(604,'薛淑珍','71255132'),(633,'方杰','11881865'),(881,'傅晨','45363849'),(992,'莫英','72334041'); +/*!40000 ALTER TABLE `user` ENABLE KEYS */; +UNLOCK TABLES; +/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; + +/*!40101 SET SQL_MODE=@OLD_SQL_MODE */; +/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; +/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; +/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; +/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */; +/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; +/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; + +-- Dump completed on 2021-01-19 10:19:19 diff --git a/config/config.yaml b/config/config.yaml index 4cbb85e..70bf912 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -37,17 +37,23 @@ email: database: host: localhost port: 3306 - user: test + user: root # 不用''会被解析成int类型数据 password: '123456' db_name: test charset: utf8mb4 -# 响应存储/sql结果存储 -cache: - host: localhost - port: 6379 - db: 0 + # 数据库所在的服务器配置 + ssh_server: + port: 22 + username: root + password: '123456' + # 私有密钥文件路径 + private_key_file: + # 如果使用的docker容器部署mysql服务,需要传入mysql的容器id/name + mysql_container: mysql8 + # 数据库备份文件导出的本地路径, 需要保证存在该文件夹 + sql_data_file: backup_sqls/ diff --git a/data/case_data.xlsx b/data/case_data.xlsx index 405d8f3..d93f7c9 100644 Binary files a/data/case_data.xlsx and b/data/case_data.xlsx differ diff --git a/test/conftest.py b/test/conftest.py index 67d0990..41c9eae 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -11,12 +11,26 @@ import pytest +from tools.data_clearing import DataClearing from tools.db import DB from tools.read_file import ReadFile @pytest.fixture(scope="session") -def get_db(): +def data_clearing(): + """数据清洗""" + DataClearing.server_init() + # 1. 备份数据库 + DataClearing.backup_mysql() + yield + # 2. 恢复数据库 + DataClearing.recovery_mysql() + DataClearing.close_client() + + +# 若不需要数据清洗功能,请把get_db()入参拿掉 +@pytest.fixture(scope="session") +def get_db(data_clearing): """关于其作用域请移步查看官方文档""" try: db = DB() diff --git a/test/test_api.py b/test/test_api.py index 18a8e72..ab8b37d 100644 --- a/test/test_api.py +++ b/test/test_api.py @@ -18,12 +18,18 @@ from tools.data_process import DataProcess # reruns 重试次数 reruns_delay 次数之间的延时设置(单位:秒) # 失败重跑,会影响总测试时长,如不需要 将 @pytest.mark.flaky(reruns=3, reruns_delay=5) 注释即可 # @pytest.mark.flaky(reruns=2, reruns_delay=1) -def test_main(cases, get_db): - # 此处的cases入参来自与 conftest.py 文件中 cases函数,与直接使用 @pytest.mark.parametrize - # 有着差不多的效果 +# def test_main(cases, get_db): # 使用数据库功能(包含sql查询,数据备份,数据恢复) +# # 此处的cases入参来自与 conftest.py 文件中 cases函数,与直接使用 @pytest.mark.parametrize +# # 有着差不多的效果 +# # 发送请求 +# response, expect, sql = BaseRequest.send_request(cases) +# # 执行sql +# DataProcess.handle_sql(sql, get_db) +# # 断言操作 +# DataProcess.assert_result(response, expect) + +def test_main(cases): # 不使用数据库功能 # 发送请求 response, expect, sql = BaseRequest.send_request(cases) - # 执行sql - DataProcess.handle_sql(sql, get_db) # 断言操作 DataProcess.assert_result(response, expect) diff --git a/tools/data_clearing.py b/tools/data_clearing.py new file mode 100644 index 0000000..499d49a --- /dev/null +++ b/tools/data_clearing.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +@Time : 2021/1/19 11:44 +@Author : zy7y +@ProjectName : apiAutoTest +@File : data_clearing.py +@Software : PyCharm +@Github : https://github.com/zy7y +@Blog : https://www.cnblogs.com/zy7y +""" + +import os +from datetime import datetime +import paramiko +from tools.read_file import ReadFile +from tools import logger + + +class ServerTools: + def __init__(self, host: str, port: int = 22, username: str = "root", password: str = None, + private_key_file: str = None): + # 进行SSH连接 + self.trans = paramiko.Transport((host, port)) + self.host = host + if password is None: + self.trans.connect(username=username, pkey=paramiko.RSAKey.from_private_key_file(private_key_file)) + else: + self.trans.connect(username=username, password=password) + # 将sshclient的对象的transport指定为以上的trans + self.ssh = paramiko.SSHClient() + logger.success("SSH客户端创建成功.") + self.ssh._transport = self.trans + # 创建SFTP客户端 + self.ftp_client = paramiko.SFTPClient.from_transport(self.trans) + logger.success("SFTP客户端创建成功.") + + def execute_cmd(self, cmd: str): + """ + :param cmd: 服务器下对应的命令, 可以是list,或者str + """ + stdin, stdout, stderr = self.ssh.exec_command(cmd) + error = stderr.read().decode() + logger.info(f"输入命令: {cmd} -> 输出结果: {stdout.read().decode()}") + logger.error(f"异常信息: {error}") + return error + + def files_action(self, post: bool, local_path: str = os.getcwd(), remote_path: str = "/root"): + """ + :param post: 动作 为 True 就是上传, False就是下载 + :param local_path: 本地的文件路径, 默认当前脚本所在的工作目录 + :param remote_path: 服务器上的文件路径,默认在/root目录下 + """ + if post: # 上传文件 + self.ftp_client.put(localpath=local_path, remotepath=f"{remote_path}{os.path.split(local_path)[1]}") + logger.info(f"文件上传成功: {local_path} -> {self.host}:{remote_path}{os.path.split(local_path)[1]}") + else: # 下载文件 + file_path = local_path + os.path.split(remote_path)[1] + self.ftp_client.get(remotepath=remote_path, localpath=file_path) + logger.info(f"文件下载成功: {self.host}:{remote_path} -> {file_path}") + + def ssh_close(self): + """关闭连接""" + self.trans.close() + logger.info("已关闭SSH连接...") + + +class DataClearing: + settings = ReadFile.read_config('$.database') + server_settings = settings.get('ssh_server') + server = None + + # 导出的sql文件名称及后缀 + file_name = f"{settings.get('db_name')}_{datetime.now().strftime('%Y-%m-%dT%H_%M_%S')}.sql" + + @classmethod + def server_init(cls, settings=settings, server_settings=server_settings): + cls.server = ServerTools(host=settings.get('host'), port=server_settings.get('port'), + username=server_settings.get('username'), + password=server_settings.get('password'), + private_key_file=server_settings.get('private_key_file')) + # 新建backup_sql文件夹在服务器上,存放导出的sql文件 + cls.server.execute_cmd("mkdir backup_sql") + + @classmethod + def backup_mysql(cls): + """ + 备份数据库, 会分别备份在数据库所在服务器的/root/backup_sql/目录下, 与当前项目文件目录下的 backup_sqls + 每次备份生成一个数据库名_当前年_月_日T_时_分_秒, 支持linux 服务器上安装的mysql服务(本人未调试),以及linux中docker部署的mysql备份 + """ + if cls.server_settings.get('mysql_container') is None: + cmd = f"mysqldump -h127.0.0.1 -u{cls.settings.get('username')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > {cls.file_name}" + else: + # 将mysql服务的容器中的指定数据库导出, 参考文章 https://www.cnblogs.com/wangsongbai/p/12666368.html + cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysqldump -h127.0.0.1 -u{cls.settings.get('user')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > /root/backup_sql/{cls.file_name}" + cls.server.execute_cmd(cmd) + cls.server.files_action(0, f"{cls.server_settings.get('sql_data_file')}", f"/root/backup_sql/{cls.file_name}") + + @classmethod + def recovery_mysql(cls, sql_file: str = file_name, database: str = settings.get('db_name')): + + """ + 恢复数据库, 从服务器位置(/root/backup_sql/) 或者本地(../backup_sqls)上传, 传入的需要是.sql文件 + :param sql_file: .sql数据库备份文件, 默认就是导出的sql文件名称, 默认文件名称是导出的sql文件 + :param database: 恢复的数据库名称,默认是备份数据库(config.yaml中的db_name) + """ + result = cls.server.execute_cmd(f"ls -l /root/backup_sql/{sql_file}") + if "No such file or directory" in result: + # 本地上传 + cls.server.files_action(1, f"../backup_sqls/{sql_file}", "/root/backup_sql/") + cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysql -u{cls.settings.get('user')} -p{cls.settings.get('password')} {database} < /root/backup_sql/{sql_file}" + cls.server.execute_cmd(cmd) + + @classmethod + def close_client(cls): + cls.server.ssh_close()