一. 前言
最近有很多小伙伴開始找工作,在面試時(shí),面試官經(jīng)常問(wèn)到一個(gè)題目:RabbitMQ如何防止重復(fù)消費(fèi)?
有很多小伙伴這個(gè)時(shí)候都在想,消息怎么就會(huì)重復(fù)消費(fèi)呢???.......
所以他們?cè)诿嬖嚭缶团軄?lái)問(wèn)小編,針對(duì)這個(gè)比較高頻的題目,小編就在這里為大家來(lái)講講MQ防止重復(fù)消費(fèi)的實(shí)現(xiàn)方案吧。
二. 面試題考點(diǎn)
如果面試官是小編的話,那么我想考察的,其實(shí)就是候選人除了對(duì)技術(shù)的基本使用之外,再就是在各種實(shí)際應(yīng)用場(chǎng)景中對(duì)可能發(fā)生問(wèn)題的實(shí)際處理能力。
所以這道題的考點(diǎn),最起碼有兩點(diǎn):
第一是RabbitMQ中消息的重復(fù)消費(fèi)是如何產(chǎn)生的,我們首先要發(fā)現(xiàn)問(wèn)題,知道問(wèn)題產(chǎn)生原因:
第二是針對(duì)這個(gè)重復(fù)消費(fèi)問(wèn)題的處理方案及機(jī)制。
三. 解題分析
接下來(lái)小編就根據(jù)上述考點(diǎn),帶大家來(lái)一起分析這個(gè)問(wèn)題的解題思路。
3.1RabbitMQ消息重復(fù)消費(fèi)的產(chǎn)生原因
根據(jù)上圖,給大家梳理總結(jié)出了消息重復(fù)消費(fèi)的產(chǎn)生過(guò)程,如下:
消費(fèi)方的業(yè)務(wù)項(xiàng)目從MQ隊(duì)列中接收數(shù)據(jù);
接著處理業(yè)務(wù);
業(yè)務(wù)處理成功后,消費(fèi)方項(xiàng)目給MQ返回ack進(jìn)行手動(dòng)確認(rèn);
返回回調(diào)執(zhí)行結(jié)果的過(guò)程中,因?yàn)榫W(wǎng)絡(luò)抖動(dòng)等原因,回調(diào)數(shù)據(jù)時(shí),MQ沒(méi)有返回成功,所以MQ隊(duì)列中的數(shù)據(jù)會(huì)再次發(fā)給業(yè)務(wù)項(xiàng)目,造成重復(fù)消費(fèi)。
3.2. RabbitMQ消息重復(fù)消費(fèi)的處理方案
針對(duì)消息的重復(fù)消費(fèi)問(wèn)題,根據(jù)上圖總結(jié)的解決思路如下:
監(jiān)聽(tīng)器接收MQ隊(duì)列中的數(shù)據(jù):
利用redis的setnx命令,以消息唯一id為key,以消息內(nèi)容為value,超時(shí)時(shí)間設(shè)置為10秒,存入redis中;
如果能夠成功存入,說(shuō)明沒(méi)有重復(fù)消費(fèi),則處理業(yè)務(wù),處理完業(yè)務(wù)后返回ack或者nack確認(rèn);
如果存不進(jìn)去,則說(shuō)明重復(fù)消費(fèi),直接返回ack確認(rèn)的回調(diào)信息就可以了。
3.3解決重復(fù)消費(fèi)的案例代碼
發(fā)送方測(cè)試代碼
/**
* 測(cè)試發(fā)送
* @author 千鋒
*/
@SpringBootTest(classes = ProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() throws IOException {
//給消息封裝一個(gè)唯一id對(duì)象
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
//第四個(gè)參數(shù): 設(shè)置消息唯一id
rabbitTemplate.convertAndSend("交換器名字","路由鍵","千鋒測(cè)試MQ重復(fù)消費(fèi)處理?。?,messageId);
}
}
接收方測(cè)試代碼
package com.qf.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 千鋒
*/
@Component
public class Consumer {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "隊(duì)列名字")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 獲取MessageId, 消息唯一id
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
//1. 設(shè)置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
//2. 消費(fèi)消息
System.out.println("接收到消息:" + msg);
//3. 設(shè)置key的value為1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手動(dòng)ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 獲取Redis中的value即可 如果是1,手動(dòng)ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
四. 總結(jié)
經(jīng)過(guò)上面的分析,最后健哥再給大家總結(jié)一下這個(gè)問(wèn)題的完整答案。
問(wèn)題產(chǎn)生原因:
因?yàn)橄M(fèi)方和MQ服務(wù)器網(wǎng)絡(luò)閃斷等原因,造成了接收方消費(fèi)后,返回給MQ服務(wù)器一個(gè)ack確認(rèn)消息,結(jié)果MQ沒(méi)有接收到,造成了重復(fù)消費(fèi)。
解決過(guò)程:
利用redis的setnx命令,將消費(fèi)的消息id存入到redis,超時(shí)時(shí)間設(shè)置為10秒,然后再給mq返回ack。消費(fèi)前要判斷redis中是否存在這個(gè)消息id,如果不存在說(shuō)明沒(méi)有消費(fèi)過(guò),則正常消費(fèi);如果redis中存在這個(gè)消息id,則說(shuō)明重復(fù)消費(fèi),直接返回ack,不重復(fù)執(zhí)行業(yè)務(wù)。
以上就是MQ中消息重復(fù)消費(fèi)的產(chǎn)生原因及解決思路和對(duì)應(yīng)案例,現(xiàn)在你知道該怎么解決了嗎?更多關(guān)于“Java培訓(xùn)”的問(wèn)題,歡迎咨詢千鋒教育在線名師。千鋒已有十余年的培訓(xùn)經(jīng)驗(yàn),課程大綱更科學(xué)更專業(yè),有針對(duì)零基礎(chǔ)的就業(yè)班,有針對(duì)想提升技術(shù)的好程序員班,高品質(zhì)課程助力你實(shí)現(xiàn)java程序員夢(mèng)想。