refactor(测试跟踪): 合并平台插件的多个topic

This commit is contained in:
chenjianxing 2023-03-17 14:35:56 +08:00 committed by jianxing
parent 3203a60dda
commit 0a9823f962
5 changed files with 41 additions and 47 deletions

View File

@ -12,9 +12,6 @@ public interface KafkaTopicConstants {
String TEST_PLAN_REPORT_TOPIC = "TEST_PLAN_REPORT_TOPIC"; String TEST_PLAN_REPORT_TOPIC = "TEST_PLAN_REPORT_TOPIC";
// 保存当前站点时检查MOCK环境 // 保存当前站点时检查MOCK环境
String CHECK_MOCK_ENV_TOPIC = "CHECK_MOCK_ENV_TOPIC"; String CHECK_MOCK_ENV_TOPIC = "CHECK_MOCK_ENV_TOPIC";
// 上传插件后通知各服务从 minio 加载插件 // 上传插件后通知各服务从 minio 加载插件, 删除插件后卸载插件
String PLATFORM_PLUGIN_ADD = "PLATFORM_PLUGIN_ADD"; String PLATFORM_PLUGIN = "PLATFORM_PLUGIN";
// 删除插件后卸载插件
String PLATFORM_PLUGIN_DELETED = "PLATFORM_PLUGIN_DELETED";
} }

View File

@ -39,18 +39,9 @@ public class KafkaTopicConfig {
.build(); .build();
} }
@Bean @Bean
public NewTopic platformPluginAddTopic() { public NewTopic platformPluginAddTopic() {
return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_ADD) return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN)
.partitions(partitions)
.replicas(replicas)
.build();
}
@Bean
public NewTopic platformPluginDeleteTopic() {
return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED)
.partitions(partitions) .partitions(partitions)
.replicas(replicas) .replicas(replicas)
.build(); .build();

View File

@ -12,24 +12,27 @@ import jakarta.annotation.Resource;
@Component @Component
public class PlatformPluginListener { public class PlatformPluginListener {
public static final String ADD_CONSUME_ID = "system_setting_platform_plugin_add"; public static final String TEST_TRACK_PLATFORM_PLUGIN = "test_track_platform_plugin";
public static final String DELETE_CONSUME_ID = "system_setting_platform_plugin_delete";
@Resource @Resource
private PlatformPluginService platformPluginService; private PlatformPluginService platformPluginService;
// groupId 必须是每个实例唯一 // groupId 必须是每个实例唯一
@KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${eureka.instance.instance-id}") @KafkaListener(id = TEST_TRACK_PLATFORM_PLUGIN, topics = KafkaTopicConstants.PLATFORM_PLUGIN, groupId = TEST_TRACK_PLATFORM_PLUGIN + "_" + "#{T(java.util.UUID).randomUUID()})")
public void handlePluginAdd(ConsumerRecord<?, String> record) { public void handlePluginChange(ConsumerRecord<?, String> record) {
String pluginId = record.value(); LogUtil.info("track service consume platform_plugin add message, plugin id: " + record);
LogUtil.info("system setting service consume platform_plugin add message, plugin id: " + pluginId); String[] info = record.value().split(":");
String operate = info[0];
String pluginId = info[1];
switch (operate) {
case "ADD":
platformPluginService.loadPlugin(pluginId); platformPluginService.loadPlugin(pluginId);
} break;
case "DELETE":
@KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${eureka.instance.instance-id}")
public void handlePluginDelete(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("system setting consume platform_plugin delete message, plugin id: " + pluginId);
platformPluginService.unload(pluginId); platformPluginService.unload(pluginId);
break;
default:
break;
}
} }
} }

View File

@ -114,7 +114,7 @@ public class PlatformPluginService {
public void notifiedPlatformPluginAdd(String pluginId) { public void notifiedPlatformPluginAdd(String pluginId) {
// 初始化项目默认节点 // 初始化项目默认节点
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_ADD, pluginId); kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN, "ADD:" + pluginId);
} }
/** /**
@ -220,7 +220,7 @@ public class PlatformPluginService {
try { try {
// 删除文件 // 删除文件
getPluginManager().getClassLoader(id).getStorageStrategy().delete(); getPluginManager().getClassLoader(id).getStorageStrategy().delete();
kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, id); kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN, "DELETE:" + id);
} catch (IOException e) { } catch (IOException e) {
LogUtil.error(e); LogUtil.error(e);
} }

View File

@ -3,32 +3,35 @@ package io.metersphere.listener;
import io.metersphere.commons.constants.KafkaTopicConstants; import io.metersphere.commons.constants.KafkaTopicConstants;
import io.metersphere.commons.utils.LogUtil; import io.metersphere.commons.utils.LogUtil;
import io.metersphere.service.PlatformPluginService; import io.metersphere.service.PlatformPluginService;
import jakarta.annotation.Resource;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
@Component @Component
public class PlatformPluginListener { public class PlatformPluginListener {
public static final String ADD_CONSUME_ID = "test_track_platform_plugin_add";
public static final String DELETE_CONSUME_ID = "test_track_platform_plugin_delete"; public static final String TEST_TRACK_PLATFORM_PLUGIN = "test_track_platform_plugin";
@Resource @Resource
private PlatformPluginService platformPluginService; private PlatformPluginService platformPluginService;
// groupId 必须是每个实例唯一 // groupId 必须是每个实例唯一
@KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${eureka.instance.instance-id}") @KafkaListener(id = TEST_TRACK_PLATFORM_PLUGIN, topics = KafkaTopicConstants.PLATFORM_PLUGIN, groupId = TEST_TRACK_PLATFORM_PLUGIN + "_" + "#{T(java.util.UUID).randomUUID()})")
public void handlePluginAdd(ConsumerRecord<?, String> record) { public void handlePluginChange(ConsumerRecord<?, String> record) {
String pluginId = record.value(); LogUtil.info("track service consume platform_plugin add message, plugin id: " + record);
LogUtil.info("track service consume platform_plugin add message, plugin id: " + pluginId); String[] info = record.value().split(":");
String operate = info[0];
String pluginId = info[1];
switch (operate) {
case "ADD":
platformPluginService.loadPlugin(pluginId); platformPluginService.loadPlugin(pluginId);
} break;
case "DELETE":
@KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${eureka.instance.instance-id}")
public void handlePluginDelete(ConsumerRecord<?, String> record) {
String pluginId = record.value();
LogUtil.info("track service consume platform_plugin delete message, plugin id: " + pluginId);
platformPluginService.unloadPlugin(pluginId); platformPluginService.unloadPlugin(pluginId);
break;
default:
break;
}
} }
} }