回复：投稿 RocketMQ Master宕机后重启出现的严重问题分析

2018-07-16 Thread 404828407
cool
@hooligan




--  --
??: "hooligan"<79015...@qq.com>;
: 2018??7??16??(??) 11:46
??: "users";

: ?? RocketMQ Master



??


--
   
hooligan




 




--  --
??: "404828407"<404828...@qq.com>;
: 2018??4??2??(??) 12:06
??: "users";

:  RocketMQ Master







RocketMQ Master宕机后重启出现的严重问题分析


??




以前发现RocketMQ的master机器宕机时会出现很多诡异的问题,所以最近做了一系列实验来试图找到问题的原因,也是想发现风险点。没想到发现了一个更严重的问题,下面会详细说明;另外,问题原因属于猜测,望高人看到后能指点一下。




试验环境准备:

1.RocketMQ版本:阿里的3.2.6版本

2.集群包含2组broker,每组broker都是1master + 1slave,都设置成SYNC_MASTER、ASYNC_FLUSH模式。

3.每个broker上已经存储过好几个commitlog文件,并且已经有commitlog文件被删除过(数据过期了)。




测试步骤:

1.尽量多的消费组、尽量多的消费者(10个以上)开始消费多个topic,消费者自始至终不停机。

2.在消费者消费时,突然把其中一组broker(姑且命名为BrokerA)的master断电(注意是断电或者硬盘损坏,不是kill -9)。

3.过一段时间(半小时左右),重启宕机的这个master。






1.BrokerA的slave机器一直存活着,master宕机后,消费者新的消费进度会存在slave上面。

2.宕机后,拉取BrokerA的master中的数据文件,文件没有损坏。

3.重启master后,消费客户端会从很久以前的queue offset开始拉取消息。为什么会取到很久以前的queueOffset?

4.重启master后,这个master出现大量如下异常,异常日志量达到每10分钟约1G:

{


"timestamp": "2017-12-28T00:18:48.620+0800",

"level": "WARN",

"thread": "FlushConsumeQueueService",

"logger": "RocketmqStoreError",

"message": "findMapedFileByOffset offset not matched, request Offset: 0, 
index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: 
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)"

}




5.另外还有大量如下异常,表明大量消费客户端在拉取很久以前的消息:

{

"timestamp": "2017-12-28T00:39:01.748+0800",

"level": "WARN",

"thread": "PullMessageThread_15",

"logger": "RocketmqStoreError",

"message": "findMapedFileByOffset offset not matched, request Offset: 
19382879484, index: -7, mapedFileSize: 1073741824, mapedFiles count: 2, 
StackTrace: 
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)"

}









首先,第二类异常出现在PullMessageProcessor拉取消息的过程中:消费客户端不停拉取消息时,PullMessageProcessor会调用DefaultMessageStore.getMessage();在getMessage()中,更具体地说是在CommitLog的public SelectMapedBufferResult getMessage(final long offset, final int size)方法中,会调用MapedFileQueue的findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound)方法,具体见下图:



















DefaultMessageStore.getMess

回复：投稿 RocketMQ Master宕机后重启出现的严重问题分析

2018-07-16 Thread hooligan
??


--
   
hooligan




 




--  --
??: "404828407"<404828...@qq.com>;
: 2018??4??2??(??) 12:06
??: "users";

:  RocketMQ Master????????







RocketMQ Master宕机后重启出现的严重问题分析


??????




以前发现RocketMQ的master机器宕机时会出现很多诡异的问题,所以最近做了一系列实验来试图找到问题的原因,也是想发现风险点。没想到发现了一个更严重的问题,下面会详细说明;另外,问题原因属于猜测,望高人看到后能指点一下。




试验环境准备:

1.RocketMQ版本:阿里的3.2.6版本

2.集群包含2组broker,每组broker都是1master + 1slave,都设置成SYNC_MASTER、ASYNC_FLUSH模式。

3.每个broker上已经存储过好几个commitlog文件,并且已经有commitlog文件被删除过(数据过期了)。




测试步骤:

1.尽量多的消费组、尽量多的消费者(10个以上)开始消费多个topic,消费者自始至终不停机。

2.在消费者消费时,突然把其中一组broker(姑且命名为BrokerA)的master断电(注意是断电或者硬盘损坏,不是kill -9)。

3.过一段时间(半小时左右),重启宕机的这个master。






1.BrokerA的slave机器一直存活着,master宕机后,消费者新的消费进度会存在slave上面。

2.宕机后,拉取BrokerA的master中的数据文件,文件没有损坏。

3.重启master后,消费客户端会从很久以前的queue offset开始拉取消息。为什么会取到很久以前的queueOffset?

4.重启master后,这个master出现大量如下异常,异常日志量达到每10分钟约1G:

{


"timestamp": "2017-12-28T00:18:48.620+0800",

"level": "WARN",

"thread": "FlushConsumeQueueService",

"logger": "RocketmqStoreError",

"message": "findMapedFileByOffset offset not matched, request Offset: 0, 
index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: 
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)"

}




5.另外还有大量如下异常,表明大量消费客户端在拉取很久以前的消息:

{

"timestamp": "2017-12-28T00:39:01.748+0800",

"level": "WARN",

"thread": "PullMessageThread_15",

"logger": "RocketmqStoreError",

"message": "findMapedFileByOffset offset not matched, request Offset: 
19382879484, index: -7, mapedFileSize: 1073741824, mapedFiles count: 2, 
StackTrace: 
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)"

}









首先,第二类异常出现在PullMessageProcessor拉取消息的过程中:消费客户端不停拉取消息时,PullMessageProcessor会调用DefaultMessageStore.getMessage();在getMessage()中,更具体地说是在CommitLog的public SelectMapedBufferResult getMessage(final long offset, final int size)方法中,会调用MapedFileQueue的findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound)方法,具体见下图:



















从代码上看,出现这个异常的原因是DefaultMessageStore.getMessage()中offsetPy拿到的值过小;这个值实际上是从ConsumeQueue文件中读到的单条消息的CommitlogOffset,说明bufferConsumeQueue这个对象拿到的是旧数据。如下图:









bufferConsumeQueueoffsetoffsetqueue??queueOffset??offset??

Re: 投稿 RocketMQ Master宕机后重启出现的严重问题分析

2018-04-02 Thread Von Gosling
Hi,


感谢卢松的两篇文章,非常欢迎大家投稿 ~

鉴于RocketMQ的官方投稿流程刚刚启动,能否麻烦把文章贴到Google Doc上,然后在user列表里告诉我们地址,我们先一起看看。对啦,麻烦给以下账号授予修改权限:

lizhan...@gmail.com
z...@xinyu.im
fengji...@gmail.com


Best Regards,
Von Gosling

> 在 2018年4月2日,12:27,yukon <yu...@apache.org> 写道:
> 
> Hi,
> 
> 非常欢迎撰写RocketMQ相关的技术文章,建议放到Google DOC上并共享出来方便大家进行Comment。
> 
> Regards,
> yukon
> 
> 2018-04-02 12:06 GMT+08:00 404828407 <404828...@qq.com 
> <mailto:404828...@qq.com>>:
> 
> 
> RocketMQ Master宕机后重启出现的严重问题分析
> 
> 
> 作者:卢松
> 
> 以前发现RocketMQ的master机器宕机时会出现很多诡异的问题,所以最近做了一系列实验来试图找到问题的原因,也是想发现风险点。没想到发现了一个更严重的问题,下面会详细的说明下,另外出现问题的原因属于猜测,望高人看到后能指点下。
> 
> 试验环境准备:
> 1.RocketMQ版本:阿里的3.2.6版本
> 2.集群包含2组broker,每组broker都是1master + 1slave,都设置成SYNC_MASTER、ASYNC_FLUSH模式。
> 3.每个broker上已经存储过好几个commitlog文件,并且已经有commitlog文件删除过(数据过期了)。
> 
> 测试步骤:
> 1.尽量多的消费组、尽量多的消费者(10个以上)开始消费多个topic,消费者自始至终不停机。
> 2.在消费者消费时,突然把其中一组broker(姑且命名为BrokerA吧)的master断电(注意是断电或者硬盘损坏、不是kill -9)。
> 3.过一段时间(半小时左右),重启宕机的这个master。
> 
> 观察现象,记录事后的现象及疑问:
> 1.BrokerA的slave机器一直存活着,master宕机后,消费者新的消费进度会存在slave上面。
> 2.宕机后,拉取BrokerA的master中的数据文件,文件没有损坏。
> 3.重启master后,消费客户端会从很久以前的queue offset开始拉取消息。为什么会取到很久以前的queueOffset?
> 4.重启master后,这个master出现大量如下异常,异常日志量达到每10分钟约1G:
> {
> "timestamp": "2017-12-28T00:18:48.620+0800",
> "level": "WARN",
> "thread": "FlushConsumeQueueService",
> "logger": "RocketmqStoreError",
> "message": "findMapedFileByOffset offset not matched, request Offset: 0, 
> index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: 
> \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)"
> }
> 
> 5.另外还有大量的这种异常,这类异常显示大量消费客户端去拉取很久以前的消息。
> {
> "timestamp": "2017-12-28T00:39:01.748+0800",
> "level": "WARN",
> "thread": "PullMessageThread_15",
> "logger": "RocketmqStoreError",
> "message": "findMapedFileByOffset offset not matched, request Offset: 
> 19382879484, index: -7, mapedFileSize: 1073741824, 
> mapedFiles count: 2, StackTrace: 
> \n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)"
> }
> 
> 
> 这种现象的原因分析过程:
> 首先,第二类异常出现在PullMessageProcessor拉取消息的过程中:消费客户端不停拉取消息时,PullMessageProcessor会调用DefaultMessageStore.getMessage();在getMessage()中,更具体地说是在CommitLog的public SelectMapedBufferResult getMessage(final long offset, final int size)方法中,会调用MapedFileQueue的findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound)方法,具体见下图:
> 
> 
> 
> 
> 
> 
> 
> 从代码上看,出现这个异常原因就是DefaultMessageStore.getMessage()中offsetPy拿到的值过小,这个值实际上是从ConsumeQueue的文件中读到的单条消息的CommitlogOffset,说明bufferConsumeQueue这个对象拿到的是老的数据。如下图:
> 
> 
> 
> bufferConsumeQueue对象又是根据offset定位的,offset是每一个queue的queueOffset,这个offset实际上是消费端拉取消息时传递过来的,它来自于两种地方,一是消费端自己拉取的消费进度,另外一种是消费端每次拉取消息,拉取不到后拿到的nextBeginOffset参数。如下图:
> 
> 
> 
> 所以根源就出在这个nextBeginOffset的计算逻辑上,为什么nextBeginOffset会计算出过小的值,导致消费端一直在消费过小的queueOffset对应的消息?
> 
> 我们先不去查这个问题的原因,就按照这个逻辑走下去,看看会发生什么情况。还是回到DefaultMessageStore.getMessage()中,当SelectMaped

投稿 RocketMQ Master宕机后重启出现的严重问题分析

2018-04-01 Thread 404828407
RocketMQ Master宕机后重启出现的严重问题分析


??




以前发现RocketMQ的master机器宕机时会出现很多诡异的问题,所以最近做了一系列实验来试图找到问题的原因,也是想发现风险点。没想到发现了一个更严重的问题,下面会详细说明;另外,问题原因属于猜测,望高人看到后能指点一下。




试验环境准备:

1.RocketMQ版本:阿里的3.2.6版本

2.集群包含2组broker,每组broker都是1master + 1slave,都设置成SYNC_MASTER、ASYNC_FLUSH模式。

3.每个broker上已经存储过好几个commitlog文件,并且已经有commitlog文件被删除过(数据过期了)。




测试步骤:

1.尽量多的消费组、尽量多的消费者(10个以上)开始消费多个topic,消费者自始至终不停机。

2.在消费者消费时,突然把其中一组broker(姑且命名为BrokerA)的master断电(注意是断电或者硬盘损坏,不是kill -9)。

3.过一段时间(半小时左右),重启宕机的这个master。






1.BrokerA的slave机器一直存活着,master宕机后,消费者新的消费进度会存在slave上面。

2.宕机后,拉取BrokerA的master中的数据文件,文件没有损坏。

3.重启master后,消费客户端会从很久以前的queue offset开始拉取消息。为什么会取到很久以前的queueOffset?

4.重启master后,这个master出现大量如下异常,异常日志量达到每10分钟约1G:

{


"timestamp": "2017-12-28T00:18:48.620+0800",

"level": "WARN",

"thread": "FlushConsumeQueueService",

"logger": "RocketmqStoreError",

"message": "findMapedFileByOffset offset not matched, request Offset: 0, 
index: -6, mapedFileSize: 600, mapedFiles count: 2, StackTrace: 
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.commit(MapedFileQueue.java:442)\n\tcom.alibaba.rocketmq.store.ConsumeQueue.commit(ConsumeQueue.java:345)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1459)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1479)\n\tjava.lang.Thread.run(Thread.java:722)"

}




5.另外还有大量如下异常,表明大量消费客户端在拉取很久以前的消息:

{

"timestamp": "2017-12-28T00:39:01.748+0800",

"level": "WARN",

"thread": "PullMessageThread_15",

"logger": "RocketmqStoreError",

"message": "findMapedFileByOffset offset not matched, request Offset: 
19382879484, index: -7, mapedFileSize: 1073741824, mapedFiles count: 2, 
StackTrace: 
\n\tjava.lang.Thread.getStackTrace(Thread.java:1567)\n\tcom.alibaba.rocketmq.common.UtilAll.currentStackTrace(UtilAll.java:59)\n\tcom.alibaba.rocketmq.store.MapedFileQueue.findMapedFileByOffset(MapedFileQueue.java:467)\n\tcom.alibaba.rocketmq.store.CommitLog.getMessage(CommitLog.java:664)\n\tcom.alibaba.rocketmq.store.DefaultMessageStore.getMessage(DefaultMessageStore.java:546)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:283)\n\tcom.alibaba.rocketmq.broker.processor.PullMessageProcessor.processRequest(PullMessageProcessor.java:84)\n\tcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:172)\n\tjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)\n\tjava.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)\n\tjava.util.concurrent.FutureTask.run(FutureTask.java:166)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tjava.lang.Thread.run(Thread.java:722)"

}









首先,第二类异常出现在PullMessageProcessor拉取消息的过程中:消费客户端不停拉取消息时,PullMessageProcessor会调用DefaultMessageStore.getMessage();在getMessage()中,更具体地说是在CommitLog的public SelectMapedBufferResult getMessage(final long offset, final int size)方法中,会调用MapedFileQueue的findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound)方法,具体见下图:



















从代码上看,出现这个异常的原因是DefaultMessageStore.getMessage()中offsetPy拿到的值过小;这个值实际上是从ConsumeQueue文件中读到的单条消息的CommitlogOffset,说明bufferConsumeQueue这个对象拿到的是旧数据。如下图:









bufferConsumeQueue对象又是根据offset定位的,offset是每一个queue的queueOffset。这个offset实际上是消费端拉取消息时传递过来的,它有两个来源:一是消费端自己保存的消费进度,二是消费端每次拉取不到消息时返回的nextBeginOffset参数。如下图:









所以根源出在nextBeginOffset的计算逻辑上:为什么nextBeginOffset会计算出过小的值,导致消费端一直在消费过小的queueOffset对应的消息?




??DefaultMessageStore.getM