Skip to content

RocketMQ

项目地址:https://github.com/zhaobao1830/misszb/TestController

文档地址:https://rocketmq.apache.org/zh/

随笔

项目中使用的技术越多,维护成本越高

消息队列可以非常强的提升系统的高并发能力

如果消息队列能满足,就不需要上分布式,分布式的维护成本更高

消息队列的使用场景:

1、中间件,连接A、B俩个模块,A模块操作完数据后,将数据推到消息队列里,B从消息队列获取数据

2、异步 消息队列可以将业务逻辑处理由同步变成异步 比如下单->成功 其中会有验证、短信、邮件、仓储调度等逻辑,如果是同步,用户就得一直等着。我们可以把验证、短信、邮件、仓储调度当作一个个消息放到消息队列中,之后从消息队列里取出来进行操作

消息队列可以把处理请求的时间拉长,减轻服务器的压力

消息队列

常用的消息队列有Kafka、RocketMQ、RabbitMQ、ActiveMQ,最常用的是前面的三种

因为当前要实现的是订单过期功能,需要用到延迟消息队列,所以选择使用RocketMQ

延迟消息队列:可以理解为定时器,时间到了,就自动执行,将消息推送给服务器,执行之后的逻辑

代码

在springBoot里使用RocketMQ

1、pom.xml里安装

xml
		<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.7.0</version>
		</dependency>

2、TestController里新加调用方法

java

    @Autowired
    private ProducerSchedule producerSchedule;
    
    @RequestMapping(value = "/push", method = RequestMethod.GET)
    public void pushMessageToMQ() throws Exception {
        producerSchedule.send("TopicTest", "test");
    }

3、application-dev.yml添加配置类

yml
rocketmq:
  consumer:
    consumer-group: SleeveConsumerGroup
  producer:
    producer-group: SleeveProducerGroup
  namesrv-addr: 127.0.0.1:9876

4、ProducerSchedule初始化rocketmq,用来发送消息

java
package com.zb.misszb.manager.rocketmq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

// 操作rocketmq
@Component
public class ProducerSchedule {
    private DefaultMQProducer producer;

    @Value("${rocketmq.producer.producer-group}")
    private String producerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    public ProducerSchedule() {}

    // 用来初始化DefaultMQProducer,并传入值
    // 使用PostConstruct注解:是因为当前类的对象生成的时候,需要用到配置文件里的数据,但一开始数据为空,加上PostConstruct注解,可以保证
    // 配置文件里的数据注入到当前类
    @PostConstruct
    public void defaultMQProducer() {
        if (this.producer == null) {
            this.producer = new DefaultMQProducer(this.producerGroup);
            this.producer.setNamesrvAddr(this.namesrvAddr);
        }
        try {
            this.producer.start();
            System.out.println("-------producer start");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public String send(String topic, String messageText) throws Exception {
        Message message = new Message(topic, messageText.getBytes());
//      messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        message.setDelayTimeLevel(4);

        SendResult result = this.producer.send(message);
        System.out.println(result.getMsgId());
        System.out.println(result.getSendStatus());
        return result.getMsgId();
    }
}

5、ConsumerSchedule 用来接收消息

java
/**
 * @作者 7七月
 * @微信公号 林间有风
 * @开源项目 $ http://talelin.com
 * @免费专栏 $ http://course.talelin.com
 * @我的课程 $ http://imooc.com/t/4294850
 * @创建时间 2020-06-18 10:54
 */
package com.zb.misszb.manager.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;

//@Component
public class ConsumerSchedule implements CommandLineRunner {

    @Value("${rocketmq.consumer.consumer-group}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    public void messageListener() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        consumer.setNamesrvAddr(namesrvAddr);

        consumer.subscribe("TopicTest", "*");

        consumer.setConsumeMessageBatchMaxSize(1);

        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            for (Message message : messages) {
                System.out.println("消息:" + new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
    }


    @Override
    public void run(String... args) throws Exception {
        this.messageListener();
    }
}

如有转载或 CV 的请标注本站原文地址