From 0a9823f962d0564284512f9ff3509dadb5a5075a Mon Sep 17 00:00:00 2001 From: chenjianxing Date: Fri, 17 Mar 2023 14:35:56 +0800 Subject: [PATCH] =?UTF-8?q?refactor(=E6=B5=8B=E8=AF=95=E8=B7=9F=E8=B8=AA):?= =?UTF-8?q?=20=E5=90=88=E5=B9=B6=E5=B9=B3=E5=8F=B0=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E7=9A=84=E5=A4=9A=E4=B8=AAtopic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/KafkaTopicConstants.java | 7 ++-- .../metersphere/config/KafkaTopicConfig.java | 11 +----- .../listener/PlatformPluginListener.java | 31 ++++++++-------- .../service/PlatformPluginService.java | 4 +-- .../listener/PlatformPluginListener.java | 35 ++++++++++--------- 5 files changed, 41 insertions(+), 47 deletions(-) diff --git a/framework/sdk-parent/sdk/src/main/java/io/metersphere/commons/constants/KafkaTopicConstants.java b/framework/sdk-parent/sdk/src/main/java/io/metersphere/commons/constants/KafkaTopicConstants.java index 11030aaaa8..6df3f550c3 100644 --- a/framework/sdk-parent/sdk/src/main/java/io/metersphere/commons/constants/KafkaTopicConstants.java +++ b/framework/sdk-parent/sdk/src/main/java/io/metersphere/commons/constants/KafkaTopicConstants.java @@ -12,9 +12,6 @@ public interface KafkaTopicConstants { String TEST_PLAN_REPORT_TOPIC = "TEST_PLAN_REPORT_TOPIC"; // 保存当前站点时检查MOCK环境 String CHECK_MOCK_ENV_TOPIC = "CHECK_MOCK_ENV_TOPIC"; - // 上传插件后,通知各服务从 minio 加载插件 - String PLATFORM_PLUGIN_ADD = "PLATFORM_PLUGIN_ADD"; - // 删除插件后卸载插件 - String PLATFORM_PLUGIN_DELETED = "PLATFORM_PLUGIN_DELETED"; - + // 上传插件后,通知各服务从 minio 加载插件, 删除插件后卸载插件 + String PLATFORM_PLUGIN = "PLATFORM_PLUGIN"; } diff --git a/system-setting/backend/src/main/java/io/metersphere/config/KafkaTopicConfig.java b/system-setting/backend/src/main/java/io/metersphere/config/KafkaTopicConfig.java index b811d3620e..55f01bc271 100644 --- a/system-setting/backend/src/main/java/io/metersphere/config/KafkaTopicConfig.java +++ b/system-setting/backend/src/main/java/io/metersphere/config/KafkaTopicConfig.java @@ -39,18 +39,9 @@ public class KafkaTopicConfig { .build(); } - @Bean public NewTopic platformPluginAddTopic() { - return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_ADD) - .partitions(partitions) - .replicas(replicas) - .build(); - } - - @Bean - public NewTopic platformPluginDeleteTopic() { - return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED) + return TopicBuilder.name(KafkaTopicConstants.PLATFORM_PLUGIN) .partitions(partitions) .replicas(replicas) .build(); diff --git a/system-setting/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java b/system-setting/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java index f8aba40781..fc7e285f52 100644 --- a/system-setting/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java +++ b/system-setting/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java @@ -12,24 +12,27 @@ import jakarta.annotation.Resource; @Component public class PlatformPluginListener { - public static final String ADD_CONSUME_ID = "system_setting_platform_plugin_add"; - public static final String DELETE_CONSUME_ID = "system_setting_platform_plugin_delete"; + public static final String TEST_TRACK_PLATFORM_PLUGIN = "test_track_platform_plugin"; @Resource private PlatformPluginService platformPluginService; // groupId 必须是每个实例唯一 - @KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${eureka.instance.instance-id}") - public void handlePluginAdd(ConsumerRecord record) { - String pluginId = record.value(); - LogUtil.info("system setting service consume platform_plugin add message, plugin id: " + pluginId); - platformPluginService.loadPlugin(pluginId); - } - - @KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${eureka.instance.instance-id}") - public void handlePluginDelete(ConsumerRecord record) { - String pluginId = record.value(); - LogUtil.info("system setting consume platform_plugin delete message, plugin id: " + pluginId); - platformPluginService.unload(pluginId); + @KafkaListener(id = TEST_TRACK_PLATFORM_PLUGIN, topics = KafkaTopicConstants.PLATFORM_PLUGIN, groupId = TEST_TRACK_PLATFORM_PLUGIN + "_" + "#{T(java.util.UUID).randomUUID()})") + public void handlePluginChange(ConsumerRecord record) { + LogUtil.info("track service consume platform_plugin add message, plugin id: " + record); + String[] info = record.value().split(":"); + String operate = info[0]; + String pluginId = info[1]; + switch (operate) { + case "ADD": + platformPluginService.loadPlugin(pluginId); + break; + case "DELETE": + platformPluginService.unload(pluginId); + break; + default: + break; + } } } diff --git a/system-setting/backend/src/main/java/io/metersphere/service/PlatformPluginService.java b/system-setting/backend/src/main/java/io/metersphere/service/PlatformPluginService.java index cb0d9f9add..8b132159dd 100644 --- a/system-setting/backend/src/main/java/io/metersphere/service/PlatformPluginService.java +++ b/system-setting/backend/src/main/java/io/metersphere/service/PlatformPluginService.java @@ -114,7 +114,7 @@ public class PlatformPluginService { public void notifiedPlatformPluginAdd(String pluginId) { // 初始化项目默认节点 - kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_ADD, pluginId); + kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN, "ADD:" + pluginId); } /** @@ -220,7 +220,7 @@ public class PlatformPluginService { try { // 删除文件 getPluginManager().getClassLoader(id).getStorageStrategy().delete(); - kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, id); + kafkaTemplate.send(KafkaTopicConstants.PLATFORM_PLUGIN, "DELETE:" + id); } catch (IOException e) { LogUtil.error(e); } diff --git a/test-track/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java b/test-track/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java index 7b57ae0d3c..103ad9429e 100644 --- a/test-track/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java +++ b/test-track/backend/src/main/java/io/metersphere/listener/PlatformPluginListener.java @@ -3,32 +3,35 @@ package io.metersphere.listener; import io.metersphere.commons.constants.KafkaTopicConstants; import io.metersphere.commons.utils.LogUtil; import io.metersphere.service.PlatformPluginService; +import jakarta.annotation.Resource; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import jakarta.annotation.Resource; - @Component public class PlatformPluginListener { - public static final String ADD_CONSUME_ID = "test_track_platform_plugin_add"; - public static final String DELETE_CONSUME_ID = "test_track_platform_plugin_delete"; + + public static final String TEST_TRACK_PLATFORM_PLUGIN = "test_track_platform_plugin"; @Resource private PlatformPluginService platformPluginService; // groupId 必须是每个实例唯一 - @KafkaListener(id = ADD_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_ADD, groupId = "${eureka.instance.instance-id}") - public void handlePluginAdd(ConsumerRecord record) { - String pluginId = record.value(); - LogUtil.info("track service consume platform_plugin add message, plugin id: " + pluginId); - platformPluginService.loadPlugin(pluginId); - } - - @KafkaListener(id = DELETE_CONSUME_ID, topics = KafkaTopicConstants.PLATFORM_PLUGIN_DELETED, groupId = "${eureka.instance.instance-id}") - public void handlePluginDelete(ConsumerRecord record) { - String pluginId = record.value(); - LogUtil.info("track service consume platform_plugin delete message, plugin id: " + pluginId); - platformPluginService.unloadPlugin(pluginId); + @KafkaListener(id = TEST_TRACK_PLATFORM_PLUGIN, topics = KafkaTopicConstants.PLATFORM_PLUGIN, groupId = TEST_TRACK_PLATFORM_PLUGIN + "_" + "#{T(java.util.UUID).randomUUID()})") + public void handlePluginChange(ConsumerRecord record) { + LogUtil.info("track service consume platform_plugin add message, plugin id: " + record); + String[] info = record.value().split(":"); + String operate = info[0]; + String pluginId = info[1]; + switch (operate) { + case "ADD": + platformPluginService.loadPlugin(pluginId); + break; + case "DELETE": + platformPluginService.unloadPlugin(pluginId); + break; + default: + break; + } } }