RocketMQ????????????????????????????
??????????









??????????????????????????????????????????????????????????????????????????????????????????????????????????




??RocketMQ??????????????????????????????topic????????????????????????????????????????????????topic????????????????????????????????????????????????????????????????????topic????????????????????????????????????????????




????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????




????????????????????????????????????????1,2,3,4...??????????????????????????




????????????????????????????????????????????????????????topic??????offset????????????????????????DefaultMQPullConsumerImpl??consumeFromWhere()????









??CONSUME_FROM_LAST_OFFSET??????????????????????????????????????????????????????????????RocketMQ
 3.2.6??????????????????????????????????????????????????????









??????????????????????????????????????????????????????????topic????queue??????????????????????????????????????????offset??????????????????????????????????????????????RebalanceImpl??updateProcessQueueTableInRebalance()??????????









??????????????????????????????????????????????????RebalancePushImpl??computePullFromWhere()????????????????????????????????????????????????????????????????????????????????????????????????bug????????????????????????????????????????????????????????????????????????????????????????









??????computePullFromWhere()????????????computePullFromWhere()??????????????????????offset????????????CONSUME_FROM_LAST_OFFSET??????????????????????????????broker??????????queue????????????????









 offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) 
??????????????????RemoteBrokerOffsetStore??readOffset()?? ????????????????????









????long brokerOffset = 
this.fetchConsumeOffsetFromBroker(mq)????????????????????????MQClientAPIImpl??queryConsumerOffset()??broker????????queue????????????????????requestCode??QUERY_CONSUMER_OFFSET????????









????????broker????consumerOffser????????broker??????????????????????????????ClientManageProcessor??queryConsumerOffset(ChannelHandlerContext
 ctx, RemotingCommand request)????????????









????????????????????????????????????????????????????????????queue??minOffset????getMessageStore().checkInDiskByConsumeOffset(

requestHeader.getTopic(),requestHeader.getQueueId(), 0)

????????????false????????????????????????queue????????????????????????????????????????





????????api????????????minOffset=0????????offset=0????minOffset>0??????QUERY_NOT_FOUND????????????????????????????????MQBrokerException??????




broker????????????????????????????????

??????????????????????????????????????????Offset??0????????????Topic????????????????????????????????????????????????????????0??????????????????Topic??????????????????????????0??????????

??


??????????????????????broker??????consumerOffset????????????????RemoteBrokerOffsetStore??readOffset()??????minOffset??0????????????????0????minOffset????0????????????????-1????????????????????????????????????????????????????????????









??????????????????????lastOffset 
??????????????RebalancePushImpl??computePullFromWhere()??????lastOffset = 
0??????????0????lastOffset = -1????????????queue??maxOffset??









????????????????????????????????????????queue??????0????????minOffset????0????????????????????????????????????maxOffset??????????????????????????????????????????queue????0????????minOffset????0??????????????????????????????????0????????????queue??




????????????????????????????????????????????????????????????????????




??????RocketMQ??????????????????????????????




??????????????Apache RocketMQ 
4.2.0??broker??????????????????????????????broker????????????????????offset??????????????????????????

org.apache.rocketmq.broker.processor.ConsumerManageProcessor????????????????????????????????




????Apache??????????????????????????????????bug????????????????????????????????????????????queue??minOffset??0??????????????0????????????queue??????????????????????????????????????




????????????????????????????????????????queue??maxOffset????????????????????????topic??????broker??????4??queue??????????????????????????????queue??????????????????????????????????????topic??8??queue??????????????namesrv????????8??queue????????????????????????????????????30????????????????????????????????queue????????????????????????




????????????4????????queue????????????broker??????????????broker????????4??????queue??????queue??????????????????????????????????????????4????queue??maxOffset????????????????????????????4??????queue????????????????????????????????????????????????????????????????????????????????




??????????????????????????????????????????????maxOffset????????????




??????????????????RocketMQ????????????????????????????????????????????????????????????????????????RocketMQ????????????????????????????????????????????????????????????????????????????????
 ???????????????????????????????? ??????




??????????????????????????topic????????????????????????????????????????????????????????????????????????????????




????????????????????????????1.????????mqadmin????resetOffsetByTime??????????????????????????????????
 
2.??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????




????????????????????????????????????????????????????????????????????????????????????????????????????????????????????RocketMQ??????????????????????????????????????????




????????????????????????

Reply via email to