feat(测试跟踪): 平台插件跨服务加载和卸载

This commit is contained in:
chenjianxing 2022-11-16 19:41:23 +08:00 committed by jianxing
parent 7b05cf730b
commit c0acc104de
14 changed files with 339 additions and 25 deletions

View File

@ -12,5 +12,9 @@ public interface KafkaTopicConstants {
String TEST_PLAN_REPORT_TOPIC = "TEST_PLAN_REPORT_TOPIC";
// 保存当前站点时检查MOCK环境
String CHECK_MOCK_ENV_TOPIC = "CHECK_MOCK_ENV_TOPIC";
// 上传插件后通知各服务从 minio 加载插件
String PLATFORM_PLUGIN_ADD = "PLATFORM_PLUGIN_ADD";
// 删除插件后卸载插件
String PLATFORM_PLUGIN_DELETED = "PLATFORM_PLUGIN_DELETED";
}

View File

@ -3,10 +3,14 @@ package io.metersphere.service;
import io.metersphere.base.domain.PluginExample;
import io.metersphere.base.domain.PluginWithBLOBs;
import io.metersphere.base.mapper.PluginMapper;
import io.metersphere.commons.constants.StorageConstants;
import io.metersphere.metadata.service.FileManagerService;
import io.metersphere.metadata.vo.FileRequest;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.InputStream;
import java.util.List;
@Service
@ -14,10 +18,26 @@ import java.util.List;
public class BasePluginService {
@Resource
private PluginMapper pluginMapper;
@Resource
FileManagerService fileManagerService;
public static final String DIR_PATH = "system/plugin";
public List<PluginWithBLOBs> getPlugins(String scenario) {
PluginExample example = new PluginExample();
example.createCriteria().andScenarioEqualTo(scenario);
return pluginMapper.selectByExampleWithBLOBs(example);
}
public InputStream getPluginResource(String pluginId, String resourceName) {
FileRequest request = new FileRequest();
request.setProjectId(DIR_PATH + "/" + pluginId);
request.setFileName(resourceName);
request.setStorage(StorageConstants.MINIO.name());
return fileManagerService.downloadFileAsStream(request);
}
public InputStream getPluginJar(String pluginId) {
PluginWithBLOBs plugin = pluginMapper.selectByPrimaryKey(pluginId);
return getPluginResource(pluginId, plugin.getSourceName());
}
}

View File

@ -1,9 +1,7 @@
package io.metersphere.remote.service;
import io.metersphere.commons.constants.StorageConstants;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.metadata.service.FileManagerService;
import io.metersphere.metadata.vo.FileRequest;
import io.metersphere.service.BasePluginService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@ -14,19 +12,11 @@ import java.io.OutputStream;
@Service
public class PlatformPluginService {
@Resource
FileManagerService fileManagerService;
public static final String DIR_PATH = "system/plugin";
BasePluginService basePluginService;
public void getPluginResource(String pluginId, String name, HttpServletResponse response) {
FileRequest request = new FileRequest();
request.setProjectId(DIR_PATH + "/" + pluginId);
request.setFileName(name);
request.setStorage(StorageConstants.MINIO.name());
InputStream inputStream = fileManagerService.downloadFileAsStream(request);
getImage(inputStream, response);
getImage(basePluginService.getPluginResource(pluginId, name), response);
}
public void getImage(InputStream in, HttpServletResponse response) {

View File

@ -39,4 +39,20 @@ public class KafkaTopicConfig {
.build();
}
@Bean
public NewTopic platformPluginAddTopic() {
return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_ADD)
.partitions(partitions)
.replicas(replicas)
.build();
}
@Bean
public NewTopic platformPluginDeleteTopic() {
return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED)
.partitions(partitions)
.replicas(replicas)
.build();
}
}

View File

@ -14,7 +14,7 @@ public class InitListener implements ApplicationRunner {
private PlatformPluginService platformPluginService;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
public void run(ApplicationArguments applicationArguments) {
platformPluginService.loadPlatFormPlugins();
}
}

View File

@ -0,0 +1,34 @@
package io.metersphere.listener;
import io.metersphere.commons.constants.KafkaTopicConstants;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.service.PlatformPluginService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class PlatformPluginListener {
public static final String ADD_CONSUME_ID = "system_setting_platform_plugin_add";
public static final String DELETE_CONSUME_ID = "system_setting_platform_plugin_delete";
@Resource
private PlatformPluginService platformPluginService;
@KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${spring.application.name}")
public void handlePluginAdd(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("system setting service consume platform_plugin add message, plugin id: " + pluginId);
platformPluginService.loadPlugin(pluginId);
}
@KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${spring.application.name}")
public void handlePluginDelete(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("system setting consume platform_plugin delete message, plugin id: " + pluginId);
platformPluginService.unload(pluginId);
}
}

View File

@ -5,6 +5,7 @@ import io.metersphere.api.PluginMetaInfo;
import io.metersphere.base.domain.PluginWithBLOBs;
import io.metersphere.base.domain.ServiceIntegration;
import io.metersphere.base.mapper.PluginMapper;
import io.metersphere.commons.constants.KafkaTopicConstants;
import io.metersphere.commons.constants.PluginScenario;
import io.metersphere.commons.utils.BeanUtils;
import io.metersphere.commons.utils.JSON;
@ -18,6 +19,7 @@ import io.metersphere.loader.PlatformPluginManager;
import io.metersphere.request.IntegrationRequest;
import io.metersphere.utils.PluginManagerUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
@ -43,6 +45,8 @@ public class PlatformPluginService {
private PluginMapper pluginMapper;
@Resource
private BaseIntegrationService baseIntegrationService;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
private PlatformPluginManager pluginManager;
@ -52,6 +56,7 @@ public class PlatformPluginService {
}
String id = UUID.randomUUID().toString();
PluginManagerUtil.uploadPlugin(id, file);
PluginManagerUtil.loadPlugin(id, pluginManager, file);
PluginMetaInfo pluginMetaInfo = pluginManager.getImplInstance(id, PluginMetaInfo.class);
@ -74,6 +79,8 @@ public class PlatformPluginService {
plugin.setCreateUserId(SessionUtils.getUserId());
plugin.setXpack(pluginMetaInfo.isXpack());
plugin.setScenario(PluginScenario.platform.name());
// 初始化项目默认节点
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_ADD, id);
return plugin;
}
@ -171,11 +178,13 @@ public class PlatformPluginService {
public void delete(String id) {
pluginMapper.deleteByPrimaryKey(id);
try {
// 删除文件
pluginManager.getClassLoader(id).getStorageStrategy().delete();
pluginManager.deletePlugin(id);
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, id);
} catch (IOException e) {
LogUtil.error(e);
}
this.unload(id);
}
public Platform getPlatFormInstance(String pluginId, Map IntegrationConfig) {
@ -237,4 +246,21 @@ public class PlatformPluginService {
.map(PluginMetaInfo::getKey)
.collect(Collectors.toList());
}
public void loadPlugin(String pluginId) {
if (pluginManager.getClassLoader(pluginId) == null) {
// 如果没有加载才加载
InputStream pluginJar = basePluginService.getPluginJar(pluginId);
PluginManagerUtil.loadPlugin(pluginId, pluginManager, pluginJar);
}
}
/**
* 卸载插件
* @param pluginId
*/
public void unload(String pluginId) {
pluginManager.deletePlugin(pluginId);
}
}

View File

@ -13,24 +13,54 @@ import java.util.List;
public class PluginManagerUtil {
public static void loadPlugin(String id, PluginManager pluginManager, MultipartFile file) {
if (pluginManager == null) {
pluginManager = new PluginManager();
}
MinioStorageStrategy minioStorageStrategy = new MinioStorageStrategy(id);
/**
* 上传到 minio
* @param id
* @param file
*/
public static void uploadPlugin(String id, MultipartFile file) {
try {
// 上传到 sso
minioStorageStrategy.store(file.getOriginalFilename(), file.getInputStream());
new MinioStorageStrategy(id).store(file.getOriginalFilename(), file.getInputStream());
} catch (IOException e) {
LogUtil.error("上传 jar 包失败: ", e);
MSException.throwException("上传 jar 包失败: " + e.getMessage());
}
}
/**
* 加载 jar
* 并设置存储策略
* @param id
* @param pluginManager
* @param file
*/
public static void loadPlugin(String id, PluginManager pluginManager, MultipartFile file) {
if (pluginManager == null) {
pluginManager = new PluginManager();
}
// 加载 jar
try {
pluginManager.loadJar(id, file.getInputStream(), minioStorageStrategy);
pluginManager.loadJar(id, file.getInputStream(), new MinioStorageStrategy(id));
} catch (IOException e) {
LogUtil.error("加载jar包失败: ", e);
MSException.throwException("加载jar包失败: " + e.getMessage());
}
}
/**
* 加载 jar
* 并设置存储策略
* @param id
* @param pluginManager
*/
public static void loadPlugin(String id, PluginManager pluginManager, InputStream inputStream) {
if (pluginManager == null) {
pluginManager = new PluginManager();
}
// 加载 jar
try {
pluginManager.loadJar(id, inputStream, new MinioStorageStrategy(id));
} catch (IOException e) {
LogUtil.error("加载jar包失败: ", e);
MSException.throwException("加载jar包失败: " + e.getMessage());

View File

@ -22,6 +22,11 @@
<artifactId>xpack-interface</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<artifactId>metersphere-platform-plugin-sdk</artifactId>
<groupId>io.metersphere</groupId>
<version>${platform-plugin-sdk.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -23,10 +23,13 @@ public class InitListener implements ApplicationRunner {
private TestReviewTestCaseService testReviewTestCaseService;
@Resource
private CustomFieldResourceCompatibleService customFieldResourceCompatibleService;
@Resource
private PlatformPluginService platformPluginService;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
this.initOnceOperate();
platformPluginService.loadPlatFormPlugins();
}
/**

View File

@ -0,0 +1,33 @@
package io.metersphere.listener;
import io.metersphere.commons.constants.KafkaTopicConstants;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.service.PlatformPluginService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class PlatformPluginListener {
public static final String ADD_CONSUME_ID = "test_track_platform_plugin_add";
public static final String DELETE_CONSUME_ID = "test_track_platform_plugin_delete";
@Resource
private PlatformPluginService platformPluginService;
@KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${spring.application.name}")
public void handlePluginAdd(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("track service consume platform_plugin add message, plugin id: " + pluginId);
platformPluginService.loadPlugin(pluginId);
}
@KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${spring.application.name}")
public void handlePluginDelete(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("track service consume platform_plugin delete message, plugin id: " + pluginId);
platformPluginService.unloadPlugin(pluginId);
}
}

View File

@ -0,0 +1,49 @@
package io.metersphere.service;
import io.metersphere.base.domain.PluginWithBLOBs;
import io.metersphere.commons.constants.PluginScenario;
import io.metersphere.loader.PlatformPluginManager;
import io.metersphere.utils.PluginManagerUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.InputStream;
import java.util.List;
@Service
@Transactional(rollbackFor = Exception.class)
public class PlatformPluginService {
@Resource
private BasePluginService basePluginService;
@Resource
private BaseIntegrationService baseIntegrationService;
private PlatformPluginManager pluginManager;
/**
* 查询所有平台插件并加载
*/
public void loadPlatFormPlugins() {
pluginManager = new PlatformPluginManager();
List<PluginWithBLOBs> plugins = basePluginService.getPlugins(PluginScenario.platform.name());
PluginManagerUtil.loadPlugins(pluginManager, plugins);
}
public void loadPlugin(String pluginId) {
if (pluginManager.getClassLoader(pluginId) == null) {
// 如果没有加载才加载
InputStream pluginJar = basePluginService.getPluginJar(pluginId);
PluginManagerUtil.loadPlugin(pluginId, pluginManager, pluginJar);
}
}
/**
* 卸载插件
* @param pluginId
*/
public void unloadPlugin(String pluginId) {
pluginManager.deletePlugin(pluginId);
}
}

View File

@ -0,0 +1,54 @@
package io.metersphere.service.plugin;
import im.metersphere.storage.StorageStrategy;
import io.metersphere.commons.constants.StorageConstants;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.metadata.service.FileManagerService;
import io.metersphere.metadata.vo.FileRequest;
import java.io.IOException;
import java.io.InputStream;
/**
* jar包静态资源存储策略存储在 Minio
*/
public class MinioStorageStrategy implements StorageStrategy {
private FileManagerService fileManagerService;
private String pluginId;
public static final String DIR_PATH = "system/plugin";
public MinioStorageStrategy(String pluginId) {
this.pluginId = pluginId;
fileManagerService = CommonBeanFactory.getBean(FileManagerService.class);
}
@Override
public String store(String name, InputStream in) throws IOException {
FileRequest request = getFileRequest(name);
return fileManagerService.upload(in.readAllBytes(), request);
}
@Override
public InputStream get(String name) {
FileRequest request = getFileRequest(name);
return fileManagerService.downloadFileAsStream(request);
}
@Override
public void delete() throws IOException {
FileRequest request = new FileRequest();
request.setProjectId(DIR_PATH + "/" + this.pluginId + "/");
request.setStorage(StorageConstants.MINIO.name());
fileManagerService.delete(request);
}
private FileRequest getFileRequest(String name) {
FileRequest request = new FileRequest();
request.setProjectId(DIR_PATH + "/" + this.pluginId);
request.setFileName(name);
request.setStorage(StorageConstants.MINIO.name());
return request;
}
}

View File

@ -0,0 +1,50 @@
package io.metersphere.utils;
import im.metersphere.loader.PluginManager;
import io.metersphere.base.domain.PluginWithBLOBs;
import io.metersphere.commons.exception.MSException;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.service.plugin.MinioStorageStrategy;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
public class PluginManagerUtil {
/**
* 加载 jar
* 并设置存储策略
* @param id
* @param pluginManager
* @param inputStream
*/
public static void loadPlugin(String id, PluginManager pluginManager, InputStream inputStream) {
if (pluginManager == null) {
pluginManager = new PluginManager();
}
// 加载 jar
try {
pluginManager.loadJar(id, inputStream, new MinioStorageStrategy(id));
} catch (IOException e) {
LogUtil.error("加载jar包失败: ", e);
MSException.throwException("加载jar包失败: " + e.getMessage());
}
}
/**
* 加载插件
*/
public static void loadPlugins(PluginManager pluginManager, List<PluginWithBLOBs> plugins) {
plugins.forEach(plugin -> {
String id = plugin.getId();
MinioStorageStrategy minioStorageStrategy = new MinioStorageStrategy(id);
InputStream inputStream = minioStorageStrategy.get(plugin.getSourceName());
try {
pluginManager.loadJar(id, inputStream, minioStorageStrategy);
} catch (IOException e) {
LogUtil.error("初始化插件失败:", e);
}
});
}
}