判题调度逻辑2.0解决并发问题
This commit is contained in:
parent
ddfaa27d06
commit
d89bfcfc08
|
@ -0,0 +1,21 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2021 Himit_ZH
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -2,14 +2,15 @@
|
|||
|
||||
基于前后端分离,分布式架构的在线测评平台(hoj)
|
||||
|
||||
在线Demo:[http://oj.hcode.top](http://oj.hcode.top)
|
||||
在线Demo:[http://www.hcode.top](http://www.hcode.top)
|
||||
|
||||
> 上线日记
|
||||
|
||||
| 时间 | 内容 | 更新者 |
|
||||
| ---------- | ------------ | -------- |
|
||||
| ---------- | ----------------------- | -------- |
|
||||
| 2020-10-26 | 正式开发 | Himit_ZH |
|
||||
| 2021-04-10 | 首次上线测试 | Himit_ZH |
|
||||
| 2021-04-15 | 判题调度2.0解决并发问题 | Himit_ZH |
|
||||
|
||||
> 简略介绍
|
||||
|
||||
|
|
|
@ -69,10 +69,6 @@
|
|||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-openfeign</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
package top.hcode.hoj.common.exception;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import top.hcode.hoj.common.result.CommonResult;
|
||||
import top.hcode.hoj.judge.remote.RemoteJudgeDispatcher;
|
||||
import top.hcode.hoj.judge.self.JudgeDispatcher;
|
||||
import top.hcode.hoj.pojo.entity.CompileSpj;
|
||||
import top.hcode.hoj.pojo.entity.Judge;
|
||||
import top.hcode.hoj.pojo.entity.ToJudge;
|
||||
import top.hcode.hoj.service.ToJudgeService;
|
||||
import top.hcode.hoj.service.impl.JudgeServiceImpl;
|
||||
import top.hcode.hoj.utils.Constants;
|
||||
import top.hcode.hoj.utils.RedisUtils;
|
||||
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
/**
|
||||
* @Author: Himit_ZH
|
||||
* @Date: 2020/10/30 10:21
|
||||
* @Description: 调用判题服务器的方法的容错机制,调用失败会走到以下方法进行执行
|
||||
*/
|
||||
@Component
|
||||
public class CloudHandler implements ToJudgeService {
|
||||
|
||||
@Autowired
|
||||
private JudgeDispatcher judgeDispatcher;
|
||||
|
||||
@Autowired
|
||||
private RemoteJudgeDispatcher remoteJudgeDispatcher;
|
||||
|
||||
@Autowired
|
||||
private RedisUtils redisUtils;
|
||||
|
||||
@Autowired
|
||||
private JudgeServiceImpl judgeService;
|
||||
|
||||
|
||||
// 调度判题服务器失败,可能是判题服务器有故障,或者全部达到判题最大数,那么将该提交重新进入等待队列
|
||||
@Override
|
||||
public CommonResult submitProblemJudge(ToJudge toJudge) {
|
||||
if (toJudge.getTryAgainNum() == 30) {
|
||||
Judge judge = toJudge.getJudge();
|
||||
judge.setStatus(Constants.Judge.STATUS_SUBMITTED_FAILED.getStatus());
|
||||
judge.setErrorMessage("Failed to connect the judgeServer. Please resubmit this submission again!");
|
||||
judgeService.updateById(judge);
|
||||
} else {
|
||||
// 线程沉睡1秒,再将任务重新发布,避免过快问题,同时判题服务过多,导致的失败
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Judge judge = toJudge.getJudge();
|
||||
judgeDispatcher.sendTask(judge.getSubmitId(), judge.getPid(), toJudge.getToken(),
|
||||
judge.getCid() != 0, toJudge.getTryAgainNum() + 1);
|
||||
}
|
||||
return CommonResult.errorResponse("判题服务器繁忙或出错,提交进入重判队列,请等待管理员处理!", CommonResult.STATUS_ERROR);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CommonResult compileSpj(CompileSpj compileSpj) {
|
||||
return CommonResult.errorResponse("没有可用的判题服务,请重新尝试!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CommonResult remoteJudge(ToJudge toJudge) {
|
||||
// 将使用的账号放回对应列表
|
||||
JSONObject account = new JSONObject();
|
||||
account.set("username", toJudge.getUsername());
|
||||
account.set("password", toJudge.getPassword());
|
||||
redisUtils.llPush(Constants.Judge.getListNameByOJName(toJudge.getRemoteJudge().split("-")[0]), JSONUtil.toJsonStr(account));
|
||||
|
||||
if (toJudge.getTryAgainNum() == 30) {
|
||||
Judge judge = toJudge.getJudge();
|
||||
judge.setStatus(Constants.Judge.STATUS_SUBMITTED_FAILED.getStatus());
|
||||
judge.setErrorMessage("Failed to connect the judgeServer. Please resubmit this submission again!");
|
||||
judgeService.updateById(judge);
|
||||
} else {
|
||||
|
||||
// 线程沉睡一秒,再将任务重新发布,避免过快问题,同时判题服务过多,导致的失败
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
Judge judge = toJudge.getJudge();
|
||||
remoteJudgeDispatcher.sendTask(judge.getSubmitId(), judge.getPid(), toJudge.getToken(),
|
||||
toJudge.getRemoteJudge(), judge.getCid() != 0, toJudge.getTryAgainNum() + 1);
|
||||
}
|
||||
|
||||
return CommonResult.errorResponse("判题服务器繁忙或出错,提交进入重判队列,请等待管理员处理!", CommonResult.STATUS_ERROR);
|
||||
}
|
||||
|
||||
}
|
|
@ -23,7 +23,7 @@ public class RestTemplateConfig {
|
|||
@Bean
|
||||
public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
|
||||
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
|
||||
factory.setReadTimeout(10000);//单位为ms
|
||||
factory.setReadTimeout(30000);//单位为ms
|
||||
factory.setConnectTimeout(10000);//单位为ms
|
||||
return factory;
|
||||
}
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
package top.hcode.hoj.config;
|
||||
|
||||
import com.netflix.loadbalancer.IRule;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import top.hcode.hoj.judge.self.JudgeChooseRule;
|
||||
|
||||
/**
|
||||
* @Author: Himit_ZH
|
||||
* @Date: 2021/2/4 23:10
|
||||
* @Description:
|
||||
*/
|
||||
@Configuration
|
||||
public class RibbonConfig {
|
||||
|
||||
@Bean
|
||||
public IRule ribbonRule() {
|
||||
// 随机的负载均衡策略对象
|
||||
return new JudgeChooseRule();
|
||||
}
|
||||
|
||||
}
|
|
@ -11,6 +11,7 @@ import top.hcode.hoj.pojo.vo.ConfigVo;
|
|||
import top.hcode.hoj.utils.Constants;
|
||||
import top.hcode.hoj.utils.RedisUtils;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
@ -58,4 +59,5 @@ public class StartupRunner implements CommandLineRunner {
|
|||
log.error("CF判题账号注入Redis的List异常------------>{}", "请检查配置文件,然后重新启动!");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -95,6 +95,7 @@ public class AdminJudgeController {
|
|||
|
||||
// 设置默认值
|
||||
judge.setStatus(Constants.Judge.STATUS_PENDING.getStatus()); // 开始进入判题队列
|
||||
judge.setVersion(judge.getVersion() + 1);
|
||||
judge.setJudger(null).setTime(null).setMemory(null).setErrorMessage(null);
|
||||
boolean result = judgeService.updateById(judge);
|
||||
if (result) {
|
||||
|
@ -130,6 +131,7 @@ public class AdminJudgeController {
|
|||
// 全部设置默认值
|
||||
for (Judge judge : rejudgeList) {
|
||||
judge.setStatus(Constants.Judge.STATUS_PENDING.getStatus()); // 开始进入判题队列
|
||||
judge.setVersion(judge.getVersion() + 1);
|
||||
judge.setJudger(null).setTime(null).setMemory(null).setErrorMessage(null);
|
||||
submitIdList.add(judge.getSubmitId());
|
||||
}
|
||||
|
|
|
@ -14,10 +14,10 @@ import org.springframework.util.StringUtils;
|
|||
import org.springframework.web.bind.annotation.*;
|
||||
import top.hcode.hoj.common.result.CommonResult;
|
||||
import top.hcode.hoj.crawler.problem.ProblemStrategy;
|
||||
import top.hcode.hoj.judge.JudgeServerUtils;
|
||||
import top.hcode.hoj.pojo.dto.ProblemDto;
|
||||
import top.hcode.hoj.pojo.entity.*;
|
||||
import top.hcode.hoj.pojo.vo.UserRolesVo;
|
||||
import top.hcode.hoj.service.ToJudgeService;
|
||||
import top.hcode.hoj.service.impl.*;
|
||||
|
||||
|
||||
|
@ -43,7 +43,7 @@ public class AdminProblemController {
|
|||
private ProblemCaseServiceImpl problemCaseService;
|
||||
|
||||
@Autowired
|
||||
private ToJudgeService toJudgeService;
|
||||
private JudgeServerUtils judgeServerUtils;
|
||||
|
||||
@Value("${hoj.judge.token}")
|
||||
private String judgeToken;
|
||||
|
@ -179,7 +179,7 @@ public class AdminProblemController {
|
|||
}
|
||||
|
||||
compileSpj.setToken(judgeToken);
|
||||
return toJudgeService.compileSpj(compileSpj);
|
||||
return judgeServerUtils.dispatcher("compile", "/compile-spj", compileSpj);
|
||||
}
|
||||
|
||||
@GetMapping("/import-remote-oj-problem")
|
||||
|
|
|
@ -276,6 +276,7 @@ public class JudgeController {
|
|||
Problem problem = problemService.getById(judge.getPid());
|
||||
// 重新进入等待队列
|
||||
judge.setStatus(Constants.Judge.STATUS_PENDING.getStatus());
|
||||
judge.setVersion(judge.getVersion() + 1);
|
||||
judge.setErrorMessage(null);
|
||||
judgeService.updateById(judge);
|
||||
// 将提交加入任务队列
|
||||
|
@ -332,7 +333,7 @@ public class JudgeController {
|
|||
if (judge.getStatus().intValue() != Constants.Judge.STATUS_COMPILE_ERROR.getStatus() &&
|
||||
judge.getStatus().intValue() != Constants.Judge.STATUS_SYSTEM_ERROR.getStatus() &&
|
||||
judge.getStatus().intValue() != Constants.Judge.STATUS_SUBMITTED_FAILED.getStatus()) {
|
||||
judge.setErrorMessage("");
|
||||
judge.setErrorMessage("The error message does not support viewing.");
|
||||
}
|
||||
result.put("submission", judge);
|
||||
result.put("codeShare", problem.getCodeShare());
|
||||
|
@ -451,6 +452,10 @@ public class JudgeController {
|
|||
|
||||
Judge judge = judgeService.getById(submitId);
|
||||
|
||||
if (judge == null) {
|
||||
return CommonResult.errorResponse("此提交数据不存在!");
|
||||
}
|
||||
|
||||
Problem problem = problemService.getById(judge.getPid());
|
||||
|
||||
// 如果该题不支持开放测试点结果查看
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
package top.hcode.hoj.dao;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import top.hcode.hoj.pojo.entity.JudgeServer;
|
||||
|
||||
@Mapper
|
||||
@Repository
|
||||
public interface JudgeServerMapper extends BaseMapper<JudgeServer> {
|
||||
}
|
|
@ -0,0 +1,233 @@
|
|||
package top.hcode.hoj.judge;
|
||||
|
||||
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
|
||||
import com.alibaba.cloud.nacos.ribbon.NacosServer;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Isolation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import top.hcode.hoj.common.result.CommonResult;
|
||||
import top.hcode.hoj.pojo.entity.CompileSpj;
|
||||
import top.hcode.hoj.pojo.entity.Judge;
|
||||
import top.hcode.hoj.pojo.entity.JudgeServer;
|
||||
import top.hcode.hoj.pojo.entity.ToJudge;
|
||||
import top.hcode.hoj.service.impl.JudgeServerServiceImpl;
|
||||
import top.hcode.hoj.service.impl.JudgeServiceImpl;
|
||||
import top.hcode.hoj.utils.Constants;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* @Author: Himit_ZH
|
||||
* @Date: 2021/4/15 17:29
|
||||
* @Description:
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class JudgeServerUtils {
|
||||
|
||||
@Autowired
|
||||
private NacosDiscoveryProperties discoveryProperties;
|
||||
|
||||
@Value("${service-url.name}")
|
||||
private String JudgeServiceName;
|
||||
|
||||
@Autowired
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
@Autowired
|
||||
private JudgeServerServiceImpl judgeServerService;
|
||||
|
||||
@Autowired
|
||||
private JudgeServiceImpl judgeService;
|
||||
|
||||
|
||||
public CommonResult dispatcher(String type, String path, Object data) {
|
||||
switch (type) {
|
||||
case "judge":
|
||||
ToJudge judgeData = (ToJudge) data;
|
||||
toJudge(path, (ToJudge) data, judgeData.getJudge().getSubmitId(), judgeData.getRemoteJudge() != null);
|
||||
break;
|
||||
case "compile":
|
||||
CompileSpj compileSpj = (CompileSpj) data;
|
||||
return toCompile(path, compileSpj);
|
||||
default:
|
||||
throw new NullPointerException("判题机不支持此调用类型");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Transactional(isolation = Isolation.READ_COMMITTED)
|
||||
public void toJudge(String path, ToJudge data, Long submitId, Boolean isRemote) {
|
||||
// 尝试30s
|
||||
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
AtomicInteger count = new AtomicInteger(0);
|
||||
Runnable getResultTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
count.getAndIncrement();
|
||||
JudgeServer judgeServer = chooseServer(isRemote);
|
||||
if (count.get() == 30) { // 30次失败则判为提交失败
|
||||
checkResult(null, submitId);
|
||||
scheduler.shutdown();
|
||||
}
|
||||
if (judgeServer != null) { // 获取到判题机资源
|
||||
CommonResult result = null;
|
||||
try {
|
||||
result = restTemplate.postForObject("http://" + judgeServer.getUrl() + path, data, CommonResult.class);
|
||||
} catch (Exception e) {
|
||||
log.error("调用判题服务器[" + judgeServer.getUrl() + "]发送异常-------------->{}", e.getMessage());
|
||||
} finally {
|
||||
checkResult(result, submitId);
|
||||
// 无论成功与否,都要将对应的当前判题机当前判题数减1
|
||||
reduceCurrentTaskNum(judgeServer.getId());
|
||||
scheduler.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
scheduler.scheduleAtFixedRate(getResultTask, 0, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Transactional(isolation = Isolation.READ_COMMITTED)
|
||||
public CommonResult toCompile(String path, CompileSpj data) {
|
||||
CommonResult result = CommonResult.errorResponse("没有可用的判题服务器,请重新尝试!");
|
||||
JudgeServer judgeServer = chooseServer(false);
|
||||
if (judgeServer != null) {
|
||||
try {
|
||||
result = restTemplate.postForObject("http://" + judgeServer.getUrl() + path, data, CommonResult.class);
|
||||
} catch (Exception e) {
|
||||
log.error("调用判题服务器[" + judgeServer.getUrl() + "]发送异常-------------->{}", e.getMessage());
|
||||
} finally {
|
||||
// 无论成功与否,都要将对应的当前判题机当前判题数减1
|
||||
reduceCurrentTaskNum(judgeServer.getId());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param
|
||||
* @MethodName chooseServer
|
||||
* @Description 选择可以用调用判题的判题服务器
|
||||
* @Return
|
||||
* @Since 2021/4/15
|
||||
*/
|
||||
public JudgeServer chooseServer(Boolean isRemote) {
|
||||
// 获取该微服务的所有健康实例
|
||||
List<Instance> instances = getInstances(JudgeServiceName);
|
||||
if (instances.size() <= 0) {
|
||||
return null;
|
||||
}
|
||||
List<String> keyList = new ArrayList<>();
|
||||
// 获取当前健康实例取出ip和port拼接
|
||||
for (Instance instance : instances) {
|
||||
keyList.add(instance.getIp() + ":" + instance.getPort());
|
||||
}
|
||||
// 过滤出小于或等于规定最大并发判题任务数的服务实例且健康的判题机
|
||||
QueryWrapper<JudgeServer> judgeServerQueryWrapper = new QueryWrapper<>();
|
||||
judgeServerQueryWrapper
|
||||
.in("url", keyList)
|
||||
.eq("is_remote", isRemote)
|
||||
.orderByAsc("task_number");
|
||||
List<JudgeServer> judgeServerList = judgeServerService.list(judgeServerQueryWrapper);
|
||||
// 使用乐观锁获取可用判题机
|
||||
for (JudgeServer judgeServer : judgeServerList) {
|
||||
if (judgeServer.getTaskNumber() < judgeServer.getMaxTaskNumber()) {
|
||||
judgeServer.setTaskNumber(judgeServer.getTaskNumber() + 1);
|
||||
boolean isOk = judgeServerService.updateById(judgeServer);
|
||||
if (isOk) {
|
||||
return judgeServer;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param serviceId
|
||||
* @MethodName getInstances
|
||||
* @Description 根据服务id获取对应的健康实例列表
|
||||
* @Return
|
||||
* @Since 2021/4/15
|
||||
*/
|
||||
private List<Instance> getInstances(String serviceId) {
|
||||
// 获取服务发现的相关API
|
||||
NamingService namingService = discoveryProperties.namingServiceInstance();
|
||||
try {
|
||||
// 获取该微服务的所有健康实例
|
||||
return namingService.selectInstances(serviceId, true);
|
||||
} catch (NacosException e) {
|
||||
log.error("获取微服务健康实例发生异常--------->{}", e);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkResult(CommonResult result, Long submitId) {
|
||||
|
||||
Judge judge = new Judge();
|
||||
if (result == null) { // 调用失败
|
||||
judge.setSubmitId(submitId);
|
||||
judge.setStatus(Constants.Judge.STATUS_SUBMITTED_FAILED.getStatus());
|
||||
judge.setErrorMessage("Failed to connect the judgeServer. Please resubmit this submission again!");
|
||||
judgeService.updateById(judge);
|
||||
} else {
|
||||
if (result.getStatus().intValue() != CommonResult.STATUS_SUCCESS) { // 如果是结果码不是200 说明调用有错误
|
||||
// 判为系统错误
|
||||
judge.setStatus(Constants.Judge.STATUS_SYSTEM_ERROR.getStatus())
|
||||
.setErrorMessage(result.getMsg());
|
||||
judgeService.updateById(judge);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional(isolation = Isolation.READ_COMMITTED)
|
||||
public void reduceCurrentTaskNum(Integer id) {
|
||||
UpdateWrapper<JudgeServer> judgeServerUpdateWrapper = new UpdateWrapper<>();
|
||||
judgeServerUpdateWrapper.setSql("task_number = task_number-1").eq("id", id);
|
||||
boolean isOk = judgeServerService.update(judgeServerUpdateWrapper);
|
||||
if (!isOk) { // 重试八次
|
||||
tryAgainUpdate(judgeServerUpdateWrapper);
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional(isolation = Isolation.READ_COMMITTED)
|
||||
public void tryAgainUpdate(UpdateWrapper<JudgeServer> updateWrapper) {
|
||||
boolean retryable;
|
||||
int attemptNumber = 0;
|
||||
do {
|
||||
boolean success = judgeServerService.update(updateWrapper);
|
||||
if (success) {
|
||||
return;
|
||||
} else {
|
||||
attemptNumber++;
|
||||
retryable = attemptNumber < 8;
|
||||
if (attemptNumber == 8) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(300);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
} while (retryable);
|
||||
}
|
||||
}
|
|
@ -6,9 +6,9 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
import top.hcode.hoj.judge.JudgeServerUtils;
|
||||
import top.hcode.hoj.pojo.entity.Judge;
|
||||
import top.hcode.hoj.pojo.entity.ToJudge;
|
||||
import top.hcode.hoj.service.ToJudgeService;
|
||||
import top.hcode.hoj.service.impl.JudgeServiceImpl;
|
||||
import top.hcode.hoj.utils.Constants;
|
||||
import top.hcode.hoj.utils.RedisUtils;
|
||||
|
@ -19,8 +19,6 @@ import java.util.concurrent.TimeUnit;
|
|||
@Async
|
||||
public class RemoteJudgeReceiver {
|
||||
|
||||
@Autowired
|
||||
private ToJudgeService toJudgeService;
|
||||
|
||||
@Autowired
|
||||
private RemoteJudgeDispatcher remoteJudgeDispatcher;
|
||||
|
@ -28,6 +26,9 @@ public class RemoteJudgeReceiver {
|
|||
@Autowired
|
||||
private JudgeServiceImpl judgeService;
|
||||
|
||||
@Autowired
|
||||
private JudgeServerUtils judgeServerUtils;
|
||||
|
||||
@Autowired
|
||||
private RedisUtils redisUtils;
|
||||
|
||||
|
@ -54,7 +55,6 @@ public class RemoteJudgeReceiver {
|
|||
String remoteJudge = task.getStr("remoteJudge");
|
||||
Boolean isContest = task.getBool("isContest");
|
||||
Integer tryAgainNum = task.getInt("tryAgainNum");
|
||||
|
||||
// 如果对应远程判题oj的账号列表还有账号
|
||||
String remoteJudgeAccountListName = Constants.Judge.getListNameByOJName(remoteJudge.split("-")[0]);
|
||||
|
||||
|
@ -75,7 +75,7 @@ public class RemoteJudgeReceiver {
|
|||
password = accountJson.getStr("password");
|
||||
Judge judge = judgeService.getById(submitId);
|
||||
// 调用判题服务
|
||||
toJudgeService.remoteJudge(new ToJudge()
|
||||
judgeServerUtils.dispatcher("judge", "/remote-judge", new ToJudge()
|
||||
.setJudge(judge)
|
||||
.setToken(token)
|
||||
.setRemoteJudge(remoteJudge)
|
||||
|
@ -85,10 +85,10 @@ public class RemoteJudgeReceiver {
|
|||
|
||||
// 如果队列中还有任务,则继续处理
|
||||
processWaitingTask();
|
||||
|
||||
}
|
||||
} else {
|
||||
|
||||
if (tryAgainNum >= 40) {
|
||||
if (tryAgainNum >= 30) {
|
||||
// 获取调用多次失败可能为系统忙碌,判为提交失败
|
||||
Judge judge = new Judge();
|
||||
judge.setSubmitId(submitId);
|
||||
|
|
|
@ -6,6 +6,7 @@ import com.alibaba.cloud.nacos.ribbon.NacosServer;
|
|||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.netflix.client.config.IClientConfig;
|
||||
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
|
||||
import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
|
||||
|
@ -13,9 +14,15 @@ import com.netflix.loadbalancer.Server;
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import top.hcode.hoj.pojo.entity.JudgeServer;
|
||||
import top.hcode.hoj.service.impl.JudgeServerServiceImpl;
|
||||
import top.hcode.hoj.utils.RedisUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -25,12 +32,22 @@ import java.util.stream.Collectors;
|
|||
* @Description: 任务调度的自定义负载均衡策略
|
||||
*/
|
||||
@Slf4j
|
||||
|
||||
public class JudgeChooseRule extends AbstractLoadBalancerRule {
|
||||
|
||||
@Autowired
|
||||
private NacosDiscoveryProperties discoveryProperties;
|
||||
|
||||
|
||||
private JudgeServerServiceImpl judgeServerService ;
|
||||
|
||||
|
||||
@Autowired
|
||||
public void setJudgeServerService (JudgeServerServiceImpl judgeServerService){
|
||||
this.judgeServerService = judgeServerService;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void initWithNiwsConfig(IClientConfig iClientConfig) {
|
||||
|
||||
|
@ -45,24 +62,39 @@ public class JudgeChooseRule extends AbstractLoadBalancerRule {
|
|||
String serviceId = loadBalancer.getName();
|
||||
// 获取该微服务的所有健康实例
|
||||
List<Instance> instances = getInstances(serviceId);
|
||||
// 进行匹配筛选的实例列表
|
||||
List<Instance> metadataMatchInstances;
|
||||
// 过滤出小于或等于规定最大并发判题任务数的服务实例
|
||||
metadataMatchInstances = instances.stream()
|
||||
.filter(instance ->
|
||||
Integer.parseInt(instance.getMetadata().getOrDefault("currentTaskNum", "0"))<=
|
||||
Integer.parseInt(instance.getMetadata().getOrDefault("maxTaskNum","4"))
|
||||
).collect(Collectors.toList());
|
||||
// 如果为空闲判题服务器的数量为空,则该判题请求重新进入等待队列
|
||||
if (CollectionUtils.isEmpty(metadataMatchInstances)) {
|
||||
if (instances.size() <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 基于随机权重的负载均衡算法,选取其中一个实例
|
||||
Instance instance = ExtendBalancer.getHostByRandomWeight2(metadataMatchInstances);
|
||||
return new NacosServer(instance);
|
||||
List<String> keyList = new ArrayList<>();
|
||||
// 获取当前健康实例取出ip和port拼接
|
||||
for (Instance instance : instances) {
|
||||
keyList.add(instance.getIp() + ":" + instance.getPort());
|
||||
}
|
||||
|
||||
// 过滤出小于或等于规定最大并发判题任务数的服务实例且健康的判题机
|
||||
QueryWrapper<JudgeServer> judgeServerQueryWrapper = new QueryWrapper<>();
|
||||
judgeServerQueryWrapper
|
||||
.in("url", keyList)
|
||||
.orderByAsc("task_num");
|
||||
List<JudgeServer> judgeServerList = judgeServerService.list(judgeServerQueryWrapper);
|
||||
System.out.println(judgeServerList);
|
||||
// 使用乐观锁获取可用判题机
|
||||
for (JudgeServer judgeServer : judgeServerList) {
|
||||
if (judgeServer.getTaskNumber() <= judgeServer.getMaxTaskNumber()) {
|
||||
judgeServer.setTaskNumber(judgeServer.getTaskNumber() + 1);
|
||||
boolean isOk = judgeServerService.updateById(judgeServer);
|
||||
if (isOk) {
|
||||
int instanceIndex = keyList.indexOf(judgeServer.getIp() + ":" + judgeServer.getPort());
|
||||
if (instanceIndex != -1) {
|
||||
return new NacosServer(instances.get(instanceIndex));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
private List<Instance> getInstances(String serviceId) {
|
||||
|
|
|
@ -2,13 +2,13 @@ package top.hcode.hoj.judge.self;
|
|||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
import top.hcode.hoj.common.result.CommonResult;
|
||||
import top.hcode.hoj.judge.JudgeServerUtils;
|
||||
import top.hcode.hoj.pojo.entity.Judge;
|
||||
import top.hcode.hoj.pojo.entity.ToJudge;
|
||||
import top.hcode.hoj.service.ToJudgeService;
|
||||
import top.hcode.hoj.service.impl.JudgeServiceImpl;
|
||||
import top.hcode.hoj.utils.Constants;
|
||||
import top.hcode.hoj.utils.RedisUtils;
|
||||
|
@ -22,10 +22,11 @@ import top.hcode.hoj.utils.RedisUtils;
|
|||
*/
|
||||
@Component
|
||||
@Async
|
||||
@Slf4j
|
||||
public class JudgeReceiver {
|
||||
|
||||
@Autowired
|
||||
private ToJudgeService toJudgeService;
|
||||
private JudgeServerUtils judgeServerUtils;
|
||||
|
||||
@Autowired
|
||||
private RedisUtils redisUtils;
|
||||
|
@ -33,8 +34,6 @@ public class JudgeReceiver {
|
|||
@Autowired
|
||||
private JudgeServiceImpl judgeService;
|
||||
|
||||
@Autowired
|
||||
private JudgeDispatcher judgeDispatcher;
|
||||
|
||||
public void processWaitingTask() {
|
||||
// 如果队列中还有任务,则继续处理
|
||||
|
@ -54,16 +53,15 @@ public class JudgeReceiver {
|
|||
String token = task.getStr("token");
|
||||
Integer tryAgainNum = task.getInt("tryAgainNum");
|
||||
Judge judge = judgeService.getById(submitId);
|
||||
|
||||
// 调用判题服务
|
||||
toJudgeService.submitProblemJudge(new ToJudge()
|
||||
judgeServerUtils.dispatcher("judge", "/judge", new ToJudge()
|
||||
.setJudge(judge)
|
||||
.setToken(token)
|
||||
.setRemoteJudge(null)
|
||||
.setTryAgainNum(tryAgainNum));
|
||||
|
||||
// 接着处理任务
|
||||
processWaitingTask();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package top.hcode.hoj.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
|
||||
import top.hcode.hoj.pojo.entity.JudgeServer;
|
||||
|
||||
public interface JudgeServerService extends IService<JudgeServer> {
|
||||
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
package top.hcode.hoj.service;
|
||||
|
||||
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import top.hcode.hoj.common.exception.CloudHandler;
|
||||
import top.hcode.hoj.common.result.CommonResult;
|
||||
import top.hcode.hoj.config.RibbonConfig;
|
||||
import top.hcode.hoj.pojo.entity.CompileSpj;
|
||||
import top.hcode.hoj.pojo.entity.ToJudge;
|
||||
|
||||
|
||||
//需要的判题微服务名
|
||||
@FeignClient(value = "hoj-judge-server", fallback = CloudHandler.class, configuration = RibbonConfig.class)
|
||||
@Component
|
||||
public interface ToJudgeService {
|
||||
|
||||
@PostMapping(value = "/judge")
|
||||
public CommonResult submitProblemJudge(ToJudge toJudge);
|
||||
|
||||
@PostMapping(value = "/compile-spj")
|
||||
public CommonResult compileSpj(CompileSpj compileSpj);
|
||||
|
||||
@PostMapping(value = "/remote-judge")
|
||||
public CommonResult remoteJudge(ToJudge toJudge);
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package top.hcode.hoj.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import top.hcode.hoj.dao.JudgeServerMapper;
|
||||
|
||||
import top.hcode.hoj.pojo.entity.JudgeServer;
|
||||
import top.hcode.hoj.service.JudgeServerService;
|
||||
|
||||
/**
|
||||
* @Author: Himit_ZH
|
||||
* @Date: 2021/4/15 11:27
|
||||
* @Description:
|
||||
*/
|
||||
@Service
|
||||
public class JudgeServerServiceImpl extends ServiceImpl<JudgeServerMapper, JudgeServer> implements JudgeServerService {
|
||||
|
||||
|
||||
}
|
|
@ -7,13 +7,10 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.StringUtils;
|
||||
import top.hcode.hoj.common.result.CommonResult;
|
||||
import top.hcode.hoj.crawler.problem.CFProblemStrategy;
|
||||
import top.hcode.hoj.crawler.problem.HDUProblemStrategy;
|
||||
import top.hcode.hoj.crawler.problem.ProblemContext;
|
||||
import top.hcode.hoj.crawler.problem.ProblemStrategy;
|
||||
import top.hcode.hoj.dao.ProblemCaseMapper;
|
||||
import top.hcode.hoj.pojo.dto.ProblemDto;
|
||||
import top.hcode.hoj.pojo.entity.*;
|
||||
import top.hcode.hoj.pojo.vo.ProblemVo;
|
||||
|
@ -21,7 +18,6 @@ import top.hcode.hoj.dao.ProblemMapper;
|
|||
import top.hcode.hoj.service.ProblemService;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import org.springframework.stereotype.Service;
|
||||
import top.hcode.hoj.service.ToJudgeService;
|
||||
import top.hcode.hoj.utils.Constants;
|
||||
|
||||
import java.io.File;
|
||||
|
|
|
@ -22,7 +22,6 @@ public class Constants {
|
|||
public static String ojEmailFrom;
|
||||
|
||||
|
||||
|
||||
@Value("${hoj-backstage.addr}")
|
||||
public void setOjAddr(String ojAddr) {
|
||||
Constants.ojAddr = ojAddr;
|
||||
|
@ -72,8 +71,8 @@ public class Constants {
|
|||
STATUS_JUDGE_WAITING(-100, "Waiting Queue", null),
|
||||
STATUS_REMOTE_JUDGE_WAITING_HANDLE(-200, "Remote Waiting Handle Queue", null),
|
||||
STATUS_HDU_REMOTE_JUDGE_ACCOUNT(-1000, "Hdu Remote Judge Account", null),
|
||||
STATUS_CF_REMOTE_JUDGE_ACCOUNT(-1001, "Codeforces Remote Judge Account", null);
|
||||
|
||||
STATUS_CF_REMOTE_JUDGE_ACCOUNT(-1001, "Codeforces Remote Judge Account", null),
|
||||
JUDGE_SERVER_SUBMIT_PREFIX(-1002,"Judge SubmitId-ServerId:",null);
|
||||
private Judge(Integer status, String name, String columnName) {
|
||||
this.status = status;
|
||||
this.name = name;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
hoj-backstage:
|
||||
port: 6688
|
||||
nacos-url: 172.18.0.2:8848
|
||||
nacos-url: http://172.18.0.2:8848
|
||||
addr: http://oj.hcode.top
|
||||
name: Hcode Online Judge
|
||||
short-name: HOJ
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
hoj-backstage:
|
||||
port: 6688
|
||||
nacos-url: 172.18.0.2:8848
|
||||
nacos-url: http://172.18.0.2:8848
|
||||
addr: http://oj.hcode.top
|
||||
name: Hcode Online Judge
|
||||
short-name: HOJ
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue