RocketMQ定时发送

2020-07-17

目前国内用的较多的消息列对就是RocketMQ,使用简单,可以直接在阿里云上边购买,并且是阿里巴巴及双十一官方指定消息产品。

简介:消息队列/消息中间件
在消息传输过程中保存消息的容器(本质是暂存数据的容器)

主要用途:

  • 解耦:A系统发送数据到BCD三个系统,如果通过接口调用方式发送,那么他们的耦合度非常高,A会变的非常难以维护,如果BCD三个系统某一个发生故障,A也会受影响。
    这时候就可以引入MQ进行解耦------------------>A系统发送数据到MQ,BCD订阅MQ的数据进行处理,当B系统不需要再处理数据的时候,只需要取消MQ的订阅即可,当新增E系统需要处理数据的时候只需要订阅MQ即可,无论哪种方式都不用再对A服务进行修改,如果BCD中某一个系统发生了故障,A也不会受影响。
  • 异步:解耦前A系统消耗的时间是A+B+C+D
    解耦后A系统消耗的时间是A+投放数据到消息队列的时间,BCD在后台同时执行。
  • 削峰:高峰期的大量请求先放入MQ暂存,再由A系统慢慢读取处理。
    常见的消息列队有:RocketMQ,RabbitMQ,kafka

特点:1.高吞吐量,低延时:每秒可以处理几十万条数据,延时在几毫秒内。
2.可扩展(扩容),支持热扩展。
3.可靠性高,有备份机制保障数据不丢失。
4.高并发,支持数千客户端同时读写和操作。
| topic| 主题,用于区分不同来源的数据 |
| --- | --- |
| producer | 生产者,向MQ中发送数据的程序 |
| customer | 消费者,从MQ中读取数据的程序 |
| Group | 组 |
| Tag | 标签,对topic进一步细化 |
| message | 消息的载体,一个Message必须指定一个topic相当于寄信的地址 |

在阿里云上购买RocketMQ,买按量收费的就行

购买完成后,进入消息列对RocketMQ服务点击这里
image.png
接下来点击继续使用
image.png
然后申请
image.png
接下来点击这里,拿到TCP公网接入点
image.png

然后配置生产者

首先导入依赖

        <!-- 增加阿里云RocketMQ依赖 -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.0.Final</version>
        </dependency>

创建对应的环境application.properties文件配置参数

#配置rocketmq
#这两个参数为,上边那几张图片申请和拿到的
rocketmq.accessKey=jgoejgeog
rocketmq.secretKey=jijihohofege
#这个地址为拿到的TCP公网接入点
rocketmq.nameSrvAddr=jijijhihihii
#主题名字
rocketmq.topic=pin_gou
#组名
rocketmq.groupId=GID_12356
rocketmq.tag=*
#定时,延时消息
rocketmq.timeTopic=pin_gou
rocketmq.timeGroupId=GID_123456
rocketmq.timeTag=*

封装MQ配置类:MqConfig

package com.laozhang.config;


import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * @author zhangfan
 * @date 2020/5/14
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String topic;
    private String groupId;
    private String tag;
    private String timeTopic;
    private String timeGroupId;
    private String timeTag;
    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        //设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "4000");
        return properties;
    }
}

给消息生产者注入配置信息,ProducerBean用于将Producer集成至Spring Bean中

package com.laozhang.config;

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 给消息生产者注入配置信息,ProducerBean用于将Producer集成至Spring Bean中
 * @author zhangfan
 * @date 2020/5/14
 */
@Configuration
public class ProducerClient {
    @Autowired
    private MqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        //ProducerBean用于将Producer集成至Spring Bean中
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }
}

为了方便使用,我封装了一个发送消息的类,消息的Message参数和配置,看代码注释,很容易理解

package com.laozhang.controller.pingoutest;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.laozhang.config.MqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author zhangfan
 * @date 2020/5/14
 */
@Component
@RestController
public class PinGou {

    private Logger logger = LoggerFactory.getLogger(PinGou.class);

    @Autowired
    private MqConfig config;

    @Autowired
    private ProducerBean producer;

    /**
     * 同步发送定时/延时消息
     * @param msgTag 标签,可用于消息小分类标注,对消息进行再归类
     * @param messageBody 消息body内容,生产者自定义内容,二进制形式的数据
     * @param msgKey 消息key值,建议设置全局唯一值,可不设置,不影响消息收发
     * @param delayTime 服务端发送消息时间,立即发送输入0或比更早的时间
     * @return success:SendResult or error:null
     */
    public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {
        Message msg = new Message(config.getTimeTopic(),msgTag,msgKey,messageBody);
        msg.setStartDeliverTime(delayTime);
        return this.send(msg,Boolean.FALSE);
    }

    /**
     * 普通消息发送发放
     * @param msg 消息
     * @param isOneWay 是否单向发送
     */
    private SendResult send(Message msg,Boolean isOneWay) {
        try {
            if(isOneWay) {
                //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
                //若数据不可丢,建议选用同步或异步发送方式。
                producer.sendOneway(msg);
                success(msg, "单向消息MsgId不返回");
                return null;
            }else {
                //可靠同步发送
                SendResult sendResult = producer.send(msg);
                //获取发送结果,不抛异常即发送成功
                if (sendResult != null) {
                    success(msg, sendResult.getMessageId());
                    return sendResult;
                }else {
                    error(msg,null);
                    return null;
                }
            }
        } catch (Exception e) {
            error(msg,e);
            return null;
        }
    }

    //--------------日志打印----------
    private void error(Message msg,Exception e) {
        logger.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
        logger.error("errorMsg --- {}",e.getMessage());
    }
    private void success(Message msg,String messageId) {
        logger.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
    }
}

前面已经配置好了将Producer集成至Spring Bean中,直接注入Producer,在业务系统需要的地方调用来发送消息即可

package com.laozhang.controller.pingoutest;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.laozhang.config.MqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author zhangfan
 * @date 2020/5/14
 */
@Component
@RestController
public class PinGou {

    private Logger logger = LoggerFactory.getLogger(PinGou.class);

    @Autowired
    private MqConfig config;

    @Autowired
    private ProducerBean producer;

    /**
     * 演示方法,可在自己的业务系统方法中进行发送消息
     */
    @GetMapping("/test")
    public String mqTest() {
        /*  使用前面封装的方法,传入对应的参数即可发送消息
         *  msgTag 标签,可用于消息小分类标注
         *  messageBody 消息body内容,生产者自定义内容,任何二进制数据,生产者和消费者协定数据的序列化和反序列化
         *  msgKey 消息key值,建议设置全局唯一,比如订单号,用户id这种,可不传,不影响消息投递
         */
        //body内容自定义
        JSONObject body = new JSONObject();
        body.put("userId", "this is userId");
        body.put("notice", "同步消息");
        //同步发送消息
//        sendMsg("userMessage", body.toJSONString().getBytes(), "messageId");
//        //单向消息
//        sendOneWayMsg("userMessage", "单向消息".getBytes(), "messageId");
        //定时/延时消息,当前时间的30秒后推送。时间自己定义
        sendTimeMsg("userMessage", "{id:1,name:\"张三\"}".getBytes(), "{id:1,name:\"张三\"}", System.currentTimeMillis()+30000);
        return "ok";
    }


    /**
     * 同步发送定时/延时消息
     * @param msgTag 标签,可用于消息小分类标注,对消息进行再归类
     * @param messageBody 消息body内容,生产者自定义内容,二进制形式的数据
     * @param msgKey 消息key值,建议设置全局唯一值,可不设置,不影响消息收发
     * @param delayTime 服务端发送消息时间,立即发送输入0或比更早的时间
     * @return success:SendResult or error:null
     */
    public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {
        Message msg = new Message(config.getTimeTopic(),msgTag,msgKey,messageBody);
        msg.setStartDeliverTime(delayTime);
        return this.send(msg,Boolean.FALSE);
    }

    /**
     * 普通消息发送发放
     * @param msg 消息
     * @param isOneWay 是否单向发送
     */
    private SendResult send(Message msg,Boolean isOneWay) {
        try {
            if(isOneWay) {
                //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
                //若数据不可丢,建议选用同步或异步发送方式。
                producer.sendOneway(msg);
                success(msg, "单向消息MsgId不返回");
                return null;
            }else {
                //可靠同步发送
                SendResult sendResult = producer.send(msg);
                //获取发送结果,不抛异常即发送成功
                if (sendResult != null) {
                    success(msg, sendResult.getMessageId());
                    return sendResult;
                }else {
                    error(msg,null);
                    return null;
                }
            }
        } catch (Exception e) {
            error(msg,e);
            return null;
        }
    }

    //--------------日志打印----------
    private void error(Message msg,Exception e) {
        logger.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
        logger.error("errorMsg --- {}",e.getMessage());
    }
    private void success(Message msg,String messageId) {
        logger.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
    }
}

现在生产者已经配置好了,我们跑一下这个接口,会发现他会把数据存储到RocketMQ中

image.png

接下来配置消费者

如果是在别的服务中配置消费者的话:就同样需要在那个服务中添加jar包ons-client v1.8.0.Final,配置mq参数链接(mq消费者配置的参数要和生产者一样) ,添加MqConfig(上面有)

接下来,注入配置,订阅消息,添加消息处理的方法

package com.laozhang.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.laozhang.utils.MqTimeMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author zhangfan
 * @date 2020/5/14
 */
@Configuration
public class ConsumerClient {
    @Autowired
    private MqConfig mqConfig;

    //普通消息监听器,Consumer注册消息监听器来订阅消息.
    @Autowired
    private MessageListener messageListener;

    //定时消息监听器,Consumer注册消息监听器来订阅消息.
    @Autowired
    private MqTimeMessageListener timeMessageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅消息
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        //订阅普通消息
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        subscription.setExpression(mqConfig.getTag());
        subscriptionTable.put(subscription, messageListener);
        //订阅定时/延时消息
        Subscription subscriptionTime = new Subscription();
        subscriptionTime.setTopic(mqConfig.getTimeTopic());
        subscriptionTime.setExpression(mqConfig.getTimeTag());
        subscriptionTable.put(subscriptionTime, timeMessageListener);

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

}

对定时/延时消息监听类进行实现,处理接收到的消息

package com.laozhang.utils;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 定时/延时MQ消息监听消费
 * @author zhangfan
 * @date 2020/5/14
 */
@Component
public class MqTimeMessageListener implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    //实现MessageListtener监听器的消费方法
    @Override
    public Action consume(Message message, ConsumeContext context) {
        logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), new String(message.getBody()));

        try {
            String msgTag = message.getTag();//消息类型
            String msgKey = message.getKey();//业务唯一id
            System.out.println("msgKey:" +  msgKey);
            switch (msgTag) {
                //----通过生产者传的tag标签进行消息分类和过滤处理
                case "userMessage":
                    //通过唯一key的,比如前面key传的值是订单号或者用户id这种唯一值,来进行数据的查询或处理
                    //由于RocketMQ能重复推送消息,处理消息的时候做好数据的幂等,防止重复处理
//                    if(//如订单系统需要判断订单是否被处理过等,通过传的msgKey即订单号去查询数据库进行判断{}
                    break;
            }
            //验证通过,处理业务
            //do something
            //消费成功,继续消费下一条消息
            return Action.CommitMessage;
        } catch (Exception e) {
            logger.error("消费MQ消息失败! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
            //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
            return Action.ReconsumeLater;
        }
    }
}

最后运行即可


标题:RocketMQ定时发送
作者:张范
地址:http://misterzhang.top/articles/2020/05/14/1589453157176.html