update mq

This commit is contained in:
shuzheng 2016-12-08 17:59:50 +08:00
parent 1cd7c94a5d
commit a646121b1a
10 changed files with 176 additions and 16 deletions

View File

@ -11,6 +11,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* MQ消费者
@ -35,8 +37,13 @@ public class defaultQueueMessageListener implements MessageListener {
String text = textMessage.getText();
JSONObject json = JSONObject.fromObject(text);
User user = (User) JSONObject.toBean(json, User.class);
if (user.getUsername().equals("1")) {
_log.info("消费开始时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
}
if (user.getUsername().equals("1000")) {
_log.info("消费结束时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
}
userService.getMapper().insertSelective(user);
_log.info("zheng-cms-mq接收到{}", text);
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -18,6 +18,6 @@ log4j.logger.druid.sql.Statement=warn,stdout
log4j.logger.druid.sql.ResultSet=warn,stdout
# MyBatis logging configuration
log4j.logger.com.zheng.cms.dao.mapper=debug
log4j.logger.com.zheng.cms.dao.mapper=error
#log4j.logger.com.zheng.cms.dao.mapper.UserMapper=debug
#log4j.logger.com.zheng.cms.dao.mapper.UserMapper.selectUser=debug

View File

@ -40,6 +40,12 @@
<systemPath>${project.basedir}/src/main/webapp/WEB-INF/lib/tbschedule-3.2.8-SNAPSHOT.jar
</systemPath>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.6.2.Final</version>
</dependency>
</dependencies>
<profiles>

View File

@ -37,7 +37,7 @@ public class ActiveMQController extends BaseController {
User user = null;
for (int i = 1; i <= 1000; i ++) {
user = new User();
user.setUsername("用户" + i);
user.setUsername(i + "");
user.setPassword("123456");
user.setNickname("昵称");
user.setSex(1);

View File

@ -0,0 +1,56 @@
package com.zheng.cms.web.jms;
import com.zheng.cms.dao.model.User;
import com.zheng.cms.service.UserService;
import net.sf.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Created by ZhangShuzheng on 2016/12/8.
*/
@Component
public class MessageListener extends MessageListenerAdapter {
private static Logger _log = LoggerFactory.getLogger(defaultQueueMessageListener.class);
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
UserService userService;
@JmsListener(containerFactory = "connectionFactory", destination = "defaultQueueDestination")
public void processOrder(final Message message) {
// 使用线程池多线程处理
threadPoolTaskExecutor.execute(new Runnable() {
public void run() {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
JSONObject json = JSONObject.fromObject(text);
User user = (User) JSONObject.toBean(json, User.class);
if (user.getUsername().equals("1")) {
_log.info("消费开始时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
}
if (user.getUsername().equals("1000")) {
_log.info("消费结束时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
}
userService.getMapper().insertSelective(user);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}

View File

@ -11,6 +11,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* MQ消费者
@ -35,8 +37,13 @@ public class defaultQueueMessageListener implements MessageListener {
String text = textMessage.getText();
JSONObject json = JSONObject.fromObject(text);
User user = (User) JSONObject.toBean(json, User.class);
if (user.getUsername().equals("1")) {
_log.info("消费开始时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
}
if (user.getUsername().equals("1000")) {
_log.info("消费结束时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
}
userService.getMapper().insertSelective(user);
_log.info("zheng-cms-web接收到{}", text);
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -0,0 +1,45 @@
package com.zheng.cms.web.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* Created by ZhangShuzheng on 2016/12/8.
*/
public class Consumer {
public static void main(String[] args){
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,32 @@
package com.zheng.cms.web.rocketmq;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
/**
* Created by ZhangShuzheng on 2016/12/8.
*/
public class Producer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("127.0.0.1:9876");
try {
producer.start();
long time = System.currentTimeMillis();
System.out.println("开始:" + time);
for (int i = 1; i <= 100000; i ++) {
Message msg = new Message("PushTopic", "push", i + "", "Just for test.".getBytes());
SendResult result = producer.send(msg);
//System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
}
System.out.println("结束,消耗:" + (System.currentTimeMillis() - time));
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}

View File

@ -1,9 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"
default-autowire="byName">
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 连接工厂 -->
<bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
@ -35,12 +41,13 @@
</bean>
<!-- 消费者 -->
<bean id="defaultQueueMessageListener" class="com.zheng.cms.web.jms.defaultQueueMessageListener"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="defaultQueueDestination"/>
<property name="messageListener" ref="defaultQueueMessageListener"/>
<!--<property name="concurrency" value="4-10"/>-->
</bean>
<!--<bean id="defaultQueueMessageListener" class="com.zheng.cms.web.jms.defaultQueueMessageListener"/>-->
<!--<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">-->
<!--<property name="connectionFactory" ref="connectionFactory"/>-->
<!--<property name="destination" ref="defaultQueueDestination"/>-->
<!--<property name="messageListener" ref="defaultQueueMessageListener"/>-->
<!--&lt;!&ndash;<property name="concurrency" value="4-10"/>&ndash;&gt;-->
<!--</bean>-->
<jms:annotation-driven/>
</beans>

View File

@ -18,6 +18,6 @@ log4j.logger.druid.sql.Statement=warn,stdout
log4j.logger.druid.sql.ResultSet=warn,stdout
# MyBatis logging configuration
log4j.logger.com.zheng.cms.dao.mapper=debug
log4j.logger.com.zheng.cms.dao.mapper=error
#log4j.logger.com.zheng.cms.dao.mapper.UserMapper=debug
#log4j.logger.com.zheng.cms.dao.mapper.UserMapper.selectUser=debug