RocketMQ Master????????????????????????????
??????????
????????RocketMQ??master??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
??????????????
1.RocketMQ????????????3.2.6????
2.????????2??broker??????broker????1master +
1slave??????????SYNC_MASTER??ASYNC_FLUSH??????
3.????broker??????????????????commitlog????????????????commitlog??????????????????????????
??????????
1.????????????????????????????????10????????????????????topic????????????????????????
2.??????????????????????????????broker????????????BrokerA??????master??????????????????????????????????kill
-9????
3.????????????????????????????????????????master??
????????????????????????????????
1.BrokerA??slave????????????????master????????????????????????????????slave??????
2.????????????BrokerA??master????????????????????????????
3.????master????????????????????????????queue
offset????????????????????????????????????queueOffset??
4.????master????????master??????????????????????????????10??????G??????
{
"timestamp": "2017-12-28T00:18:48.620+0800",
"level": "WARN",
"thread": "FlushConsumeQueueService",
"logger": "RocketmqStoreError",
"message": "findMapedFileByOffset offset not matched, request Offset: 0,
index: -6, mapedFileSize: 6000000, mapedFiles count: 2, StackTrace:
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)"
}
5.????????????????????????????????????????????????????????????????????????
{
"timestamp": "2017-12-28T00:39:01.748+0800",
"level": "WARN",
"thread": "PullMessageThread_15",
"logger": "RocketmqStoreError",
"message": "findMapedFileByOffset offset not matched, request Offset:
19382879484, index: -7, mapedFileSize: 1073741824, mapedFiles count: 2,
StackTrace:
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)"
}
????????????????????????
??????????????????PullMessageProcessor????????????????????????????????????????????PullMessageProcessor??????DefaultMessageStore.getMessage()??????getMessage()????????????????CommitLog??public
SelectMapedBufferResult getMessage(final long offset, final int size)
??????????????????????MapedFileQueue??????findMapedFileByOffset(final long
offset, final boolean returnFirstOnNotFound)??????????????
????????????????????????????????DefaultMessageStore.getMessage()??offsetPy??????????????????????????????ConsumeQueue????????????????????????CommitlogOffset??????bufferConsumeQueue??????????????????????????????????
bufferConsumeQueue????????????offset????????offset????????queue??queueOffset??????offset??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????nextBeginOffset??????????????
??????????????????nextBeginOffset????????????????????nextBeginOffset????????????????????????????????????????????queueOffset????????????
??????????????????????????????????????????????????????????????????????????????DefaultMessageStore.getMessage()??????SelectMapedBufferResult
selectResult = this.commitLog.getMessage(offsetPy,
sizePy);??????????????selectResult????????null??????????????????????????GetMessageStatus.MESSAGE_WAS_REMOVING????????????????????????????????
????????????????PullMessageProcessor????????????????????????????????????????????????????????????
??????????MQClientAPIImpl??????PullResult processPullResponse(final
RemotingCommand response) ????????????????????????NO_MATCHED_MSG????????
????????????????????DefaultMQPushConsumerImpl??pullMessage(final PullRequest
pullRequest)????????PullCallback??????????????NO_MATCHED_MSG????????
????????nextBeginOffset????????????????????????????????
??????????????????????????????????????????????????????????????????????broker??????????????????????????????broker??????????????????????DefaultMessageStore.getMessage()??????????????offsetPy????????????commitlog
offset????????/home/logs/rocketmq/store/consumequeue????????????????????????????????????????????????????????????????????
??????????????????
master??????????master??????????????????????????????????????consumequeue??????master??slave????????????????????master????????????slave??????????????????????????slave????????????????queue
offset??master??slave??????????????master??????????????????????master??????????master????????????????master????????????????????queue
offset??
????????slave????????/home/logs/rocketmq/store/consumequeue??master????????slave????????????offset??1000??15000??????????master??consumequeue????????????????1000??10000????master????????????????????slave????????????15000????master??????15000??????master??????????????????????DefaultMessageStore.getMessage()??????????offset
overflow??????????????maxOffset????????????????nextBeginOffset??????0????????consumequeue??minoffset??0????????????offset=0??????????????
??????0????????????????????????????????
????????????rocketmq??bug????????RocketMQ??????????????????????????????????????????????????????????
??????????????????????????????????????
??consumeQueue??minOffset=0????????????consumeQueue??????????queue??????????????????queue??????????????????????????????????maxOffset????????????????????????????????????0??????????????????????????????????????
??minOffset????0????????????queue????????????????????????????queue??????????????????????????maxOffset????????????????
??????????????????????????????????????topic??????????queue??????????????????????minOffset=0??(??????????????)????????master????????????slave??????????master????????????????????slave????????????queue??????????master????????????????????????master??????????????????????????????master??????????????????????????????????0????????????????????????????????????????????commitlog????????????????????????????????
??????????????????Apache RocketMQ
4.2.0??????DefaultMessageStore.getMessage()????????????
????this.getMessageStoreConfig().isOffsetCheckInSlave()??????false??????????????slave??offset????????Apache??????????????????????????????????????????????????
Apache????????????????master????????slave??????master??????????????slave??????master??????????????????master????????minoffset??0????????0??????????????
??slave????????master??????slave??????????????master??????????master????????slave????????????????slave????????????????master??????????????????????????????slave??????????????????master??????slave??????????master??????????
??????????????????????????????????????????????????????????????????????????????????????????
??????????????????????????????????????????????????????????????????????????broker????????????????????master??slave??????????????????????????????????????????????????????master??????????????slave??????????????master????????????????????????commitlog??consumequeue????config??????????????????????????????????????????????????????????master????????????????????????????????
????????????????????????????
??????????????????????????RocketMQ??????????????????Apache??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
????????Apache
RocketMQ????????????4.3.0??????????????????????????????????????????????????????issue??https://issues.apache.org/jira/browse/ROCKETMQ-348
????????????????????????????????????????????????????????????????????????????????????????????????