Java的RocketMq队列之消息可靠性详解java

一条消息从生产到被消费,将会经历三个阶段:

以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。

生产者(Producer)通过网络发送消息给Broker,当Broker收到之后,将会返回确认响应信息给Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。

发送模式

可靠同步发送

可靠异步发送

单向(Oneway)发送

RocketMQ发送消息示例代码如下:

DefaultMQProducermqProducer=newDefaultMQProducer("test");//设置nameSpace地址mqProducer.setNamesrvAddr("namesrvAddr");mqProducer.start();Messagemsg=newMessage("test_topic"/*Topic*/,"HelloWorld".getBytes(RemotingHelper.DEFAULT_CHARSET)/*Messagebody*/);//发送消息到一个Brokertry{SendResultsendResult=mqProducer.send(msg);}catch(RemotingExceptione){e.printStackTrace();}catch(MQBrokerExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}send方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功。

消息发送成功仅代表消息已经到了Broker端,Broker在不同配置下,可能会返回不同响应状态:

引用官方状态说明:

DefaultMQProducermqProducer=newDefaultMQProducer("test");//设置nameSpace地址mqProducer.setNamesrvAddr("127.0.0.1:9876");mqProducer.setRetryTimesWhenSendFailed(5);mqProducer.start();Messagemsg=newMessage("test_topic"/*Topic*/,"HelloWorld".getBytes(RemotingHelper.DEFAULT_CHARSET)/*Messagebody*/);try{//异步发送消息到,主线程不会被阻塞,立刻会返回mqProducer.send(msg,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){//消息发送成功,}@OverridepublicvoidonException(Throwablee){//消息发送失败,可以持久化这条数据,后续进行补偿处理}});}catch(RemotingExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。

不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如下:

//同步发送消息重试次数,默认为2mqProducer.setRetryTimesWhenSendFailed(3);//异步发送消息重试次数,默认为2mqProducer.setRetryTimesWhenSendAsyncFailed(3);总结

producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,producer同步等待broker响应消息的发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。

默认情况下,消息只要到了Broker端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后Broker定期批量的将一组消息从内存异步刷入磁盘。

这种方式减少I/O次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。

若想保证Broker端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

修改Broker端配置如下:

集群部署

为了保证可用性,Broker通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到slave节点。

默认方式下,消息写入master成功,就可以返回确认响应给生产者,接着消息将会异步复制到slave节点。

注:master配置:flushDiskType=SYNC_FLUSH

此时若master突然宕机且不可恢复,那么还未复制到slave的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master节点将会同步等待slave节点复制完成,才会返回确认响应。

异步复制与同步复制区别:

Brokermaster节点同步复制配置如下:

总结

在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。

结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker需要采用如下配置:

##master节点配置flushDiskType=SYNC_FLUSHbrokerRole=SYNC_MASTER##slave节点配置brokerRole=slaveflushDiskType=SYNC_FLUSH同时这个过程我们还需要生产者配合,判断返回状态是否是SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。

虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。

从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了AtleastOnce机制保证消息可靠消费。

何为AtleastOnce?

Consumer先pull消息到本地,消费完成后,才向服务器返回ack。

通常消费消息的ack机制一般分为两种思路:

1、先提交后消费;

2、先消费,消费成功后再提交;

思路一可以解决重复消费的问题但是会丢失消息,因此Rocket默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

消费者从broker拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态给Broker。

如果Broker未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

消息消费的代码如下:

//实例化消费者DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("test_consumer");//设置NameServer的地址consumer.setNamesrvAddr("namesrvAddr");//订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("test_topic","*");//注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){//执行业务逻辑//标记该消息已经被成功消费returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例consumer.start();以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则我们需要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。

最后我们还可以说出我们的思考,虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性。

THE END
1.系统架构笔记2计算机系统基础知识顺序图:描述对象之间的交互,其中循环,选择等复杂交互使用序列片段表示。对象之间的消息类型包括同步消息/异步消息/返回消息/参与者创建消息/参与者销毁消息。 其中交互概览图、定时图、顺序图(序列图)、通信图(协作图)均被称为交互图。除此以外,动态模型还包括状态图和活动图等。 https://blog.csdn.net/HL_LOVE_C/article/details/142323515
2.异步消息传递异步消息通信机制mob64ca13f6bbea的技术博客异步消息传递 异步消息通信机制, 一、同步与异步概念:同步和异步关注的是消息通信机制(synchronouscommunication/asynchronouscommunication)。解释:涉及到IO通知机制;所谓同步,就是发起调用后,被调用者处理消息,必须等处理完才直接返回结果,没处理完之前是不https://blog.51cto.com/u_16213577/10541200
3.消息中间件第八讲:消息队列RocketMQ版实战集群及原理同步刷盘:消息被写入内存的PAGECACHE,返回写成功状态,当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 。吞吐量高,当磁盘损坏时,会丢失消息 异步刷盘:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。吞吐量低,但https://developer.aliyun.com/article/1352571
4.UML图详解(七)——交互图(时序图与协作图)对象的行为也称为消息(Message),通常当一个对象调用另一个对象中的行为时,即完成了一次消息传递。 2.表示方式 在生命线间的带有实心箭头表示消息 3.消息命名 信号或消息名(参数:参数类型):返回值 4.简单消息、同步消息、异步消息 消息分为简单消息(Simple Message)、同步消息(Synchronous Message)和异步消息(Asynchhttps://www.360doc.cn/article/9824753_689086442.html
5.下列选项属于消息的类型的是()下列选项属于消息的类型的是( ) A. 同步消息 B. 返回消息 C. 异步消息 D. 简单消息 E. 交互消息 题目标签:消息选项列选如何将EXCEL生成题库手机刷题 如何制作自己的在线小题库 > 手机使用 分享 反馈 收藏 举报 参考答案: A B C D 复制 纠错 举一反三 下列哪些方面属于数据库加密技术的功能https://www.shuashuati.com/ti/bd97b5557b5547b5a61600aa31c9b1a0.html
6.第八届全国职工职业技能大赛(网络和信息安全管理员)山西选拔赛63.“用户信息”可以理解为在用户使用产品或者服务过程中收集的信息构成用户信息,包括() A、IP地址 B、用户名和密码C、上网时间 D、Cookie信息 20 答案:ABCD 64.目前对MD5,对SHA1算法的攻击描述错误的是: A、能够构造出两个不同的消息,这两个消息产生了相同的消息摘要 B、对于一个已知的消息摘要,能够构造出https://max.book118.com/html/2024/0804/8036015104006116.shtm
7.UML答案214、简述顺序图中消息的分类 答:可分为同步消息、异步消息、返回消息三类。 同步消息:调用某个操作和方法。 异步消息:表示通信信号的非嵌套控制流,该信号异步激活操作。 返回消息:表明该消息从某个消息调用返回 ●必要时为每个消息附加上前置条件和后置条件。 ●调整图的布局,尽力减少线的交叉,使图尽可能地直观、https://m.360docs.net/doc/4a0ad13a3968011ca3009141.html
8.消息队列RocketMQ顺序消息消息队列RocketMQ功能特性用户购买商品生成订单为例,此时若以普通消息发送,则下游订单系统可能消息消费顺序混乱,例如订单出库先执行,后生成订单,从而系统数据不正确。顺序消息就能解决此问题,上游系统发送顺序消息,下游订单系统在同一消费组,会依次按顺序消费,执行相应的逻辑。 典型场景二: 数据实时增量同步 https://ecloud.10086.cn/op-help-center/doc/article/67007
9.消息队列(mq)是什么?有新消息写入,两个 follower 分区会同步 master 变更。两个 Consumer 分别从不同的 master 分区获取https://www.zhihu.com/question/54152397/answer/2654914176
10.3分钟白话RocketMQ系列——如何保证消息不丢失普通有序消息:发送普通有序消息,通过指定「消息筛选器selector」,动态决定发送哪个队列。异常默认不重试,可以用户自己重试,并发送到其他队列。 严格有序消息:发送严格有序消息,通过指定队列,保证严格有序,异常默认不重试。 消息发送模式: 同步:调用发送消息方法后,同步阻塞,直到返回SendResult。配置retryTimesWhenSendFaihttps://blog.itpub.net/70024923/viewspace-2983505/
11.计算机网络HTTP(Hyper Text Transfer Protocol)协议: 又称为超文本传输协议,HTTP由一个客户端和一个服务端来实现,运行在不同端系统中的客户端和服务端通过交换HTTP报文进行会话,而HTTP定义了这些报文的结构及进行报文交换时的方式。 HTTP是在TCP协议上运行的,客户端发起HTTP 请求,服务器做出响应处理后,返回HTTP 响应报文。 https://www.nowcoder.com/discuss/435756266465579008
12.大田县政法系统跨部门智慧执法办案管理平台建设项目服务类①价格项(F1×A1)满分为10分。 a.价格分采用低价优先法计算,即满足招标文件要求且投标价格最低的投标报价为评标基准价,其价格分为满分。其他投标人的价格分统一按照下列公式计算:投标报价得分=(评标基准价/投标报价)×100。因落实政府采购政策需进行价格扣除的,以扣除后的价格计算评标基准价和投标报价。 b.价格扣http://zfcg.cz.sm.gov.cn/upload/document/20211105/5f069fc764594cfea5cfa97e5b021347.html