大家好,公司内部写的自定义 RocketMQSource 偶尔会出现位点前移(回退)的现象,出现时间不定,目前找不出原因。Flink 版本为
1.4.2;作业目前不会从 checkpoint 恢复,但会做 checkpoint。
Source代码如下:

public class RocketMQSource<OUT> extends
RichParallelSourceFunction<OUT> implements
CheckpointedFunction,ResultTypeQueryable<OUT> {
   public static final int         DELAY_MSG_NOT_FOUND         =
1000;         //没有消息时,Delay 50ms后再拉取
   public static final int         DELAY_WHEN_EXCEPTION        =
1000;       //出现异常时,Delay 1s后再拉取
   public static final int         PULL_POOL_INIT_SIZE         =   20;
         //拉去的PullSize
   public static final int         PULL_BATCH_SIZE             =   32;
        //一次批量拉去大小

    private static final Logger LOGGER =
LoggerFactory.getLogger(RocketMQSource.class);

    private transient MQPullConsumerScheduleService consumerScheduleService;
    private transient DefaultMQPullConsumer consumer;

   private MessageDeserializationSchema<OUT> schema;
   private String topic;
   private String group;
   private String tag;

   private AtomicBoolean canceled = new AtomicBoolean(false);
   private AtomicBoolean running;

   private Map<MessageQueue,Long>                              offsetTable;
   private Map<MessageQueue,Long>                              updateTimeTable;

   private Map<MessageQueue,Long>                              restoreTable;

   private transient ListState<Tuple2<MessageQueue,Long>>      offsetStates;

   private OperatorMetrics                                     metrics;

   public RocketMQSource(MessageDeserializationSchema<OUT> schema,
String topic, String group, String tag){
      this.schema = schema;
      this.topic = topic;
      this.group = group;
      this.tag = tag;
   }


   public RocketMQSource(){
   }

   public RocketMQSource<OUT> withSchema(MessageDeserializationSchema schema){
      this.schema = schema;
      return this;
   }
   public RocketMQSource<OUT> withTopic(String topic){
      this.topic = topic;
      return this;
   }
   public RocketMQSource<OUT> withGroup(String group){
      this.group = group;
      return this;
   }
   public RocketMQSource<OUT> withTag(String tag){
      this.tag = tag;
      return this;

   }




   public RocketMQSource(MessageDeserializationSchema<OUT> schema,
String topic, String group){
      this(schema,topic,group,"*");
   }


   private void checkCanceled(){
      Validate.isTrue(canceled.get() == false,"RocketMQ is Canceled!!");
   }
   @Override
   public void open(Configuration parameters) throws Exception {
      LOGGER.warn("RocketMQ Source topic:{},group:{}
open...",this.topic,this.group);

      String nameServer =
getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get(Configurations.KEY_MQ_NS);
      Validate.notBlank(topic,"topic can not be empty");
      Validate.notBlank(group,"group can not be empty");
      com.weidian.di.connector.source.RocketMQSource
      this.checkCanceled();

      if(offsetTable == null){
         offsetTable = new ConcurrentHashMap<>();
      }
      if(updateTimeTable == null){
         updateTimeTable = new ConcurrentHashMap<>();
      }
      if(restoreTable == null){
         restoreTable = new ConcurrentHashMap<>();
      }

      this.running = new AtomicBoolean(false);
      this.consumerScheduleService = new
MQPullConsumerScheduleService(this.group);
      this.consumer = consumerScheduleService.getDefaultMQPullConsumer();

      this.consumer.setNamesrvAddr(nameServer);
      this.metrics = new OperatorMetrics("RocketMQ-Source-" +
topic,getRuntimeContext().getMetricGroup());
      
this.consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
   }

   @Override
   public void run(SourceContext<OUT> ctx) throws Exception {
      LOGGER.warn("RocketMQ Source:{},tag:{} run...",this.topic,this.tag);

      AtomicLong counter = new AtomicLong(0);
      this.checkCanceled();
      final Object lock = ctx.getCheckpointLock();
      consumerScheduleService.setPullThreadNums(PULL_POOL_INIT_SIZE);
      consumerScheduleService.registerPullTaskCallback(topic,(mq,context) -> {
         try{
            //对持续大量消息进行优化,在一个线程内部持续处理,直到没有消息。没有消息后会停顿50ms,然后再扔到Schedule调度线程中
            while(running.get()){
               long offset = getMessageQueueOffset(mq);
               if(offset < 0){
                  LOGGER.warn("message queue:{}.{} offset <
0",mq.getTopic(),mq.getQueueId());
                  return;
               }

               PullResult pullResult =
consumer.pull(mq,tag,offset,PULL_BATCH_SIZE);

               //RocketMQ 之前返回的minOffset可能ILLGAL,此时直接使用minOffset.
               if(pullResult.getPullStatus() ==
PullStatus.OFFSET_ILLEGAL || offset < pullResult.getMinOffset()){
                  offset = pullResult.getMinOffset();
                  LOGGER.warn("MQ {}.{} pull message offset
illegal,status:{},offset:{},pullResult:{}",mq.getTopic(),mq.getQueueId(),pullResult.getPullStatus(),offset,pullResult);
                  pullResult = consumer.pull(mq,tag,offset,PULL_BATCH_SIZE);
               }

               if(pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG){
                  LOGGER.warn("MQ {}.{} pull message no match
message,status:{},offset:{},pullResult:{}",mq.getTopic(),tag,pullResult.getPullStatus(),offset,pullResult);
                  synchronized (lock){
                     //minOffset < offset
但是获取的结果是NO_MATCHED_MSG。标识结果中没有这个tag的消息。此时直接将位点置为最新的,避免产生堆积。
                     if(pullResult.getNextBeginOffset() <=
pullResult.getMaxOffset()){
                        long nextOffset = pullResult.getNextBeginOffset();
                        putOffset(mq,nextOffset);
                        consumer.updateConsumeOffset(mq,nextOffset);
                     }
                  }
                  break;
               }

               //没有消息跳出并停顿制定时间。返回到调度线程中
               if(pullResult.getPullStatus() != PullStatus.FOUND){
                  context.setPullNextDelayTimeMillis(DELAY_MSG_NOT_FOUND);
                  break;
               }


               //处理消息
               List<MessageExt> messages = pullResult.getMsgFoundList();
               for(MessageExt message : messages){
                  metrics.in(1);
                  long start = System.currentTimeMillis();
                  try{
                     OUT data = schema.deserializeKeyAndValue(message);

                     //内部有做加锁处理,此处就不
                     if(data != null){

ctx.collectWithTimestamp(data,message.getBornTimestamp());
                        metrics.out(1);
                     }
                  }catch (Throwable e){
                     LOGGER.warn("error in process message:" + topic,e);
                     metrics.failed(1);
                     LOGGER.error(String.format("rmq:%s source
exception",topic),e);
                  }

                  if(System.currentTimeMillis() - start > 100){
//                   LOGGER.warn("[SLOW-PROCESS] Rocket MQ:{}
collect,cost:{}",mq.getTopic() + mq.getQueueId(),
(System.currentTimeMillis() - start));
                  }
               }

               //逻辑上,此处可以不加锁,MessageQueue不会处于多线程消费的场景。
               long start = System.currentTimeMillis();
               synchronized (lock){
                  long nextOffset = pullResult.getNextBeginOffset();
                  putOffset(mq,nextOffset);
                  consumer.updateConsumeOffset(mq,nextOffset);
               }
               if(System.currentTimeMillis() - start > 100){
                  LOGGER.info("[SLOW-PROCESS] Rocket MQ:{} offset
update,cost:{}",mq.getTopic() + mq.getQueueId(),
(System.currentTimeMillis() - start));
               }

               if(pullResult.getMsgFoundList().size() < PULL_BATCH_SIZE){
                  context.setPullNextDelayTimeMillis(0);
                  break;
               }
            }
         }catch (RemotingException e){
            LOGGER.error(String.format("[Restart] RemotingException
rmq:%s source exception",topic),e);
            throw new RuntimeException(e);
         }catch (MQBrokerException e){
            LOGGER.error(String.format("[Restart] MQBrokerException
rmq:%s source exception",topic),e);
            throw new RuntimeException(e);
         }catch (Exception e){
            metrics.failed(1);
            LOGGER.error(String.format("rmq:%s source exception",topic),e);
            context.setPullNextDelayTimeMillis(DELAY_WHEN_EXCEPTION);
         }
      });

      this.checkCanceled();
      consumerScheduleService.start();
      running.set(true);

      awaitTermination();
      LOGGER.warn("RocketMQ is complete in run method!(canceled or closed)");
   }


   private void awaitTermination() throws InterruptedException {
      while (running.get()) {
         Thread.sleep(100);
      }
   }

   private long getMessageQueueOffset(MessageQueue mq) throws MQClientException{
      Long offset = offsetTable.get(mq);

      if(offset == null){
         offset = restoreTable.get(mq);
      }

      if(offset == null){
         offset = consumer.fetchConsumeOffset(mq,true);
         LOGGER.warn("mq:{},fetch offset from remote offset={} ",mq,offset);
         if(offset < 0){
            //目前我们选择队列中最早的进行消费。补上最近3天的数据,用户可以自己到RocketMQ中重置位点来变更
            offset = consumer.minOffset(mq);
         }
         putOffset(mq,offset);
      }
      return offset;
   }


   AtomicLong count = new AtomicLong(0);
   private void putOffset(MessageQueue mq,long offset){
      offsetTable.put(mq,offset);
      updateTimeTable.put(mq,System.currentTimeMillis());

      //2020-07-22 lizheng让我去掉
//    if(count.get() % 100 == 0){
//       LOGGER.warn("mq:{} offset changed to :{}",mq,offset);
//    }
   }


   @Override
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
      if(!running.get()){
         LOGGER.warn("mq source {} is closed,can not snapshotState",this.topic);
         return;
      }

      offsetStates.clear();

      Iterator<Map.Entry<MessageQueue,Long>> iterator =
offsetTable.entrySet().iterator();
      while(iterator.hasNext()){
         Map.Entry<MessageQueue,Long> entry = iterator.next();
         if((System.currentTimeMillis() -
updateTimeTable.get(entry.getKey())) > 10 * 60 * 1000){
            LOGGER.warn("mq {} is out of date,will not snapshot it!
offset:{}",entry.getKey(),entry.getValue());
            iterator.remove();
         }else{
            offsetStates.add(Tuple2.of(entry.getKey(),entry.getValue()));
         }
      }

//2020-07-22 lizheng让我去掉
//    LOGGER.warn("snapshot {}.{} offsets:{}",topic,tag,offsetTable);

   }

   @Override
   public void initializeState(FunctionInitializationContext context)
throws Exception {
      LOGGER.warn("RocketMQ Source:{} initialize state...",this.topic);

      this.offsetStates = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<Tuple2<MessageQueue, Long>>(
                  "topic-partition-offset-state",
                  TypeInformation.of(new TypeHint<Tuple2<MessageQueue,
Long>>() {})
            )
      );

      if(context.isRestored()){
         if(restoreTable == null){
            restoreTable = new ConcurrentHashMap<>();
         }
         for(Tuple2<MessageQueue,Long> queueOffset : offsetStates.get()){
            restoreTable.put(queueOffset.f0,queueOffset.f1);
         }
         LOGGER.warn("Topic {} restore state:{}",this.topic,restoreTable);
      }else{
         LOGGER.warn("No restore state for topic:{}",this.topic);
      }
   }



   @Override
   public void close() throws Exception {
      LOGGER.warn("RocketMQ topic:{} close...",this.topic);
      this.cancel();
   }

   @Override
   public void cancel() {
      LOGGER.warn("RocketMQ topic:{} cancel...",this.topic);
      canceled.set(true);
      running.set(false);

      if(consumerScheduleService != null){
         consumerScheduleService.shutdown();
      }
      offsetTable.clear();
      restoreTable.clear();
   }

   @Override
   public TypeInformation<OUT> getProducedType() {
      return this.schema.getProducedType();
   }
}

回复