diff --git a/backend/pom.xml b/backend/pom.xml
index db7a2e7c2a..8309c7fd26 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -68,7 +68,10 @@
org.springframework.boot
spring-boot-starter-mail
-
+
+ org.springframework.kafka
+ spring-kafka
+
org.projectlombok
lombok
diff --git a/backend/src/main/java/io/metersphere/commons/consumer/LoadTestConsumer.java b/backend/src/main/java/io/metersphere/commons/consumer/LoadTestConsumer.java
new file mode 100644
index 0000000000..f6587c7bea
--- /dev/null
+++ b/backend/src/main/java/io/metersphere/commons/consumer/LoadTestConsumer.java
@@ -0,0 +1,32 @@
+package io.metersphere.commons.consumer;
+
+import com.alibaba.fastjson.JSON;
+import io.metersphere.Application;
+import io.metersphere.base.domain.LoadTestReport;
+import io.metersphere.commons.utils.CommonBeanFactory;
+import io.metersphere.commons.utils.LogUtil;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.reflections8.Reflections;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+import java.util.Set;
+
+@Service
+public class LoadTestConsumer {
+ public static final String CONSUME_ID = "load-test-data";
+
+ @KafkaListener(id = CONSUME_ID, topics = "${kafka.test.topic}", groupId = "${spring.kafka.consumer.group-id}")
+ public void consume(ConsumerRecord, String> record) {
+ LoadTestReport loadTestReport = JSON.parseObject(record.value(), LoadTestReport.class);
+ Reflections reflections = new Reflections(Application.class);
+ Set> subTypes = reflections.getSubTypesOf(LoadTestFinishEvent.class);
+ subTypes.forEach(s -> {
+ try {
+ CommonBeanFactory.getBean(s).execute(loadTestReport);
+ } catch (Exception e) {
+ LogUtil.error(e);
+ }
+ });
+ }
+}
diff --git a/backend/src/main/java/io/metersphere/commons/consumer/LoadTestFinishEvent.java b/backend/src/main/java/io/metersphere/commons/consumer/LoadTestFinishEvent.java
new file mode 100644
index 0000000000..87eaa17d20
--- /dev/null
+++ b/backend/src/main/java/io/metersphere/commons/consumer/LoadTestFinishEvent.java
@@ -0,0 +1,7 @@
+package io.metersphere.commons.consumer;
+
+import io.metersphere.base.domain.LoadTestReport;
+
+public interface LoadTestFinishEvent {
+ void execute(LoadTestReport loadTestReport);
+}
diff --git a/backend/src/main/java/io/metersphere/performance/notice/PerformanceNoticeTask.java b/backend/src/main/java/io/metersphere/performance/notice/PerformanceNoticeEvent.java
similarity index 67%
rename from backend/src/main/java/io/metersphere/performance/notice/PerformanceNoticeTask.java
rename to backend/src/main/java/io/metersphere/performance/notice/PerformanceNoticeEvent.java
index db5be78c84..bdaf53a02f 100644
--- a/backend/src/main/java/io/metersphere/performance/notice/PerformanceNoticeTask.java
+++ b/backend/src/main/java/io/metersphere/performance/notice/PerformanceNoticeEvent.java
@@ -1,10 +1,9 @@
package io.metersphere.performance.notice;
-import io.metersphere.base.domain.LoadTestReportWithBLOBs;
-import io.metersphere.base.mapper.LoadTestReportMapper;
+import io.metersphere.base.domain.LoadTestReport;
import io.metersphere.commons.constants.NoticeConstants;
import io.metersphere.commons.constants.PerformanceTestStatus;
-import io.metersphere.commons.utils.LogUtil;
+import io.metersphere.commons.consumer.LoadTestFinishEvent;
import io.metersphere.dto.BaseSystemConfigDTO;
import io.metersphere.i18n.Translator;
import io.metersphere.notice.sender.NoticeModel;
@@ -13,53 +12,18 @@ import io.metersphere.service.SystemParameterService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
-import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
@Component
-public class PerformanceNoticeTask {
+public class PerformanceNoticeEvent implements LoadTestFinishEvent {
@Resource
private SystemParameterService systemParameterService;
@Resource
- private LoadTestReportMapper loadTestReportMapper;
- @Resource
private NoticeSendService noticeSendService;
- private final ExecutorService executorService = Executors.newFixedThreadPool(20);
-
- private boolean isRunning = false;
-
- @PreDestroy
- public void preDestroy() {
- isRunning = false;
- }
-
- public void registerNoticeTask(LoadTestReportWithBLOBs loadTestReport) {
- isRunning = true;
- executorService.submit(() -> {
- LogUtil.info("性能测试定时任务");
- while (isRunning) {
- LoadTestReportWithBLOBs loadTestReportFromDatabase = loadTestReportMapper.selectByPrimaryKey(loadTestReport.getId());
- if (StringUtils.equalsAny(loadTestReportFromDatabase.getStatus(),
- PerformanceTestStatus.Completed.name(), PerformanceTestStatus.Error.name())) {
- sendNotice(loadTestReportFromDatabase);
- return;
- }
- try {
- //查询定时任务是否关闭
- Thread.sleep(1000 * 10);// 检查 loadtest 的状态
- } catch (InterruptedException e) {
- LogUtil.error(e.getMessage(), e);
- }
- }
- });
- }
-
- public void sendNotice(LoadTestReportWithBLOBs loadTestReport) {
+ public void sendNotice(LoadTestReport loadTestReport) {
BaseSystemConfigDTO baseSystemConfigDTO = systemParameterService.getBaseInfo();
String url = baseSystemConfigDTO.getUrl() + "/#/performance/report/view/" + loadTestReport.getId();
String successContext = "";
@@ -102,4 +66,14 @@ public class PerformanceNoticeTask {
.build();
noticeSendService.send(loadTestReport.getTriggerMode(), noticeModel);
}
+
+ @Override
+ public void execute(LoadTestReport loadTestReport) {
+ if (StringUtils.equals(NoticeConstants.Mode.API, loadTestReport.getTriggerMode()) || StringUtils.equals(NoticeConstants.Mode.SCHEDULE, loadTestReport.getTriggerMode())) {
+ if (StringUtils.equalsAny(loadTestReport.getStatus(),
+ PerformanceTestStatus.Completed.name(), PerformanceTestStatus.Error.name())) {
+ sendNotice(loadTestReport);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/backend/src/main/java/io/metersphere/performance/service/PerformanceTestService.java b/backend/src/main/java/io/metersphere/performance/service/PerformanceTestService.java
index a3d6a4d9a6..8fe7fae7a3 100644
--- a/backend/src/main/java/io/metersphere/performance/service/PerformanceTestService.java
+++ b/backend/src/main/java/io/metersphere/performance/service/PerformanceTestService.java
@@ -21,7 +21,6 @@ import io.metersphere.i18n.Translator;
import io.metersphere.job.sechedule.PerformanceTestJob;
import io.metersphere.performance.engine.Engine;
import io.metersphere.performance.engine.EngineFactory;
-import io.metersphere.performance.notice.PerformanceNoticeTask;
import io.metersphere.service.FileService;
import io.metersphere.service.QuotaService;
import io.metersphere.service.ScheduleService;
@@ -75,8 +74,6 @@ public class PerformanceTestService {
@Resource
private TestCaseService testCaseService;
@Resource
- private PerformanceNoticeTask performanceNoticeTask;
- @Resource
private TestResourcePoolMapper testResourcePoolMapper;
public List list(QueryTestPlanRequest request) {
@@ -240,10 +237,6 @@ public class PerformanceTestService {
startEngine(loadTest, engine, request.getTriggerMode());
- LoadTestReportWithBLOBs loadTestReport = loadTestReportMapper.selectByPrimaryKey(engine.getReportId());
- if (StringUtils.equals(NoticeConstants.Mode.API, loadTestReport.getTriggerMode()) || StringUtils.equals(NoticeConstants.Mode.SCHEDULE, loadTestReport.getTriggerMode())) {
- performanceNoticeTask.registerNoticeTask(loadTestReport);
- }
return engine.getReportId();
}
diff --git a/backend/src/main/java/io/metersphere/security/ApiKeyFilter.java b/backend/src/main/java/io/metersphere/security/ApiKeyFilter.java
index 71821c7892..8eebd064f0 100644
--- a/backend/src/main/java/io/metersphere/security/ApiKeyFilter.java
+++ b/backend/src/main/java/io/metersphere/security/ApiKeyFilter.java
@@ -22,12 +22,12 @@ public class ApiKeyFilter extends AnonymousFilter {
if (LogUtil.getLogger().isDebugEnabled()) {
LogUtil.getLogger().debug("user auth: " + userId);
}
- SecurityUtils.getSubject().login(new MsUserToken(userId, ApiKeySessionHandler.random, "APIKEY"));
+ SecurityUtils.getSubject().login(new MsUserToken(userId, ApiKeySessionHandler.random, "LOCAL"));
}
} else {
if (ApiKeyHandler.isApiKeyCall(WebUtils.toHttp(request))) {
String userId = ApiKeyHandler.getUser(WebUtils.toHttp(request));
- SecurityUtils.getSubject().login(new MsUserToken(userId, ApiKeySessionHandler.random, "APIKEY"));
+ SecurityUtils.getSubject().login(new MsUserToken(userId, ApiKeySessionHandler.random, "LOCAL"));
}
}
diff --git a/backend/src/main/resources/application.properties b/backend/src/main/resources/application.properties
index 5a57cf4026..b1a8bff38a 100644
--- a/backend/src/main/resources/application.properties
+++ b/backend/src/main/resources/application.properties
@@ -12,7 +12,9 @@ spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.max-lifetime=1800000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1
-
+#kafka
+spring.kafka.bootstrap-servers=${kafka.bootstrap-servers}
+spring.kafka.consumer.group-id=metersphere_group_id
# mybatis
mybatis.configuration.cache-enabled=true
mybatis.configuration.lazy-loading-enabled=false