RocketMQ
项目地址:https://github.com/zhaobao1830/misszb/TestController
文档地址:https://rocketmq.apache.org/zh/
随笔
项目中使用的技术越多,维护成本越高
消息队列可以非常强的提升系统的高并发能力
如果消息队列能满足,就不需要上分布式,分布式的维护成本更高
消息队列的使用场景:
1、中间件,连接A、B俩个模块,A模块操作完数据后,将数据推到消息队列里,B从消息队列获取数据
2、异步 消息队列可以将业务逻辑处理由同步变成异步 比如下单->成功 其中会有验证、短信、邮件、仓储调度等逻辑,如果是同步,用户就得一直等着。我们可以把验证、短信、邮件、仓储调度当作一个个消息放到消息队列中,之后从消息队列里取出来进行操作
消息队列可以把处理请求的时间拉长,减轻服务器的压力
消息队列
常用的消息队列有Kafka、RocketMQ、RabbitMQ、ActiveMQ,最常用的是前面的三种
因为当前要实现的是订单过期功能,需要用到延迟消息队列,所以选择使用RocketMQ
延迟消息队列:可以理解为定时器,时间到了,就自动执行,将消息推送给服务器,执行之后的逻辑
代码
在springBoot里使用RocketMQ
1、pom.xml里安装
xml
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
2、TestController里新加调用方法
java
@Autowired
private ProducerSchedule producerSchedule;
@RequestMapping(value = "/push", method = RequestMethod.GET)
public void pushMessageToMQ() throws Exception {
producerSchedule.send("TopicTest", "test");
}
3、application-dev.yml添加配置类
yml
rocketmq:
consumer:
consumer-group: SleeveConsumerGroup
producer:
producer-group: SleeveProducerGroup
namesrv-addr: 127.0.0.1:9876
4、ProducerSchedule初始化rocketmq,用来发送消息
java
package com.zb.misszb.manager.rocketmq;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
// 操作rocketmq
@Component
public class ProducerSchedule {
private DefaultMQProducer producer;
@Value("${rocketmq.producer.producer-group}")
private String producerGroup;
@Value("${rocketmq.namesrv-addr}")
private String namesrvAddr;
public ProducerSchedule() {}
// 用来初始化DefaultMQProducer,并传入值
// 使用PostConstruct注解:是因为当前类的对象生成的时候,需要用到配置文件里的数据,但一开始数据为空,加上PostConstruct注解,可以保证
// 配置文件里的数据注入到当前类
@PostConstruct
public void defaultMQProducer() {
if (this.producer == null) {
this.producer = new DefaultMQProducer(this.producerGroup);
this.producer.setNamesrvAddr(this.namesrvAddr);
}
try {
this.producer.start();
System.out.println("-------producer start");
} catch (MQClientException e) {
e.printStackTrace();
}
}
public String send(String topic, String messageText) throws Exception {
Message message = new Message(topic, messageText.getBytes());
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(4);
SendResult result = this.producer.send(message);
System.out.println(result.getMsgId());
System.out.println(result.getSendStatus());
return result.getMsgId();
}
}
5、ConsumerSchedule 用来接收消息
java
/**
* @作者 7七月
* @微信公号 林间有风
* @开源项目 $ http://talelin.com
* @免费专栏 $ http://course.talelin.com
* @我的课程 $ http://imooc.com/t/4294850
* @创建时间 2020-06-18 10:54
*/
package com.zb.misszb.manager.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
//@Component
public class ConsumerSchedule implements CommandLineRunner {
@Value("${rocketmq.consumer.consumer-group}")
private String consumerGroup;
@Value("${rocketmq.namesrv-addr}")
private String namesrvAddr;
public void messageListener() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (Message message : messages) {
System.out.println("消息:" + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
@Override
public void run(String... args) throws Exception {
this.messageListener();
}
}