⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 writing-bugs.blog.csdn.net/article/details/103701101 「专注写bug」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

一、前言

前几天我研究了关于springboot整合简单消息队列,实现springboot推送消息至队列中,消费者成功消费。同时也加了消息转发器,对消息转发器各种类型的配置等做了总结。

但是,主要还有一点,我一直存在疑问:如何确保消息成功被消费者消费?

说到这里,我相信很多人会说使用ack啊,关闭队列自动删除啊什么的。主要是道理大家都懂,我要实际的代码,网上找了半天,和我设想的有很大差异,还是自己做研究总结吧。

二、准备

本次写案例,就按照最简单的方式,direct方式进行配置吧,实际流程如下所示:

  • 消息转发器类型: direct直连方式。
  • 消息队列: 暂时采取公平分发方式。
  • 实现流程: 消息生产者生产的消息发送至队列中,由两个消费者获取并消费,消费完成后,清楚消息队列中的消息。

所以我们接下来先写配置和demo。

2.1、依赖引入

再一般的springboot 2.1.4项目中,添加一个pom依赖。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2、连接yml的配置

我们这边暂时只有一个rabbitmq,所以连接操作,基本rabbitmq的信息配置问题直接再yml中编写就可以了。

spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: xiangjiao
password: bunana
virtual-host: /xiangjiao
publisher-confirms: true #开启发送确认
publisher-returns: true #开启发送失败回退

#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual #采取手动应答
#concurrency: 1 # 指定最小的消费者数量
#max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true # 是否支持重试

2.3、config注入配置

我们根据图示

知道我们必须配置以下东西:

  • 一个消息转发器,我们取名directExchangeTx
  • 一个消息队列,取名directQueueTx,并将其绑定至指定的消息转发器上。

所以我们的配置文件需要这么写:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 直连交换机,发送指定队列信息,但这个队列后有两个消费者同时进行消费
* @author 7651
*
*/
@Configuration
public class DirectExchangeTxQueueConfig {

@Bean(name="getDirectExchangeTx")
public DirectExchange getDirectExchangeTx(){
return new DirectExchange("directExchangeTx", true, false);
}

@Bean(name="getQueueTx")
public Queue getQueueTx(){
return new Queue("directQueueTx", true, false, false);
}

@Bean
public Binding getDirectExchangeQueueTx(
@Qualifier(value="getDirectExchangeTx") DirectExchange getDirectExchangeTx,
@Qualifier(value="getQueueTx") Queue getQueueTx){
return BindingBuilder.bind(getQueueTx).to(getDirectExchangeTx).with("directQueueTxRoutingKey");
}
}

2.4、消费者的配置

有了队列和消息转发器,消息当然需要去消费啊,所以我们接下来配置消息消费者。

从图中,我们看出,我们需要配置两个消息消费者,同时监听一个队列,所以我们的配置类为:

消费者一:

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues="directQueueTx")
public class Consumer1 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*1);
} catch (InterruptedException e) {
e.printStackTrace();
}


try {
/**
* 确认一条消息:<br>
* channel.basicAck(deliveryTag, false); <br>
* deliveryTag:该消息的index <br>
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息 <br>
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg1 success msg = "+msg);

} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
/**
* 拒绝确认消息:<br>
* channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
* deliveryTag:该消息的index<br>
* multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。<br>
* requeue:被拒绝的是否重新入队列 <br>
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg1 failed msg = "+msg);

/**
* 拒绝一条消息:<br>
* channel.basicReject(long deliveryTag, boolean requeue);<br>
* deliveryTag:该消息的index<br>
* requeue:被拒绝的是否重新入队列
*/
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}

消息消费者二:

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*3);
} catch (InterruptedException e) {
e.printStackTrace();
}


try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg2 success msg = "+msg);

} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg2 failed msg = "+msg);
}
}
}

两个消费者之间唯一的区别在于两者获取消息后,延迟时间不一致。

2.5、消息生产者

有了消息消费者,我们需要有一个方式提供消息并将消息推送到消息队列中。

public interface IMessageServcie {
public void sendMessage(String exchange,String routingKey,Object msg);
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.linkpower.service.IMessageServcie;

@Component
public class MessageServiceImpl implements IMessageServcie,ConfirmCallback,ReturnCallback {

private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);

@Autowired
private RabbitTemplate rabbitTemplate;

@Override
public void sendMessage(String exchange,String routingKey,Object msg) {
//消息发送失败返回到队列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
//消息消费者确认收到消息后,手动ack回执
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
//发送消息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause));
log.info("correlationData -->"+correlationData.toString());
if(ack){
log.info("---- confirm ----ack==true cause="+cause);
}else{
log.info("---- confirm ----ack==false cause="+cause);
}
}

}

除了定义好了消息发送的工具服务接口外,我们还需要一个类,实现请求时产生消息,所以我们写一个controller。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.linkpower.service.IMessageServcie;

@Controller
public class SendMessageTx {

@Autowired
private IMessageServcie messageServiceImpl;

@RequestMapping("/sendMoreMsgTx")
@ResponseBody
public String sendMoreMsgTx(){
//发送10条消息
for (int i = 0; i < 10; i++) {
String msg = "msg"+i;
System.out.println("发送消息 msg:"+msg);
messageServiceImpl.sendMessage("directExchangeTx", "directQueueTxRoutingKey", msg);
//每两秒发送一次
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "send ok";
}
}

运行springboot项目,访问指定的url,是可以观察到消息产生和消费的。

有些人会问,写到这里就够了吗,你这和之前博客相比,和没写一样啊,都是教我们如何配置,如何生产消息,如何消费消息。

所以接下来的才是重点了,我们一起研究一个事,当我们配置的消费者二出现消费消息时,出问题了,你如何能够保证像之前那样,消费者一处理剩下的消息?

三、ack配置和测试

3.1、模拟消费者二出问题

我们发送的消息格式都是 msg1、msg2、…

所以,我们不妨这么想,当我消费者二拿到的消息msg后面的数字大于3,表示我不要了。

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*3);
} catch (InterruptedException e) {
e.printStackTrace();
}



try {
if(!isNull(msg)){
String numstr = msg.substring(3);
Integer num = Integer.parseInt(numstr);
if(num >= 3){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.out.println("get msg2 basicNack msg = "+msg);
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg2 basicAck msg = "+msg);
}
}
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg2 failed msg = "+msg);
}
}

public static boolean isNull(Object obj){
return obj == null || obj == ""||obj == "null";
}
}

再次请求接口,我们统计日志信息打印发现:

发现:

当我们对消息者二进行限制大于等于3时,不接受消息队列传递来的消息时,消息队列会随机重发那条消息,直至消息发送至完好的消费者一时,才会把消息消费掉。

四、分析几个回执方法

4.1、确认消息

channel.basicAck(long deliveryTag, boolean multiple);

我们一般使用下列方式:

channel.basicAck(
message.getMessageProperties().getDeliveryTag(),
false);

4.2、拒绝消息

channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ;

我们接下来还是修改消费者二,将这个方法最后个参数更改为false,看现象是什么?

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*3);
} catch (InterruptedException e) {
e.printStackTrace();
}



try {
if(!isNull(msg)){
String numstr = msg.substring(3);
Integer num = Integer.parseInt(numstr);
if(num >= 3){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, false);
System.out.println("get msg2 basicNack msg = "+msg);
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg2 basicAck msg = "+msg);
}
}
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg2 failed msg = "+msg);
}
}

public static boolean isNull(Object obj){
return obj == null || obj == ""||obj == "null";
}
}

重启项目,重新请求测试接口。

发现,当出现设置参数为false时,也就是如下所示的设置时:

channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false,
false);

如果此时消费者二出了问题,这条消息不会重新回归队列中重新发送,会丢失这条数据。

并且再消息队列中不会保存:

4.3、拒绝消息

channel.basicReject(long deliveryTag, boolean requeue);

这个和上面的channel.basicNack又有什么不同呢?我们还是修改消费者二实验下。

请求测试接口,查看日志信息。

发现,此时的日志信息配置

channel.basicReject(
message.getMessageProperties().getDeliveryTag(),
true);

channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false, true);

实现的效果是一样的,都是将信息拒绝接收,由于设置的requeue为true,所以都会将拒绝的消息重新入队列中,重新进行消息分配并消费。

五、总结

这一篇博客,我们总结了相关的配置,三个确认(或回执)信息的方法,并区别了他们的各项属性,也知道了当消息再一个消费者中处理失败了,如何不丢失消息重新进行消息的分配消费问题。

但是这个只是队列和消费者之间的消息确认机制,使用手动ACK方式确保消息队列中的消息都能在消费者中成功消费。那么,消息转发器和消息队列之间呢?消息生产者和消息转发器之间呢?

当然,差点忘了一个小问题。

我们思考一个问题,如果消息队列对应的消费者只有一个,并且那个消费者炸了,会出现什么问题呢??

文章目录
  1. 1. 一、前言
  2. 2. 二、准备
    1. 2.1. 2.1、依赖引入
    2. 2.2. 2.2、连接yml的配置
    3. 2.3. 2.3、config注入配置
    4. 2.4. 2.4、消费者的配置
    5. 2.5. 2.5、消息生产者
  3. 3. 三、ack配置和测试
    1. 3.1. 3.1、模拟消费者二出问题
  4. 4. 四、分析几个回执方法
    1. 4.1. 4.1、确认消息
    2. 4.2. 4.2、拒绝消息
    3. 4.3. 4.3、拒绝消息
  5. 5. 五、总结