?????????? RocketMQ Master????????????????????????????
cool @hooligan -- -- ??: "hooligan"<79015...@qq.com>; : 2018??7??16??(??) 11:46 ??: "users"; : ?? RocketMQ Master ?? -- hooligan -- -- ??: "404828407"<404828...@qq.com>; : 2018??4??2??(??) 12:06 ??: "users"; : RocketMQ Master RocketMQ Master???????? ?? RocketMQ??master?? ?? 1.RocketMQ3.2.6 2.2??broker??broker1master + 1slave??SYNC_MASTER??ASYNC_FLUSH?? 3.broker??commitlogcommitlog?? ?? 1.10topic 2.??brokerBrokerA??master??kill -9 3.master?? 1.BrokerA??slavemasterslave?? 2.BrokerA??master 3.masterqueue offsetqueueOffset?? 4.mastermaster??10??G?? { "timestamp": "2017-12-28T00:18:48.620+0800", "level": "WARN", "thread": "FlushConsumeQueueService", "logger": "RocketmqStoreError", "message": "findMapedFileByOffset offset not matched, request Offset: 0, index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)" } 5. { "timestamp": "2017-12-28T00:39:01.748+0800", "level": "WARN", "thread": "PullMessageThread_15", "logger": "RocketmqStoreError", "message": "findMapedFileByOffset offset not matched, request Offset: 19382879484, index: -7, mapedFileSize: 1073741824, mapedFiles count: 2, StackTrace: \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)" } ??PullMessageProcessorPullMessageProcessor??DefaultMessageStore.getMessage()??getMessage()CommitLog??public SelectMapedBufferResult getMessage(final long offset, final int size) ??MapedFileQueue??findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound)?? DefaultMessageStore.getMess
?????????? RocketMQ Master????????????????????????????
?? -- hooligan -- -- ??: "404828407"<404828...@qq.com>; : 2018??4??2??(??) 12:06 ??: "users"; : RocketMQ Master???????? RocketMQ Master ?????? RocketMQ??master?? ?? 1.RocketMQ3.2.6 2.2??broker??broker1master + 1slave??SYNC_MASTER??ASYNC_FLUSH?? 3.broker??commitlogcommitlog?? ?? 1.10topic 2.??brokerBrokerA??master??kill -9 3.master?? 1.BrokerA??slavemasterslave?? 2.BrokerA??master 3.masterqueue offsetqueueOffset?? 4.mastermaster??10??G?? { "timestamp": "2017-12-28T00:18:48.620+0800", "level": "WARN", "thread": "FlushConsumeQueueService", "logger": "RocketmqStoreError", "message": "findMapedFileByOffset offset not matched, request Offset: 0, index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)" } 5. { "timestamp": "2017-12-28T00:39:01.748+0800", "level": "WARN", "thread": "PullMessageThread_15", "logger": "RocketmqStoreError", "message": "findMapedFileByOffset offset not matched, request Offset: 19382879484, index: -7, mapedFileSize: 1073741824, mapedFiles count: 2, StackTrace: \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)" } ??PullMessageProcessorPullMessageProcessor??DefaultMessageStore.getMessage()??getMessage()CommitLog??public SelectMapedBufferResult getMessage(final long offset, final int size) ??MapedFileQueue??findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound)?? DefaultMessageStore.getMessage()??offsetPy??ConsumeQueueCommitlogOffset??bufferConsumeQueue?? bufferConsumeQueueoffsetoffsetqueue??queueOffset??offset??
Re: 投稿 RocketMQ Master宕机后重启出现的严重问题分析
Hi, 感谢卢松的两篇文章,非常欢迎大家投稿 ~ 鉴于RocketMQ的官方投稿流程刚刚启动,能否麻烦把文章在Google Doc上贴出来,然后user列表和我们讲下地址,我们一起看看先。对啦,麻烦授权以下账号修改权限: lizhan...@gmail.com z...@xinyu.im fengji...@gmail.com Best Regards, Von Gosling > 在 2018年4月2日,12:27,yukon <yu...@apache.org> 写道: > > Hi, > > 非常欢迎撰写RocketMQ相关的技术文章,建议放到Google DOC上并共享出来方便大家进行Comment。 > > Regards, > yukon > > 2018-04-02 12:06 GMT+08:00 404828407 <404828...@qq.com > <mailto:404828...@qq.com>>: > > > RocketMQ Master宕机后重启出现的严重问题分析 > > > 作者:卢松 > > 以前发现RocketMQ的master机器宕机时会出现很多诡异的问题,所以最近做了一系列实验来试图找到问题的原因,也是想发现风险点。没想到发现了一个更严重的问题,下面会详细的说明下,另外出现问题的原因属于猜测,望高人看到后能指点下。 > > 试验环境准备: > 1.RocketMQ版本:阿里的3.2.6版本 > 2.集群包含2组broker,每组broker都是1master + 1slave,都设置成SYNC_MASTER、ASYNC_FLUSH模式。 > 3.每个broker上已经存储过好几个commitlog文件,并且已经有commitlog文件删除过(数据过期了)。 > > 测试步骤: > 1.尽量多的消费组、尽量多的消费者(10个以上)开始消费多个topic,消费者自始至终不停机。 > 2.在消费者消费时,突然把其中一组broker(姑且命名为BrokerA吧)的master断电(注意是断电或者硬盘损坏、不是kill -9)。 > 3.过一段时间(半小时左右),重启宕机的这个master。 > > 观察现象,记录事后的现象及疑问: > 1.BrokerA的slave机器一直存活着,master宕机后,消费者新的消费进度会存在slave上面。 > 2.宕机后,拉取BrokerA的master中的数据文件,文件没有损坏。 > 3.重启master后,消费客户端会从很久以前的queue offset开始拉取消息,为啥会去取到很久以前的queueOffset? > 4.重启master后,这个master出现大量如下异常,异常日志达到10分钟每G的量: > { > "timestamp": "2017-12-28T00:18:48.620+0800", > "level": "WARN", > "thread": "FlushConsumeQueueService", > "logger": "RocketmqStoreError", > "message": "findMapedFileByOffset offset not matched, request Offset: 0, > index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: > \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)" > } > > 5.另外还有大量的这种异常,这类异常显示大量消费客户端去拉取很久以前的消息。 > { > "timestamp": "2017-12-28T00:39:01.748+0800", > "level": "WARN", > "thread": "PullMessageThread_15", > "logger": "RocketmqStoreError", > "message": "findMapedFileByOffset offset not matched, request Offset: > 19382879484, index: -7, mapedFileSize: 1073741824 <tel:010%207374%201824>, > mapedFiles count: 2, StackTrace: > \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)" > } > > > 这种现象的原因分析过程: > 首先第二类异常出在PullMessageProcessor的拉取消息过程中,消费客户端不停拉取消息时,PullMessageProcessor会调用DefaultMessageStore.getMessage(),而在getMessage()中,更具体的是在CommitLog的public > SelectMapedBufferResult getMessage(final long offset, final int size) > 方法中,这个方法会调用MapedFileQueue的方法findMapedFileByOffset(final long offset, final > boolean returnFirstOnNotFound),具体看下图: > > > > > > > > 从代码上看,出现这个异常原因就是DefaultMessageStore.getMessage()中offsetPy拿到的值过小,这个值实际上是从ConsumeQueue的文件中读到的单条消息的CommitlogOffset,说明bufferConsumeQueue这个对象拿到的是老的数据。如下图: > > > > bufferConsumeQueue对象又是根据offset定位的,offset是每一个queue的queueOffset,这个offset实际上是消费端拉取消息时传递过来的,它来自于两种地方,一是消费端自己拉取的消费进度,另外一种是消费端每次拉取消息,拉取不到后拿到的nextBeginOffset参数。如下图: > > > > 所以根源就出在这个nextBeginOffset的计算逻辑上,为什么nextBeginOffset会计算出过小的值,导致消费端一直在消费过小的queueOffset对应的消息? > > 我们先不去查这个问题的原因,就按照这个逻辑走下去,看看会发生什么情况。还是回到DefaultMessageStore.getMessage()中,当SelectMaped
???? RocketMQ Master????????????????????????????
RocketMQ Master ?? RocketMQ??master?? ?? 1.RocketMQ3.2.6 2.2??broker??broker1master + 1slave??SYNC_MASTER??ASYNC_FLUSH?? 3.broker??commitlogcommitlog?? ?? 1.10topic 2.??brokerBrokerA??master??kill -9 3.master?? 1.BrokerA??slavemasterslave?? 2.BrokerA??master 3.masterqueue offsetqueueOffset?? 4.mastermaster??10??G?? { "timestamp": "2017-12-28T00:18:48.620+0800", "level": "WARN", "thread": "FlushConsumeQueueService", "logger": "RocketmqStoreError", "message": "findMapedFileByOffset offset not matched, request Offset: 0, index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)" } 5. { "timestamp": "2017-12-28T00:39:01.748+0800", "level": "WARN", "thread": "PullMessageThread_15", "logger": "RocketmqStoreError", "message": "findMapedFileByOffset offset not matched, request Offset: 19382879484, index: -7, mapedFileSize: 1073741824, mapedFiles count: 2, StackTrace: \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)" } ??PullMessageProcessorPullMessageProcessor??DefaultMessageStore.getMessage()??getMessage()CommitLog??public SelectMapedBufferResult getMessage(final long offset, final int size) ??MapedFileQueue??findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound)?? DefaultMessageStore.getMessage()??offsetPy??ConsumeQueueCommitlogOffset??bufferConsumeQueue?? bufferConsumeQueueoffsetoffsetqueue??queueOffset??offset??nextBeginOffset?? ??nextBeginOffsetnextBeginOffsetqueueOffset ??DefaultMessageStore.getM