diff --git a/backend/src/main/java/io/metersphere/api/dto/RunRequest.java b/backend/src/main/java/io/metersphere/api/dto/RunRequest.java index 8a6143289d..dd379f77c2 100644 --- a/backend/src/main/java/io/metersphere/api/dto/RunRequest.java +++ b/backend/src/main/java/io/metersphere/api/dto/RunRequest.java @@ -4,6 +4,8 @@ import io.metersphere.api.dto.automation.RunModeConfig; import lombok.Getter; import lombok.Setter; +import java.util.Map; + @Getter @Setter public class RunRequest { @@ -19,4 +21,6 @@ public class RunRequest { // 集成报告ID private String amassReport; private RunModeConfig config; + + private Map kafka; } diff --git a/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java b/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java index b0204397ad..1f60311b82 100644 --- a/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java +++ b/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java @@ -6,6 +6,7 @@ import io.metersphere.api.dto.RunRequest; import io.metersphere.api.dto.automation.ExecuteType; import io.metersphere.api.dto.automation.RunModeConfig; import io.metersphere.api.service.ApiScenarioReportService; +import io.metersphere.api.service.NodeKafkaService; import io.metersphere.base.domain.TestResource; import io.metersphere.base.domain.TestResourcePool; import io.metersphere.base.mapper.TestResourcePoolMapper; @@ -51,6 +52,8 @@ public class JMeterService { private TestResourcePoolMapper testResourcePoolMapper; @Resource private RestTemplate restTemplate; + @Resource + private NodeKafkaService nodeKafkaService; @PostConstruct public void init() { @@ -158,6 +161,8 @@ public class JMeterService { } runRequest.setUrl(platformUrl); runRequest.setRunMode(runMode); + runRequest.setKafka(nodeKafkaService.getKafka()); + // 如果是K8S调用 TestResourcePool pool = testResourcePoolMapper.selectByPrimaryKey(resourcePoolId); if (pool != null && pool.getApi() && pool.getType().equals(ResourcePoolTypeEnum.K8S.name())) { diff --git a/backend/src/main/java/io/metersphere/api/service/ApiAutomationService.java b/backend/src/main/java/io/metersphere/api/service/ApiAutomationService.java index 6febfcf1e6..1a8f692f00 100644 --- a/backend/src/main/java/io/metersphere/api/service/ApiAutomationService.java +++ b/backend/src/main/java/io/metersphere/api/service/ApiAutomationService.java @@ -1066,10 +1066,6 @@ public class ApiAutomationService { } else { List testResources = resourcePoolCalculation.getPools(request.getConfig().getResourcePoolId()); request.getConfig().setTestResources(testResources); - String status = nodeKafkaService.createKafkaProducer(request.getConfig()); - if ("ERROR".equals(status)) { - MSException.throwException("执行节点的kafka 启动失败,无法执行"); - } } } // 环境检查 @@ -1170,9 +1166,6 @@ public class ApiAutomationService { MessageCache.cache.put(serialReportId, counter); } } - - // 检查node的kafka - nodeKafkaService.createKafkaProducer(request.getConfig()); // 开始执行 if (executeQueue != null && executeQueue.size() > 0) { if (request.getConfig() != null && request.getConfig().getMode().equals(RunModeConstants.SERIAL.toString())) { diff --git a/backend/src/main/java/io/metersphere/api/service/NodeKafkaService.java b/backend/src/main/java/io/metersphere/api/service/NodeKafkaService.java index 1d7a8457f6..c783081983 100644 --- a/backend/src/main/java/io/metersphere/api/service/NodeKafkaService.java +++ b/backend/src/main/java/io/metersphere/api/service/NodeKafkaService.java @@ -1,18 +1,9 @@ package io.metersphere.api.service; -import com.alibaba.fastjson.JSON; -import io.metersphere.api.dto.JvmInfoDTO; -import io.metersphere.api.dto.automation.RunModeConfig; -import io.metersphere.base.domain.TestResource; -import io.metersphere.commons.utils.LogUtil; import io.metersphere.config.KafkaProperties; -import io.metersphere.dto.NodeDTO; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; -import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.util.HashMap; @@ -22,47 +13,14 @@ import java.util.Map; public class NodeKafkaService { @Resource private KafkaProperties kafkaProperties; - @Resource - private RestTemplate restTemplate; @Value("${spring.kafka.producer.properties.max.request.size}") private String maxRequestSize; - @Value("${spring.kafka.consumer.group-id}") - private String groupId; - - private static final String BASE_URL = "http://%s:%d"; - - public String createKafkaProducer(RunModeConfig config) { + public Map getKafka() { Map producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize); - - Map consumerProps = new HashMap<>(); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - - try { - for (JvmInfoDTO jvmInfoDTO : config.getTestResources()) { - TestResource testResource = jvmInfoDTO.getTestResource(); - String configuration = testResource.getConfiguration(); - NodeDTO node = JSON.parseObject(configuration, NodeDTO.class); - String nodeIp = node.getIp(); - Integer port = node.getPort(); - - String uri = String.format(BASE_URL + "/producer/create", nodeIp, port); - ResponseEntity result = restTemplate.postForEntity(uri, producerProps, String.class); - if (result.getBody() == null || !"SUCCESS".equals(result.getBody())) { - config.getTestResources().remove(jvmInfoDTO); - } - - //String cUri = String.format(BASE_URL + "/consumer/create", nodeIp, port); - //restTemplate.postForEntity(cUri, consumerProps, void.class); - } - } catch (Exception e) { - LogUtil.error(e.getMessage()); - return "ERROR"; - } - return "SUCCESS"; + return producerProps; } } diff --git a/backend/src/main/java/io/metersphere/track/service/TestPlanApiCaseService.java b/backend/src/main/java/io/metersphere/track/service/TestPlanApiCaseService.java index 6150ec428a..d61fde1d08 100644 --- a/backend/src/main/java/io/metersphere/track/service/TestPlanApiCaseService.java +++ b/backend/src/main/java/io/metersphere/track/service/TestPlanApiCaseService.java @@ -402,10 +402,6 @@ public class TestPlanApiCaseService { } else { List testResources = resourcePoolCalculation.getPools(request.getConfig().getResourcePoolId()); request.getConfig().setTestResources(testResources); - String status = nodeKafkaService.createKafkaProducer(request.getConfig()); - if ("ERROR".equals(status)) { - MSException.throwException("执行节点的kafka 启动失败,无法执行"); - } } } // 开始选择执行模式