集成 spring-cloud-starter-bus-rocketmq 组件
This commit is contained in:
parent
7b36eca609
commit
6dd514b84a
|
@ -12,7 +12,11 @@
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<name>${project.artifactId}</name>
|
<name>${project.artifactId}</name>
|
||||||
<description>消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费</description>
|
<description>
|
||||||
|
消息队列:
|
||||||
|
1. 基于 Spring Cloud Stream 实现异步消息
|
||||||
|
2. 基于 Spring Cloud Bus 实现事件总线
|
||||||
|
</description>
|
||||||
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>
|
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
@ -28,6 +32,12 @@
|
||||||
<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
|
<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
|
||||||
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.cloud</groupId>
|
||||||
|
<!-- 引入基于 RocketMQ 的 Spring Cloud Bus 的实现的依赖,并实现对其的自动配置 -->
|
||||||
|
<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -1,33 +1,19 @@
|
||||||
package cn.iocoder.yudao.framework.mq.config;
|
package cn.iocoder.yudao.framework.mq.config;
|
||||||
|
|
||||||
import cn.hutool.core.map.MapUtil;
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import cn.hutool.system.SystemUtil;
|
|
||||||
import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
||||||
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
|
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
|
|
||||||
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
|
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
|
||||||
|
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.data.redis.connection.RedisServerCommands;
|
|
||||||
import org.springframework.data.redis.connection.stream.Consumer;
|
|
||||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
|
||||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
|
||||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
|
||||||
import org.springframework.data.redis.core.RedisCallback;
|
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
import org.springframework.data.redis.listener.ChannelTopic;
|
import org.springframework.messaging.converter.*;
|
||||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
|
||||||
import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
|
|
||||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 消息队列配置类
|
* 消息队列配置类
|
||||||
|
@ -48,96 +34,19 @@ public class YudaoMQAutoConfiguration {
|
||||||
return redisMQTemplate;
|
return redisMQTemplate;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ========== 消费者相关 ==========
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建 Redis Pub/Sub 广播消费的容器
|
* 覆盖 {@link RocketMQMessageConverter} 的配置,去掉 fastjson 的转换器,解决不兼容的问题
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean(RocketMQMessageConverter.DEFAULT_NAME)
|
||||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
@ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
|
||||||
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
|
public CompositeMessageConverter rocketMQMessageConverter() {
|
||||||
// 创建 RedisMessageListenerContainer 对象
|
List<MessageConverter> messageConverters = new ArrayList<>();
|
||||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
|
||||||
// 设置 RedisConnection 工厂。
|
byteArrayMessageConverter.setContentTypeResolver(null);
|
||||||
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
|
messageConverters.add(byteArrayMessageConverter);
|
||||||
// 添加监听器
|
messageConverters.add(new StringMessageConverter());
|
||||||
listeners.forEach(listener -> {
|
messageConverters.add(new MappingJackson2MessageConverter());
|
||||||
listener.setRedisMQTemplate(redisMQTemplate);
|
return new CompositeMessageConverter(messageConverters);
|
||||||
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
|
|
||||||
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
|
|
||||||
listener.getChannel(), listener.getClass().getName());
|
|
||||||
});
|
|
||||||
return container;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建 Redis Stream 集群消费的容器
|
|
||||||
*
|
|
||||||
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
|
||||||
*/
|
|
||||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
|
||||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
|
||||||
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
|
||||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
|
||||||
checkRedisVersion(redisTemplate);
|
|
||||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
|
||||||
// 创建 options 配置
|
|
||||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
|
||||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
|
||||||
.batchSize(10) // 一次性最多拉取多少条消息
|
|
||||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
|
||||||
.build();
|
|
||||||
// 创建 container 对象
|
|
||||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
|
|
||||||
// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
|
|
||||||
DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
|
||||||
|
|
||||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
|
||||||
String consumerName = buildConsumerName();
|
|
||||||
listeners.parallelStream().forEach(listener -> {
|
|
||||||
// 创建 listener 对应的消费者分组
|
|
||||||
try {
|
|
||||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
|
||||||
} catch (Exception ignore) {}
|
|
||||||
// 设置 listener 对应的 redisTemplate
|
|
||||||
listener.setRedisMQTemplate(redisMQTemplate);
|
|
||||||
// 创建 Consumer 对象
|
|
||||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
|
||||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
|
||||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
|
||||||
// 设置 Consumer 监听
|
|
||||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
|
||||||
.builder(streamOffset).consumer(consumer)
|
|
||||||
.autoAcknowledge(false) // 不自动 ack
|
|
||||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
|
||||||
container.register(builder.build(), listener);
|
|
||||||
});
|
|
||||||
return container;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
|
||||||
* 参考自 RocketMQ clientId 的实现
|
|
||||||
*
|
|
||||||
* @return 消费者名字
|
|
||||||
*/
|
|
||||||
private static String buildConsumerName() {
|
|
||||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 校验 Redis 版本号,是否满足最低的版本号要求!
|
|
||||||
*/
|
|
||||||
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
|
|
||||||
// 获得 Redis 版本
|
|
||||||
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
|
|
||||||
String version = MapUtil.getStr(info, "redis_version");
|
|
||||||
// 校验最低版本必须大于等于 5.0.0
|
|
||||||
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
|
|
||||||
if (majorVersion < 5) {
|
|
||||||
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
|
|
||||||
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
package cn.iocoder.yudao.framework.mq.core.bus;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.cloud.bus.ServiceMatcher;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 基于 Spring Cloud Bus 实现的 Producer 抽象类
|
||||||
|
*
|
||||||
|
* @author 芋道源码
|
||||||
|
*/
|
||||||
|
public abstract class AbstractBusProducer {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
protected ApplicationEventPublisher applicationEventPublisher;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
protected ServiceMatcher serviceMatcher;
|
||||||
|
|
||||||
|
@Value("{spring.application.name}")
|
||||||
|
protected String applicationName;
|
||||||
|
|
||||||
|
protected void publishEvent(RemoteApplicationEvent event) {
|
||||||
|
applicationEventPublisher.publishEvent(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return 只广播给自己服务的实例
|
||||||
|
*/
|
||||||
|
protected String selfDestinationService() {
|
||||||
|
return applicationName + ":**";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getBusId() {
|
||||||
|
return serviceMatcher.getBusId();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,62 +0,0 @@
|
||||||
package org.springframework.data.redis.stream;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.ReflectUtil;
|
|
||||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
|
||||||
import org.springframework.data.redis.connection.stream.ByteRecord;
|
|
||||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
|
||||||
import org.springframework.data.redis.connection.stream.Record;
|
|
||||||
import org.springframework.util.Assert;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。
|
|
||||||
* 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006
|
|
||||||
* 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽!
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public class DefaultStreamMessageListenerContainerX<K, V extends Record<K, ?>> extends DefaultStreamMessageListenerContainer<K, V> {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现
|
|
||||||
*/
|
|
||||||
public static <K, V extends Record<K, ?>> StreamMessageListenerContainer<K, V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) {
|
|
||||||
Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!");
|
|
||||||
Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!");
|
|
||||||
return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options);
|
|
||||||
}
|
|
||||||
|
|
||||||
public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions<K, V> containerOptions) {
|
|
||||||
super(connectionFactory, containerOptions);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
|
||||||
return this.doRegisterX(getReadTaskX(streamRequest, listener));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private StreamPollTask<K, V> getReadTaskX(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
|
||||||
StreamPollTask<K, V> task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener);
|
|
||||||
// 修改 readFunction 方法
|
|
||||||
Function<ReadOffset, List<ByteRecord>> readFunction = (Function<ReadOffset, List<ByteRecord>>) ReflectUtil.getFieldValue(task, "readFunction");
|
|
||||||
ReflectUtil.setFieldValue(task, "readFunction", (Function<ReadOffset, List<ByteRecord>>) readOffset -> {
|
|
||||||
List<ByteRecord> records = readFunction.apply(readOffset);
|
|
||||||
//【重点】保证 records 不是空,避免 NPE 的问题!!!
|
|
||||||
return records != null ? records : Collections.emptyList();
|
|
||||||
});
|
|
||||||
return task;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Subscription doRegisterX(Task task) {
|
|
||||||
return ReflectUtil.invoke(this, "doRegister", task);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -104,8 +104,8 @@ public class YudaoWebAutoConfiguration implements WebMvcConfigurer {
|
||||||
* 创建 XssFilter Bean,解决 Xss 安全问题
|
* 创建 XssFilter Bean,解决 Xss 安全问题
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public FilterRegistrationBean<XssFilter> xssFilter(XssProperties properties, PathMatcher pathMatcher) {
|
public FilterRegistrationBean<XssFilter> xssFilter(XssProperties properties, PathMatcher mvcPathMatcher) {
|
||||||
return createFilterBean(new XssFilter(properties, pathMatcher), WebFilterOrderEnum.XSS_FILTER);
|
return createFilterBean(new XssFilter(properties, mvcPathMatcher), WebFilterOrderEnum.XSS_FILTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.consumer.auth;
|
package cn.iocoder.yudao.module.system.mq.consumer.auth;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
|
||||||
import cn.iocoder.yudao.module.system.mq.message.auth.OAuth2ClientRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.auth.OAuth2ClientRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.oauth2.OAuth2ClientService;
|
import cn.iocoder.yudao.module.system.service.oauth2.OAuth2ClientService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -15,14 +15,14 @@ import javax.annotation.Resource;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class OAuth2ClientRefreshConsumer extends AbstractChannelMessageListener<OAuth2ClientRefreshMessage> {
|
public class OAuth2ClientRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private OAuth2ClientService oauth2ClientService;
|
private OAuth2ClientService oauth2ClientService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void onMessage(OAuth2ClientRefreshMessage message) {
|
public void execute(OAuth2ClientRefreshMessage message) {
|
||||||
log.info("[onMessage][收到 OAuth2Client 刷新消息]");
|
log.info("[execute][收到 OAuth2Client 刷新消息]");
|
||||||
oauth2ClientService.initLocalCache();
|
oauth2ClientService.initLocalCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.consumer.dept;
|
package cn.iocoder.yudao.module.system.mq.consumer.dept;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
|
||||||
import cn.iocoder.yudao.module.system.mq.message.dept.DeptRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.dept.DeptRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.dept.DeptService;
|
import cn.iocoder.yudao.module.system.service.dept.DeptService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -15,14 +15,14 @@ import javax.annotation.Resource;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class DeptRefreshConsumer extends AbstractChannelMessageListener<DeptRefreshMessage> {
|
public class DeptRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private DeptService deptService;
|
private DeptService deptService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void onMessage(DeptRefreshMessage message) {
|
public void execute(DeptRefreshMessage message) {
|
||||||
log.info("[onMessage][收到 Dept 刷新消息]");
|
log.info("[execute][收到 Dept 刷新消息]");
|
||||||
deptService.initLocalCache();
|
deptService.initLocalCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,10 +3,10 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.permission.MenuService;
|
import cn.iocoder.yudao.module.system.service.permission.MenuService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 针对 {@link MenuRefreshMessage} 的消费者
|
* 针对 {@link MenuRefreshMessage} 的消费者
|
||||||
|
@ -15,14 +15,14 @@ import java.util.function.Consumer;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class MenuRefreshConsumer implements Consumer<MenuRefreshMessage> {
|
public class MenuRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private MenuService menuService;
|
private MenuService menuService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void accept(MenuRefreshMessage menuRefreshMessage) {
|
public void execute(MenuRefreshMessage menuRefreshMessage) {
|
||||||
log.info("[accept][收到 Menu 刷新消息]");
|
log.info("[execute][收到 Menu 刷新消息]");
|
||||||
menuService.initLocalCache();
|
menuService.initLocalCache();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,12 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.consumer.permission;
|
package cn.iocoder.yudao.module.system.mq.consumer.permission;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.permission.PermissionService;
|
import cn.iocoder.yudao.module.system.service.permission.PermissionService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 针对 {@link RoleMenuRefreshMessage} 的消费者
|
* 针对 {@link RoleMenuRefreshMessage} 的消费者
|
||||||
|
@ -16,14 +15,14 @@ import java.util.function.Consumer;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RoleMenuRefreshConsumer implements Consumer<RoleMenuRefreshMessage> {
|
public class RoleMenuRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private PermissionService permissionService;
|
private PermissionService permissionService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void accept(RoleMenuRefreshMessage roleMenuRefreshMessage) {
|
public void execute(RoleMenuRefreshMessage roleMenuRefreshMessage) {
|
||||||
log.info("[accept][收到 Role 与 Menu 的关联刷新消息]");
|
log.info("[execute][收到 Role 与 Menu 的关联刷新消息]");
|
||||||
permissionService.initLocalCache();
|
permissionService.initLocalCache();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,10 +3,10 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.permission.RoleService;
|
import cn.iocoder.yudao.module.system.service.permission.RoleService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 针对 {@link RoleRefreshMessage} 的消费者
|
* 针对 {@link RoleRefreshMessage} 的消费者
|
||||||
|
@ -15,14 +15,14 @@ import java.util.function.Consumer;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RoleRefreshConsumer implements Consumer<RoleRefreshMessage> {
|
public class RoleRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private RoleService roleService;
|
private RoleService roleService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void accept(RoleRefreshMessage message) {
|
public void execute(RoleRefreshMessage message) {
|
||||||
log.info("[accept][收到 Role 刷新消息]");
|
log.info("[execute][收到 Role 刷新消息]");
|
||||||
roleService.initLocalCache();
|
roleService.initLocalCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,10 +3,10 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.permission.PermissionService;
|
import cn.iocoder.yudao.module.system.service.permission.PermissionService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 针对 {@link UserRoleRefreshMessage} 的消费者
|
* 针对 {@link UserRoleRefreshMessage} 的消费者
|
||||||
|
@ -15,14 +15,14 @@ import java.util.function.Consumer;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class UserRoleRefreshConsumer implements Consumer<UserRoleRefreshMessage> {
|
public class UserRoleRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private PermissionService permissionService;
|
private PermissionService permissionService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void accept(UserRoleRefreshMessage userRoleRefreshMessage) {
|
public void execute(UserRoleRefreshMessage userRoleRefreshMessage) {
|
||||||
log.info("[accept][收到 User 与 Role 的关联刷新消息]");
|
log.info("[execute][收到 User 与 Role 的关联刷新消息]");
|
||||||
permissionService.initLocalCache();
|
permissionService.initLocalCache();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.consumer.sensitiveword;
|
package cn.iocoder.yudao.module.system.mq.consumer.sensitiveword;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
|
||||||
import cn.iocoder.yudao.module.system.mq.message.sensitiveword.SensitiveWordRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.sensitiveword.SensitiveWordRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.sensitiveword.SensitiveWordService;
|
import cn.iocoder.yudao.module.system.service.sensitiveword.SensitiveWordService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -15,14 +15,14 @@ import javax.annotation.Resource;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class SensitiveWordRefreshConsumer extends AbstractChannelMessageListener<SensitiveWordRefreshMessage> {
|
public class SensitiveWordRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private SensitiveWordService sensitiveWordService;
|
private SensitiveWordService sensitiveWordService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void onMessage(SensitiveWordRefreshMessage message) {
|
public void execute(SensitiveWordRefreshMessage message) {
|
||||||
log.info("[onMessage][收到 SensitiveWord 刷新消息]");
|
log.info("[execute][收到 SensitiveWord 刷新消息]");
|
||||||
sensitiveWordService.initLocalCache();
|
sensitiveWordService.initLocalCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.consumer.sms;
|
package cn.iocoder.yudao.module.system.mq.consumer.sms;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
|
||||||
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.service.sms.SmsChannelService;
|
import cn.iocoder.yudao.module.system.service.sms.SmsChannelService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -15,14 +15,14 @@ import javax.annotation.Resource;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class SmsChannelRefreshConsumer extends AbstractChannelMessageListener<SmsChannelRefreshMessage> {
|
public class SmsChannelRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private SmsChannelService smsChannelService;
|
private SmsChannelService smsChannelService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void onMessage(SmsChannelRefreshMessage message) {
|
public void execute(SmsChannelRefreshMessage message) {
|
||||||
log.info("[onMessage][收到 SmsChannel 刷新消息]");
|
log.info("[execute][收到 SmsChannel 刷新消息]");
|
||||||
smsChannelService.initSmsClients();
|
smsChannelService.initSmsClients();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.consumer.sms;
|
package cn.iocoder.yudao.module.system.mq.consumer.sms;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.system.mq.message.sms.SmsTemplateRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.sms.SmsTemplateRefreshMessage;
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
|
||||||
import cn.iocoder.yudao.module.system.service.sms.SmsTemplateService;
|
import cn.iocoder.yudao.module.system.service.sms.SmsTemplateService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -15,14 +15,14 @@ import javax.annotation.Resource;
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class SmsTemplateRefreshConsumer extends AbstractChannelMessageListener<SmsTemplateRefreshMessage> {
|
public class SmsTemplateRefreshConsumer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private SmsTemplateService smsTemplateService;
|
private SmsTemplateService smsTemplateService;
|
||||||
|
|
||||||
@Override
|
@EventListener
|
||||||
public void onMessage(SmsTemplateRefreshMessage message) {
|
public void execute(SmsTemplateRefreshMessage message) {
|
||||||
log.info("[onMessage][收到 SmsTemplate 刷新消息]");
|
log.info("[execute][收到 SmsTemplate 刷新消息]");
|
||||||
smsTemplateService.initLocalCache();
|
smsTemplateService.initLocalCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.auth;
|
package cn.iocoder.yudao.module.system.mq.message.auth;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* OAuth 2.0 客户端的数据刷新 Message
|
* OAuth 2.0 客户端的数据刷新 Message
|
||||||
|
@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode;
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class OAuth2ClientRefreshMessage extends AbstractChannelMessage {
|
public class OAuth2ClientRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
@Override
|
public OAuth2ClientRefreshMessage() {
|
||||||
public String getChannel() {
|
}
|
||||||
return "system.oauth2-client.refresh";
|
|
||||||
|
public OAuth2ClientRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.dept;
|
package cn.iocoder.yudao.module.system.mq.message.dept;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 部门数据刷新 Message
|
* 部门数据刷新 Message
|
||||||
|
@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode;
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class DeptRefreshMessage extends AbstractChannelMessage {
|
public class DeptRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
@Override
|
public DeptRefreshMessage() {
|
||||||
public String getChannel() {
|
}
|
||||||
return "system.dept.refresh";
|
|
||||||
|
public DeptRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,19 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.permission;
|
package cn.iocoder.yudao.module.system.mq.message.permission;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
import lombok.Data;
|
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 菜单数据刷新 Message
|
* 菜单数据刷新 Message
|
||||||
*
|
*
|
||||||
* @author 芋道源码
|
* @author 芋道源码
|
||||||
*/
|
*/
|
||||||
@Data
|
public class MenuRefreshMessage extends RemoteApplicationEvent {
|
||||||
public class MenuRefreshMessage {
|
|
||||||
|
public MenuRefreshMessage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public MenuRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.permission;
|
package cn.iocoder.yudao.module.system.mq.message.permission;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 角色与菜单数据刷新 Message
|
* 角色与菜单数据刷新 Message
|
||||||
|
@ -10,5 +9,13 @@ import lombok.EqualsAndHashCode;
|
||||||
* @author 芋道源码
|
* @author 芋道源码
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class RoleMenuRefreshMessage {
|
public class RoleMenuRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
|
public RoleMenuRefreshMessage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public RoleMenuRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.permission;
|
package cn.iocoder.yudao.module.system.mq.message.permission;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 角色数据刷新 Message
|
* 角色数据刷新 Message
|
||||||
|
@ -8,5 +9,13 @@ import lombok.Data;
|
||||||
* @author 芋道源码
|
* @author 芋道源码
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class RoleRefreshMessage {
|
public class RoleRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
|
public RoleRefreshMessage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public RoleRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.permission;
|
package cn.iocoder.yudao.module.system.mq.message.permission;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 用户与角色的数据刷新 Message
|
* 用户与角色的数据刷新 Message
|
||||||
|
@ -8,5 +9,13 @@ import lombok.Data;
|
||||||
* @author 芋道源码
|
* @author 芋道源码
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class UserRoleRefreshMessage {
|
public class UserRoleRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
|
public UserRoleRefreshMessage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public UserRoleRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,19 +1,21 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.sensitiveword;
|
package cn.iocoder.yudao.module.system.mq.message.sensitiveword;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 敏感词的刷新 Message
|
* 敏感词的刷新 Message
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class SensitiveWordRefreshMessage extends AbstractChannelMessage {
|
public class SensitiveWordRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
@Override
|
public SensitiveWordRefreshMessage() {
|
||||||
public String getChannel() {
|
}
|
||||||
return "system.sensitive-word.refresh";
|
|
||||||
|
public SensitiveWordRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.sms;
|
package cn.iocoder.yudao.module.system.mq.message.sms;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 短信渠道的数据刷新 Message
|
* 短信渠道的数据刷新 Message
|
||||||
|
@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode;
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class SmsChannelRefreshMessage extends AbstractChannelMessage {
|
public class SmsChannelRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
@Override
|
public SmsChannelRefreshMessage() {
|
||||||
public String getChannel() {
|
}
|
||||||
return "system.sms-channel.refresh";
|
|
||||||
|
public SmsChannelRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.message.sms;
|
package cn.iocoder.yudao.module.system.mq.message.sms;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 短信模板的数据刷新 Message
|
* 短信模板的数据刷新 Message
|
||||||
|
@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode;
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class SmsTemplateRefreshMessage extends AbstractChannelMessage {
|
public class SmsTemplateRefreshMessage extends RemoteApplicationEvent {
|
||||||
|
|
||||||
@Override
|
public SmsTemplateRefreshMessage() {
|
||||||
public String getChannel() {
|
}
|
||||||
return "system.sms-template.refresh";
|
|
||||||
|
public SmsTemplateRefreshMessage(Object source, String originService, String destinationService) {
|
||||||
|
super(source, originService, destinationService);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,26 +1,20 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.producer.auth;
|
package cn.iocoder.yudao.module.system.mq.producer.auth;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.auth.OAuth2ClientRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.auth.OAuth2ClientRefreshMessage;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* OAuth 2.0 客户端相关消息的 Producer
|
* OAuth 2.0 客户端相关消息的 Producer
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class OAuth2ClientProducer {
|
public class OAuth2ClientProducer extends AbstractBusProducer {
|
||||||
|
|
||||||
@Resource
|
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送 {@link OAuth2ClientRefreshMessage} 消息
|
* 发送 {@link OAuth2ClientRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendOAuth2ClientRefreshMessage() {
|
public void sendOAuth2ClientRefreshMessage() {
|
||||||
OAuth2ClientRefreshMessage message = new OAuth2ClientRefreshMessage();
|
publishEvent(new OAuth2ClientRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
redisMQTemplate.send(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,26 +1,20 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.producer.dept;
|
package cn.iocoder.yudao.module.system.mq.producer.dept;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.dept.DeptRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.dept.DeptRefreshMessage;
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dept 部门相关消息的 Producer
|
* Dept 部门相关消息的 Producer
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class DeptProducer {
|
public class DeptProducer extends AbstractBusProducer {
|
||||||
|
|
||||||
@Resource
|
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送 {@link DeptRefreshMessage} 消息
|
* 发送 {@link DeptRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendDeptRefreshMessage() {
|
public void sendDeptRefreshMessage() {
|
||||||
DeptRefreshMessage message = new DeptRefreshMessage();
|
publishEvent(new DeptRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
redisMQTemplate.send(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,27 +1,20 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.producer.permission;
|
package cn.iocoder.yudao.module.system.mq.producer.permission;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
|
||||||
import org.springframework.cloud.stream.function.StreamBridge;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Menu 菜单相关消息的 Producer
|
* Menu 菜单相关消息的 Producer
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class MenuProducer {
|
public class MenuProducer extends AbstractBusProducer {
|
||||||
|
|
||||||
@Resource
|
|
||||||
private StreamBridge streamBridge;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送 {@link MenuRefreshMessage} 消息
|
* 发送 {@link MenuRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendMenuRefreshMessage() {
|
public void sendMenuRefreshMessage() {
|
||||||
MenuRefreshMessage message = new MenuRefreshMessage();
|
publishEvent(new MenuRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
streamBridge.send("menuRefresh-out-0", message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,36 +1,28 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.producer.permission;
|
package cn.iocoder.yudao.module.system.mq.producer.permission;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage;
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage;
|
||||||
import org.springframework.cloud.stream.function.StreamBridge;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Permission 权限相关消息的 Producer
|
* Permission 权限相关消息的 Producer
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class PermissionProducer {
|
public class PermissionProducer extends AbstractBusProducer {
|
||||||
|
|
||||||
@Resource
|
|
||||||
private StreamBridge streamBridge;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送 {@link RoleMenuRefreshMessage} 消息
|
* 发送 {@link RoleMenuRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendRoleMenuRefreshMessage() {
|
public void sendRoleMenuRefreshMessage() {
|
||||||
RoleMenuRefreshMessage message = new RoleMenuRefreshMessage();
|
publishEvent(new RoleMenuRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
streamBridge.send("roleMenuRefresh-out-0", message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送 {@link UserRoleRefreshMessage} 消息
|
* 发送 {@link UserRoleRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendUserRoleRefreshMessage() {
|
public void sendUserRoleRefreshMessage() {
|
||||||
UserRoleRefreshMessage message = new UserRoleRefreshMessage();
|
publishEvent(new UserRoleRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
streamBridge.send("userRoleRefresh-out-0", message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +1,22 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.producer.permission;
|
package cn.iocoder.yudao.module.system.mq.producer.permission;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.cloud.stream.function.StreamBridge;
|
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Role 角色相关消息的 Producer
|
* Role 角色相关消息的 Producer
|
||||||
*
|
*
|
||||||
* @author 芋道源码
|
* @author 芋道源码
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class RoleProducer {
|
public class RoleProducer extends AbstractBusProducer {
|
||||||
|
|
||||||
@Resource
|
|
||||||
private StreamBridge streamBridge;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送 {@link RoleRefreshMessage} 消息
|
* 发送 {@link RoleRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendRoleRefreshMessage() {
|
public void sendRoleRefreshMessage() {
|
||||||
RoleRefreshMessage message = new RoleRefreshMessage();
|
publishEvent(new RoleRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
streamBridge.send("roleRefresh-out-0", message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,26 +1,21 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.producer.sensitiveword;
|
package cn.iocoder.yudao.module.system.mq.producer.sensitiveword;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.sensitiveword.SensitiveWordRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.sensitiveword.SensitiveWordRefreshMessage;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 敏感词相关的 Producer
|
* 敏感词相关的 Producer
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class SensitiveWordProducer {
|
public class SensitiveWordProducer extends AbstractBusProducer {{
|
||||||
|
|
||||||
@Resource
|
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
|
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* 发送 {@link SensitiveWordRefreshMessage} 消息
|
* 发送 {@link SensitiveWordRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendSensitiveWordRefreshMessage() {
|
public void sendSensitiveWordRefreshMessage() {
|
||||||
SensitiveWordRefreshMessage message = new SensitiveWordRefreshMessage();
|
publishEvent(new SensitiveWordRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
redisMQTemplate.send(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
package cn.iocoder.yudao.module.system.mq.producer.sms;
|
package cn.iocoder.yudao.module.system.mq.producer.sms;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.common.core.KeyValue;
|
import cn.iocoder.yudao.framework.common.core.KeyValue;
|
||||||
|
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
||||||
|
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
|
import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
|
||||||
import cn.iocoder.yudao.module.system.mq.message.sms.SmsTemplateRefreshMessage;
|
import cn.iocoder.yudao.module.system.mq.message.sms.SmsTemplateRefreshMessage;
|
||||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -19,7 +20,7 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class SmsProducer {
|
public class SmsProducer extends AbstractBusProducer {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private RedisMQTemplate redisMQTemplate;
|
private RedisMQTemplate redisMQTemplate;
|
||||||
|
@ -28,16 +29,14 @@ public class SmsProducer {
|
||||||
* 发送 {@link SmsChannelRefreshMessage} 消息
|
* 发送 {@link SmsChannelRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendSmsChannelRefreshMessage() {
|
public void sendSmsChannelRefreshMessage() {
|
||||||
SmsChannelRefreshMessage message = new SmsChannelRefreshMessage();
|
publishEvent(new SmsChannelRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
redisMQTemplate.send(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送 {@link SmsTemplateRefreshMessage} 消息
|
* 发送 {@link SmsTemplateRefreshMessage} 消息
|
||||||
*/
|
*/
|
||||||
public void sendSmsTemplateRefreshMessage() {
|
public void sendSmsTemplateRefreshMessage() {
|
||||||
SmsTemplateRefreshMessage message = new SmsTemplateRefreshMessage();
|
publishEvent(new SmsTemplateRefreshMessage(this, getBusId(), selfDestinationService()));
|
||||||
redisMQTemplate.send(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -51,12 +51,13 @@ dubbo:
|
||||||
address: spring-cloud://localhost # 设置使用 Spring Cloud 注册中心
|
address: spring-cloud://localhost # 设置使用 Spring Cloud 注册中心
|
||||||
|
|
||||||
--- #################### MQ 消息队列相关配置 ####################
|
--- #################### MQ 消息队列相关配置 ####################
|
||||||
|
|
||||||
spring:
|
spring:
|
||||||
cloud:
|
cloud:
|
||||||
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
|
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
|
||||||
stream:
|
stream:
|
||||||
function:
|
# function:
|
||||||
definition: roleRefreshConsumer;menuRefreshConsumer;roleMenuRefreshConsumer;userRoleRefreshConsumer;
|
# definition: roleRefreshConsumer;roleMenuRefreshConsumer;userRoleRefreshConsumer;
|
||||||
# Binding 配置项,对应 BindingProperties Map
|
# Binding 配置项,对应 BindingProperties Map
|
||||||
bindings:
|
bindings:
|
||||||
roleRefresh-out-0:
|
roleRefresh-out-0:
|
||||||
|
@ -64,21 +65,6 @@ spring:
|
||||||
roleRefreshConsumer-in-0:
|
roleRefreshConsumer-in-0:
|
||||||
destination: system_role_refresh
|
destination: system_role_refresh
|
||||||
group: system_role_refresh_consumer_group
|
group: system_role_refresh_consumer_group
|
||||||
menuRefresh-out-0:
|
|
||||||
destination: system_menu_refresh
|
|
||||||
menuRefreshConsumer-in-0:
|
|
||||||
destination: system_menu_refresh
|
|
||||||
group: system_menu_refresh_consumer_group
|
|
||||||
roleMenuRefresh-out-0:
|
|
||||||
destination: system_role_menu_refresh
|
|
||||||
roleMenuRefreshConsumer-in-0:
|
|
||||||
destination: system_role_menu_refresh
|
|
||||||
group: system_role_menu_refresh_consumer_group
|
|
||||||
userRoleRefresh-out-0:
|
|
||||||
destination: system_user_role_refresh
|
|
||||||
userRoleRefreshConsumer-in-0:
|
|
||||||
destination: system_user_role_refresh
|
|
||||||
group: system_user_role_refresh_consumer_group
|
|
||||||
# Spring Cloud Stream RocketMQ 配置项
|
# Spring Cloud Stream RocketMQ 配置项
|
||||||
rocketmq:
|
rocketmq:
|
||||||
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
|
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
|
||||||
|
@ -88,20 +74,12 @@ spring:
|
||||||
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
|
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
|
||||||
group: system_producer_group # 生产者分组
|
group: system_producer_group # 生产者分组
|
||||||
send-type: SYNC # 发送模式,SYNC 同步
|
send-type: SYNC # 发送模式,SYNC 同步
|
||||||
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
|
|
||||||
bindings:
|
# Spring Cloud Bus 配置项,对应 BusProperties 类
|
||||||
roleRefreshConsumer-in-0:
|
bus:
|
||||||
consumer:
|
enabled: true # 是否开启,默认为 true
|
||||||
message-model: BROADCASTING # 广播消费
|
id: ${spring.application.name}:${server.port} # 编号,Spring Cloud Alibaba 建议使用“应用:端口”的格式
|
||||||
menuRefreshConsumer-in-0:
|
destination: springCloudBus2 # 目标消息队列,默认为 springCloudBus
|
||||||
consumer:
|
|
||||||
message-model: BROADCASTING # 广播消费
|
|
||||||
roleMenuRefreshConsumer-in-0:
|
|
||||||
consumer:
|
|
||||||
message-model: BROADCASTING # 广播消费
|
|
||||||
userRoleRefreshConsumer-in-0:
|
|
||||||
consumer:
|
|
||||||
message-model: BROADCASTING # 广播消费
|
|
||||||
|
|
||||||
--- #################### 芋道相关配置 ####################
|
--- #################### 芋道相关配置 ####################
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue