refactor(测试跟踪): 合并平台插件的多个topic

This commit is contained in:
chenjianxing 2023-03-17 14:35:56 +08:00 committed by jianxing
parent 7b373caef3
commit e3921882cf
5 changed files with 41 additions and 47 deletions

View File

@ -12,9 +12,6 @@ public interface KafkaTopicConstants {
String TEST_PLAN_REPORT_TOPIC = "TEST_PLAN_REPORT_TOPIC";
// 保存当前站点时检查MOCK环境
String CHECK_MOCK_ENV_TOPIC = "CHECK_MOCK_ENV_TOPIC";
// 上传插件后通知各服务从 minio 加载插件
String PLATFORM_PLUGIN_ADD = "PLATFORM_PLUGIN_ADD";
// 删除插件后卸载插件
String PLATFORM_PLUGIN_DELETED = "PLATFORM_PLUGIN_DELETED";
// 上传插件后通知各服务从 minio 加载插件, 删除插件后卸载插件
String PLATFORM_PLUGIN = "PLATFORM_PLUGIN";
}

View File

@ -39,18 +39,9 @@ public class KafkaTopicConfig {
.build();
}
@Bean
public NewTopic platformPluginAddTopic() {
return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_ADD)
.partitions(partitions)
.replicas(replicas)
.build();
}
@Bean
public NewTopic platformPluginDeleteTopic() {
return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED)
return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN)
.partitions(partitions)
.replicas(replicas)
.build();

View File

@ -12,24 +12,27 @@ import jakarta.annotation.Resource;
@Component
public class PlatformPluginListener {
public static final String ADD_CONSUME_ID = "system_setting_platform_plugin_add";
public static final String DELETE_CONSUME_ID = "system_setting_platform_plugin_delete";
public static final String TEST_TRACK_PLATFORM_PLUGIN = "test_track_platform_plugin";
@Resource
private PlatformPluginService platformPluginService;
// groupId 必须是每个实例唯一
@KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${eureka.instance.instance-id}")
public void handlePluginAdd(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("system setting service consume platform_plugin add message, plugin id: " + pluginId);
platformPluginService.loadPlugin(pluginId);
}
@KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${eureka.instance.instance-id}")
public void handlePluginDelete(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("system setting consume platform_plugin delete message, plugin id: " + pluginId);
platformPluginService.unload(pluginId);
@KafkaListener(id = TEST_TRACK_PLATFORM_PLUGIN, topics = KafkaTopicConstants.PLATFORM_PLUGIN, groupId = TEST_TRACK_PLATFORM_PLUGIN + "_" + "#{T(java.util.UUID).randomUUID()})")
public void handlePluginChange(ConsumerRecord<?, String> record) {
LogUtil.info("track service consume platform_plugin add message, plugin id: " + record);
String[] info = record.value().split(":");
String operate = info[0];
String pluginId = info[1];
switch (operate) {
case "ADD":
platformPluginService.loadPlugin(pluginId);
break;
case "DELETE":
platformPluginService.unload(pluginId);
break;
default:
break;
}
}
}

View File

@ -114,7 +114,7 @@ public class PlatformPluginService {
public void notifiedPlatformPluginAdd(String pluginId) {
// 初始化项目默认节点
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_ADD, pluginId);
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN, "ADD:" + pluginId);
}
/**
@ -220,7 +220,7 @@ public class PlatformPluginService {
try {
// 删除文件
getPluginManager().getClassLoader(id).getStorageStrategy().delete();
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, id);
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN, "DELETE:" + id);
} catch (IOException e) {
LogUtil.error(e);
}

View File

@ -3,32 +3,35 @@ package io.metersphere.listener;
import io.metersphere.commons.constants.KafkaTopicConstants;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.service.PlatformPluginService;
import jakarta.annotation.Resource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
@Component
public class PlatformPluginListener {
public static final String ADD_CONSUME_ID = "test_track_platform_plugin_add";
public static final String DELETE_CONSUME_ID = "test_track_platform_plugin_delete";
public static final String TEST_TRACK_PLATFORM_PLUGIN = "test_track_platform_plugin";
@Resource
private PlatformPluginService platformPluginService;
// groupId 必须是每个实例唯一
@KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${eureka.instance.instance-id}")
public void handlePluginAdd(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("track service consume platform_plugin add message, plugin id: " + pluginId);
platformPluginService.loadPlugin(pluginId);
}
@KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${eureka.instance.instance-id}")
public void handlePluginDelete(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("track service consume platform_plugin delete message, plugin id: " + pluginId);
platformPluginService.unloadPlugin(pluginId);
@KafkaListener(id = TEST_TRACK_PLATFORM_PLUGIN, topics = KafkaTopicConstants.PLATFORM_PLUGIN, groupId = TEST_TRACK_PLATFORM_PLUGIN + "_" + "#{T(java.util.UUID).randomUUID()})")
public void handlePluginChange(ConsumerRecord<?, String> record) {
LogUtil.info("track service consume platform_plugin add message, plugin id: " + record);
String[] info = record.value().split(":");
String operate = info[0];
String pluginId = info[1];
switch (operate) {
case "ADD":
platformPluginService.loadPlugin(pluginId);
break;
case "DELETE":
platformPluginService.unloadPlugin(pluginId);
break;
default:
break;
}
}
}