一条消息从生产到被消费,将会经历三个阶段:
以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。
生产者(Producer)通过网络发送消息给Broker,当Broker收到之后,将会返回确认响应信息给Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。
发送模式
可靠同步发送
可靠异步发送
单向(Oneway)发送
RocketMQ发送消息示例代码如下:
DefaultMQProducermqProducer=newDefaultMQProducer("test");//设置nameSpace地址mqProducer.setNamesrvAddr("namesrvAddr");mqProducer.start();Messagemsg=newMessage("test_topic"/*Topic*/,"HelloWorld".getBytes(RemotingHelper.DEFAULT_CHARSET)/*Messagebody*/);//发送消息到一个Brokertry{SendResultsendResult=mqProducer.send(msg);}catch(RemotingExceptione){e.printStackTrace();}catch(MQBrokerExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}send方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功。
消息发送成功仅代表消息已经到了Broker端,Broker在不同配置下,可能会返回不同响应状态:
引用官方状态说明:
DefaultMQProducermqProducer=newDefaultMQProducer("test");//设置nameSpace地址mqProducer.setNamesrvAddr("127.0.0.1:9876");mqProducer.setRetryTimesWhenSendFailed(5);mqProducer.start();Messagemsg=newMessage("test_topic"/*Topic*/,"HelloWorld".getBytes(RemotingHelper.DEFAULT_CHARSET)/*Messagebody*/);try{//异步发送消息到,主线程不会被阻塞,立刻会返回mqProducer.send(msg,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){//消息发送成功,}@OverridepublicvoidonException(Throwablee){//消息发送失败,可以持久化这条数据,后续进行补偿处理}});}catch(RemotingExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。
不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如下:
//同步发送消息重试次数,默认为2mqProducer.setRetryTimesWhenSendFailed(3);//异步发送消息重试次数,默认为2mqProducer.setRetryTimesWhenSendAsyncFailed(3);总结
producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,producer同步等待broker响应消息的发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。
默认情况下,消息只要到了Broker端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后Broker定期批量的将一组消息从内存异步刷入磁盘。
这种方式减少I/O次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。
若想保证Broker端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。
修改Broker端配置如下:
集群部署
为了保证可用性,Broker通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到slave节点。
默认方式下,消息写入master成功,就可以返回确认响应给生产者,接着消息将会异步复制到slave节点。
注:master配置:flushDiskType=SYNC_FLUSH
此时若master突然宕机且不可恢复,那么还未复制到slave的消息将会丢失。
为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master节点将会同步等待slave节点复制完成,才会返回确认响应。
异步复制与同步复制区别:
Brokermaster节点同步复制配置如下:
总结
在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。
结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker需要采用如下配置:
##master节点配置flushDiskType=SYNC_FLUSHbrokerRole=SYNC_MASTER##slave节点配置brokerRole=slaveflushDiskType=SYNC_FLUSH同时这个过程我们还需要生产者配合,判断返回状态是否是SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。
虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。
从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了AtleastOnce机制保证消息可靠消费。
何为AtleastOnce?
Consumer先pull消息到本地,消费完成后,才向服务器返回ack。
通常消费消息的ack机制一般分为两种思路:
1、先提交后消费;
2、先消费,消费成功后再提交;
思路一可以解决重复消费的问题但是会丢失消息,因此Rocket默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。
消费者从broker拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态给Broker。
如果Broker未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。
消息消费的代码如下:
//实例化消费者DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("test_consumer");//设置NameServer的地址consumer.setNamesrvAddr("namesrvAddr");//订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("test_topic","*");//注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List
最后我们还可以说出我们的思考,虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性。