⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.csdn.net/chenping1993/article/details/114580954 「龙池小生」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

这里介绍一下RabbitMQ重复消费的场景,以及如何解决消息重复消费的问题。

注:本文只做粗略逻辑实现借鉴,实际业务场景需根据实际情况再做细化处理。

消息重复消费

什么是消息重复消费?

首先我们来看一下消息的传输流程。消息生产者-->MQ-->消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在两个阶段:

1、生产者多发送了消息给MQ;

2、MQ的一条消息被消费者消费了多次。

第一种场景很好控制,只要保证消息生成者不重复发送消息给MQ即可。

我们着重来看一下第二个场景。

MQ的一条消息被消费者消费了多次

在保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

重复消费场景重现测试

1、消息发送者发送1万条消息给MQ

@GetMapping("/rabbitmq/sendToClient")
public String sendToClient() {
String message = "server message sendToClient";
for (int i = 0; i < 10000; i++) {
amqpTemplate.convertAndSend("queueName3",message+": "+i);

}
return message;
}

启动消息发送服务,调用接口发送消息,mq成功收到1万条消息。

2、消费者监听消费消息

@RabbitListener(queues = "queueName3")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("接收者2--接收到queueName3队列的消息为:"+message);
}

启动消费者服务,然后中断消费服务,此时消费到了第7913个消息:

此时查看MQ的消息,现在MQ队列中应该还有2087个消息,但还有2088个消息,说明最后一个消息被消费了没有被MQ服务确认。

再次启动消费者服务,消息从第7913个消息开始消费,而不是第7914个消息

如何解决消息重复消费的问题

为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下:

  • 消费者监听到消息后获取id,先去查询这个id是否存中
  • 如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)
  • 如果存在则丢弃此消息

编码

消息生产者服务:

/**
* @Description: 发送消息 模拟消息重复消费
* 消息重复消费情景:消息生产者已把消息发送到mq,消息消费者在消息消费的过程中突然因为网络原因或者其他原因导致消息消费中断
* 消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,
* 为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息
* @param:
* @return: java.lang.String
* @Author: chenping
*/
@GetMapping("/rabbitmq/sendMsgNoRepeat")
public String sendMsgNoRepeat() {
String message = "server message sendMsgNoRepeat";
for (int i = 0; i <10000 ; i++) {
Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
amqpTemplate.convertAndSend("queueName4",msg);
}
return message;
}

消息消费者服务:

方案1:将id存入string中(单消费者场景):

这样一个队列,redis数据只有一条,每次消息过来都覆盖之前的消息,但是消费者多的情况不适用,可能会存在问题--一个消息被多个消费者消费

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");

String messageRedisValue = redisUtil.get("queueName4","");
if (messageRedisValue.equals(messageId)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);

redisUtil.set("queueName4",messageId);//以队列为key,id为value
}

方案2:将id存入list中(多消费者场景)

这个方案可以解决多消费者的问题,但是随着mq的消息增加,redis数据越来越多,需要去清除redis数据。

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");

List<String> messageRedisValue = redisUtil.lrange("queueName4");
if (messageRedisValue.contains(messageId)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);

redisUtil.lpush("queueName4",messageId);//存入list
}

方案3:将id以key值增量存入string中并设置过期时间:

以消息id为key,消息内容为value存入string中,设置过期时间(可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");

String messageRedisValue = redisUtil.get(messageId,"");
if (msg.equals(messageRedisValue)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);

redisUtil.set(messageId,msg,10L);//以id为key,消息内容为value,过期时间10分钟
}

解决消息重复消费测试:

首先,启动消息生成服务,发送一万条消息:

启动消息消费服务,然后中断服务,消费了1934条消息:

查看未被消费的消息条数为8067条,多了一条(10000-1934=8066 ):

再次启动消费者服务,消费者舍弃了已被消费的第1934条消息

文章目录
  1. 1. 消息重复消费
    1. 1.1. MQ的一条消息被消费者消费了多次
    2. 1.2. 重复消费场景重现测试
  2. 2. 如何解决消息重复消费的问题
    1. 2.1. 编码
    2. 2.2. 解决消息重复消费测试: