Hi,

感谢卢松的两篇文章,非常欢迎大家投稿 ~

鉴于RocketMQ的官方投稿流程刚刚启动,能否麻烦把文章在Google 
Doc上贴出来,然后user列表和我们讲下地址,我们一起看看先。对啦,麻烦授权以下账号修改权限:

lizhan...@gmail.com
z...@xinyu.im
fengji...@gmail.com


Best Regards,
Von Gosling

> 在 2018年4月2日,12:27,yukon <yu...@apache.org> 写道:
> 
> Hi,
> 
> 非常欢迎撰写RocketMQ相关的技术文章,建议放到Google DOC上并共享出来方便大家进行Comment。
> 
> Regards,
> yukon
> 
> 2018-04-02 12:06 GMT+08:00 404828407 <404828...@qq.com 
> <mailto:404828...@qq.com>>:
> 
> 
> RocketMQ Master宕机后重启出现的严重问题分析
> 
> 
> 作者:卢松
> 
> 以前发现RocketMQ的master机器宕机时会出现很多诡异的问题,所以最近做了一系列实验来试图找到问题的原因,也是想发现风险点。没想到发现了一个更严重的问题,下面会详细的说明下,另外出现问题的原因属于猜测,望高人看到后能指点下。
> 
> 试验环境准备:
> 1.RocketMQ版本:阿里的3.2.6版本
> 2.集群包含2组broker,每组broker都是1master + 1slave,都设置成SYNC_MASTER、ASYNC_FLUSH模式。
> 3.每个broker上已经存储过好几个commitlog文件,并且已经有commitlog文件删除过(数据过期了)。
> 
> 测试步骤:
> 1.尽量多的消费组、尽量多的消费者(10个以上)开始消费多个topic,消费者自始至终不停机。
> 2.在消费者消费时,突然把其中一组broker(姑且命名为BrokerA吧)的master断电(注意是断电或者硬盘损坏、不是kill -9)。
> 3.过一段时间(半小时左右),重启宕机的这个master。
> 
> 观察现象,记录事后的现象及疑问:
> 1.BrokerA的slave机器一直存活着,master宕机后,消费者新的消费进度会存在slave上面。
> 2.宕机后,拉取BrokerA的master中的数据文件,文件没有损坏。
> 3.重启master后,消费客户端会从很久以前的queue offset开始拉取消息,为啥会去取到很久以前的queueOffset?
> 4.重启master后,这个master出现大量如下异常,异常日志达到10分钟每G的量:
> {
>     "timestamp": "2017-12-28T00:18:48.620+0800",
>     "level": "WARN",
>     "thread": "FlushConsumeQueueService",
>     "logger": "RocketmqStoreError",
>     "message": "findMapedFileByOffset offset not matched, request Offset: 0, 
> index: -6, mapedFileSize: 6000000, mapedFiles count: 2, StackTrace: 
> \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)"
> }
> 
> 5.另外还有大量的这种异常,这类异常显示大量消费客户端去拉取很久以前的消息。
> {
>     "timestamp": "2017-12-28T00:39:01.748+0800",
>     "level": "WARN",
>     "thread": "PullMessageThread_15",
>     "logger": "RocketmqStoreError",
>     "message": "findMapedFileByOffset offset not matched, request Offset: 
> 19382879484, index: -7, mapedFileSize: 1073741824 <tel:010%207374%201824>, 
> mapedFiles count: 2, StackTrace: 
> \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)"
> }
> 
> 
> 这种现象的原因分析过程:
> 首先第二类异常出在PullMessageProcessor的拉取消息过程中,消费客户端不停拉取消息时,PullMessageProcessor会调用DefaultMessageStore.getMessage(),而在getMessage()中,更具体的是在CommitLog的public
>  SelectMapedBufferResult getMessage(final long offset, final int size) 
> 方法中,这个方法会调用MapedFileQueue的方法findMapedFileByOffset(final long offset, final 
> boolean returnFirstOnNotFound),具体看下图:
> 
> 
> 
> 
> 
> 
> 
> 从代码上看,出现这个异常原因就是DefaultMessageStore.getMessage()中offsetPy拿到的值过小,这个值实际上是从ConsumeQueue的文件中读到的单条消息的CommitlogOffset,说明bufferConsumeQueue这个对象拿到的是老的数据。如下图:
> 
> 
> 
> bufferConsumeQueue对象又是根据offset定位的,offset是每一个queue的queueOffset,这个offset实际上是消费端拉取消息时传递过来的,它来自于两种地方,一是消费端自己拉取的消费进度,另外一种是消费端每次拉取消息,拉取不到后拿到的nextBeginOffset参数。如下图:
> 
> 
> 
> 所以根源就出在这个nextBeginOffset的计算逻辑上,为什么nextBeginOffset会计算出过小的值,导致消费端一直在消费过小的queueOffset对应的消息?
> 
> 我们先不去查这个问题的原因,就按照这个逻辑走下去,看看会发生什么情况。还是回到DefaultMessageStore.getMessage()中,当SelectMapedBufferResult
>  selectResult = this.commitLog.getMessage(offsetPy, 
> sizePy);拿不到数据后,selectResult返回的是null,这个方法最终返回的状态是GetMessageStatus.MESSAGE_WAS_REMOVING,表示消息的数据文件正在被删除。
> 
> 
> 
> 而调用这个方法的PullMessageProcessor返回给消费客户端的是这个状态,请消费端立即尝试重新拉取消息:
> 
> 
> 
> 消费客户端MQClientAPIImpl在方法PullResult processPullResponse(final RemotingCommand 
> response) 处理这个返回状态,转换成NO_MATCHED_MSG的结果。
> 
> 
> 
> 同样是消费客户端,在DefaultMQPushConsumerImpl的pullMessage(final PullRequest 
> pullRequest)方法中,PullCallback是这么处理这种NO_MATCHED_MSG结果的:
> 从收到的nextBeginOffset开始,立即去重新拉取后面的消息。
> 
> 
> 
> 
> 而每次依然是去拉取很久前的消息,这样就形成了消费端不停拉取消息的日志,broker出现大量找不到消息的日志,并且broker负载很高。问题的关键是DefaultMessageStore.getMessage()中,为啥拿到的offsetPy是已经删除的commitlog
>  
> offset,难道是/home/logs/rocketmq/store/consumequeue中文件损坏(这种可能性极低,目前还没有发现断电导致文件损坏的情况)?
> 
> 那就有另一种可能:
> master突然断电,master上数据就有可能丢失(没有来得及刷盘),consumequeue数据在master、slave的不一致。而消费端在master宕机后切换到slave上消费,消费进度也会提交到slave上,导致消费进度queue
>  
> offset在master、slave上不一样。然后master重启后,消费端重新回到master拉取消息,master上的数据丢失了,master识别不出消费端提交的queue
>  offset。
> 
> 总结下,slave上存储的/home/logs/rocketmq/store/consumequeue比master新,假如slave上最小、最大offset是1000,15000,而这时候master的consumequeue最小、最大数据是1000,10000。当master重启后,消费客户端在slave上已经消费到15000,在master上会从15000开始去master上拉取消息进行消费,在DefaultMessageStore.getMessage()中,当发现offset
>  
> overflow严重溢出(大于maxOffset)时,就可能会把nextBeginOffset设置为0(恰好此consumequeue的minoffset为0),下次就从offset=0开始拉取消息。
> 
> 开始从0消费就会拉取很久前的消息数据了。
> 
> 
> 
> 
> 我猜测这不是rocketmq的bug,应该是RocketMQ审慎的选择,它的原则是:宁愿多消费,重复消费也不要漏消费。
> 
> 之所以这么设计,我的猜测或者解释如下:
> 当consumeQueue的minOffset=0时,说明这个consumeQueue可能是新的queue,也可能是新扩容的queue,这时候为了避免有消息漏掉,在超过maxOffset这种奇怪的情况下,消费端下次消费就从0开始消费,宁愿重复消费也不要漏掉消费。
> 
> 当minOffset大于0时,说明这个queue已经长时间存在过了,不会是新queue,也不会是新扩容的,只需从maxOffset继续消费就行了。
> 
> 这么设计正常情况下没有问题,但是当一个topic下面的某个queue上积压了很多消息,并且minOffset=0时(表示没有删除过)。这时候master突然宕机了,slave上的数据比master新,并且消费客户端从slave上消费了这个queue上的消息。master重启后,消费客户端切换到master上消费,发现自己消费的消息对于master已经溢出了,根据上面的逻辑,就会从0重新消费,大量重复消费,并且会消费那些已经从commitlog中删除的消息,出现大量错误日志。
> 
> 再看看刚发布不久的Apache RocketMQ 4.2.0版本中DefaultMessageStore.getMessage()这段的实现,
> 
> 
> 
> 
> 
> 其中this.getMessageStoreConfig().isOffsetCheckInSlave()默认为false,默认不开启在slave上offset的检查。Apache版本的实现更复杂了一点,但是依然解决不了这个问题。
> 
> Apache版本的逻辑是:当master宕机后,slave数据比master新。消费端先从slave消费,master重启后,消费端再从master消费,当minoffset为0时还会从0开始从新消费。
> 当slave宕机后,master数据比slave的新,消费端从master消费消息。master再宕机,slave重启后,消费端从slave消费,这时候会从master上记录的消费进度开始消费。因为slave的数据总归是来自于master,并且slave已经同步了master的新数据。
> 
> 好了,问题分析到现在,我大概能说的通这个问题了,虽然事实不一定如此,也少有人碰到这个问题。
> 
> 如果出现这种情况,也不知道怎么解决的话。我想到方式是:保留现场,堆栈信息,broker端日志,客户端日志,master和slave的所有数据文件。处理好宕机临界点的数据,最终必须要重启master时,要拿备份的slave的数据文件覆盖master的数据文件,这些文件包括commitlog,consumequeue以及config下面的所有文件(消费进度),然后再重启。简单粗暴实用,避免master起来后负载飙升直至不可用的情况。
> 
> 有什么风险呢?目前还不清楚。
> 
> 总结:这是目前为止我发现的RocketMQ最严重的一个问题,Apache版本也有此问题。其他问题都是小打小闹,没有太大影响,唯独此问题需要慎重处理。问题根源还是数据不一致,不支持高可用。集群内数据不一致,服务器端又处理不好这种不一致。
> 
> 不过按照Apache 
> RocketMQ的规划,要到4.3.0版本才支持高可用,应该不需要等很久。对于这个问题提了个issue:https://issues.apache.org/jira/browse/ROCKETMQ-348
>  <https://issues.apache.org/jira/browse/ROCKETMQ-348>
> 
> 最后,以上都属于猜测,大概也许可能是这个问题导致的,还望有高手出来指点迷津,此文算是抛砖引玉了。
> 
> 
> 

Reply via email to