fix(接口测试): 修复k8s执行kafka收不到消息问题

This commit is contained in:
fit2-zhao 2021-10-28 13:44:24 +08:00 committed by fit2-zhao
parent 42d99ed2ef
commit b80fe8f07c
5 changed files with 11 additions and 55 deletions

View File

@ -4,6 +4,8 @@ import io.metersphere.api.dto.automation.RunModeConfig;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
@Getter
@Setter
public class RunRequest {
@ -19,4 +21,6 @@ public class RunRequest {
// 集成报告ID
private String amassReport;
private RunModeConfig config;
private Map<String, Object> kafka;
}

View File

@ -6,6 +6,7 @@ import io.metersphere.api.dto.RunRequest;
import io.metersphere.api.dto.automation.ExecuteType;
import io.metersphere.api.dto.automation.RunModeConfig;
import io.metersphere.api.service.ApiScenarioReportService;
import io.metersphere.api.service.NodeKafkaService;
import io.metersphere.base.domain.TestResource;
import io.metersphere.base.domain.TestResourcePool;
import io.metersphere.base.mapper.TestResourcePoolMapper;
@ -51,6 +52,8 @@ public class JMeterService {
private TestResourcePoolMapper testResourcePoolMapper;
@Resource
private RestTemplate restTemplate;
@Resource
private NodeKafkaService nodeKafkaService;
@PostConstruct
public void init() {
@ -158,6 +161,8 @@ public class JMeterService {
}
runRequest.setUrl(platformUrl);
runRequest.setRunMode(runMode);
runRequest.setKafka(nodeKafkaService.getKafka());
// 如果是K8S调用
TestResourcePool pool = testResourcePoolMapper.selectByPrimaryKey(resourcePoolId);
if (pool != null && pool.getApi() && pool.getType().equals(ResourcePoolTypeEnum.K8S.name())) {

View File

@ -1066,10 +1066,6 @@ public class ApiAutomationService {
} else {
List<JvmInfoDTO> testResources = resourcePoolCalculation.getPools(request.getConfig().getResourcePoolId());
request.getConfig().setTestResources(testResources);
String status = nodeKafkaService.createKafkaProducer(request.getConfig());
if ("ERROR".equals(status)) {
MSException.throwException("执行节点的kafka 启动失败,无法执行");
}
}
}
// 环境检查
@ -1170,9 +1166,6 @@ public class ApiAutomationService {
MessageCache.cache.put(serialReportId, counter);
}
}
// 检查node的kafka
nodeKafkaService.createKafkaProducer(request.getConfig());
// 开始执行
if (executeQueue != null && executeQueue.size() > 0) {
if (request.getConfig() != null && request.getConfig().getMode().equals(RunModeConstants.SERIAL.toString())) {

View File

@ -1,18 +1,9 @@
package io.metersphere.api.service;
import com.alibaba.fastjson.JSON;
import io.metersphere.api.dto.JvmInfoDTO;
import io.metersphere.api.dto.automation.RunModeConfig;
import io.metersphere.base.domain.TestResource;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.config.KafkaProperties;
import io.metersphere.dto.NodeDTO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.util.HashMap;
@ -22,47 +13,14 @@ import java.util.Map;
public class NodeKafkaService {
@Resource
private KafkaProperties kafkaProperties;
@Resource
private RestTemplate restTemplate;
@Value("${spring.kafka.producer.properties.max.request.size}")
private String maxRequestSize;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
private static final String BASE_URL = "http://%s:%d";
public String createKafkaProducer(RunModeConfig config) {
public Map<String, Object> getKafka() {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
try {
for (JvmInfoDTO jvmInfoDTO : config.getTestResources()) {
TestResource testResource = jvmInfoDTO.getTestResource();
String configuration = testResource.getConfiguration();
NodeDTO node = JSON.parseObject(configuration, NodeDTO.class);
String nodeIp = node.getIp();
Integer port = node.getPort();
String uri = String.format(BASE_URL + "/producer/create", nodeIp, port);
ResponseEntity<String> result = restTemplate.postForEntity(uri, producerProps, String.class);
if (result.getBody() == null || !"SUCCESS".equals(result.getBody())) {
config.getTestResources().remove(jvmInfoDTO);
}
//String cUri = String.format(BASE_URL + "/consumer/create", nodeIp, port);
//restTemplate.postForEntity(cUri, consumerProps, void.class);
}
} catch (Exception e) {
LogUtil.error(e.getMessage());
return "ERROR";
}
return "SUCCESS";
return producerProps;
}
}

View File

@ -402,10 +402,6 @@ public class TestPlanApiCaseService {
} else {
List<JvmInfoDTO> testResources = resourcePoolCalculation.getPools(request.getConfig().getResourcePoolId());
request.getConfig().setTestResources(testResources);
String status = nodeKafkaService.createKafkaProducer(request.getConfig());
if ("ERROR".equals(status)) {
MSException.throwException("执行节点的kafka 启动失败,无法执行");
}
}
}
// 开始选择执行模式