大家好,公司内部写的自定义 RocketMQSource 会偶现位点前移的现象,出现时间不定,目前找不出原因。
Flink 版本为 1.4.2,目前作业不会从 checkpoint 恢复,但是会做 checkpoint。
Source 代码如下:
public class RocketMQSource<OUT> extends
RichParallelSourceFunction<OUT> implements
CheckpointedFunction,ResultTypeQueryable<OUT> {
public static final int DELAY_MSG_NOT_FOUND =
1000; //没有消息时,Delay 50ms后再拉取
public static final int DELAY_WHEN_EXCEPTION =
1000; //出现异常时,Delay 1s后再拉取
public static final int PULL_POOL_INIT_SIZE = 20;
//拉去的PullSize
public static final int PULL_BATCH_SIZE = 32;
//一次批量拉去大小
private static final Logger LOGGER =
LoggerFactory.getLogger(RocketMQSource.class);
private transient MQPullConsumerScheduleService consumerScheduleService;
private transient DefaultMQPullConsumer consumer;
private MessageDeserializationSchema<OUT> schema;
private String topic;
private String group;
private String tag;
private AtomicBoolean canceled = new AtomicBoolean(false);
private AtomicBoolean running;
private Map<MessageQueue,Long> offsetTable;
private Map<MessageQueue,Long> updateTimeTable;
private Map<MessageQueue,Long> restoreTable;
private transient ListState<Tuple2<MessageQueue,Long>> offsetStates;
private OperatorMetrics metrics;
public RocketMQSource(MessageDeserializationSchema<OUT> schema,
String topic, String group, String tag){
this.schema = schema;
this.topic = topic;
this.group = group;
this.tag = tag;
}
public RocketMQSource(){
}
public RocketMQSource<OUT> withSchema(MessageDeserializationSchema schema){
this.schema = schema;
return this;
}
public RocketMQSource<OUT> withTopic(String topic){
this.topic = topic;
return this;
}
public RocketMQSource<OUT> withGroup(String group){
this.group = group;
return this;
}
public RocketMQSource<OUT> withTag(String tag){
this.tag = tag;
return this;
}
public RocketMQSource(MessageDeserializationSchema<OUT> schema,
String topic, String group){
this(schema,topic,group,"*");
}
private void checkCanceled(){
Validate.isTrue(canceled.get() == false,"RocketMQ is Canceled!!");
}
@Override
public void open(Configuration parameters) throws Exception {
LOGGER.warn("RocketMQ Source topic:{},group:{}
open...",this.topic,this.group);
String nameServer =
getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get(Configurations.KEY_MQ_NS);
Validate.notBlank(topic,"topic can not be empty");
Validate.notBlank(group,"group can not be empty");
com.weidian.di.connector.source.RocketMQSource
this.checkCanceled();
if(offsetTable == null){
offsetTable = new ConcurrentHashMap<>();
}
if(updateTimeTable == null){
updateTimeTable = new ConcurrentHashMap<>();
}
if(restoreTable == null){
restoreTable = new ConcurrentHashMap<>();
}
this.running = new AtomicBoolean(false);
this.consumerScheduleService = new
MQPullConsumerScheduleService(this.group);
this.consumer = consumerScheduleService.getDefaultMQPullConsumer();
this.consumer.setNamesrvAddr(nameServer);
this.metrics = new OperatorMetrics("RocketMQ-Source-" +
topic,getRuntimeContext().getMetricGroup());
this.consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
}
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
LOGGER.warn("RocketMQ Source:{},tag:{} run...",this.topic,this.tag);
AtomicLong counter = new AtomicLong(0);
this.checkCanceled();
final Object lock = ctx.getCheckpointLock();
consumerScheduleService.setPullThreadNums(PULL_POOL_INIT_SIZE);
consumerScheduleService.registerPullTaskCallback(topic,(mq,context) -> {
try{
//对持续大量消息进行优化,在一个线程内部持续处理,直到没有消息。没有消息后会停顿50ms,然后再扔到Schedule调度线程中
while(running.get()){
long offset = getMessageQueueOffset(mq);
if(offset < 0){
LOGGER.warn("message queue:{}.{} offset <
0",mq.getTopic(),mq.getQueueId());
return;
}
PullResult pullResult =
consumer.pull(mq,tag,offset,PULL_BATCH_SIZE);
//RocketMQ 之前返回的minOffset可能ILLGAL,此时直接使用minOffset.
if(pullResult.getPullStatus() ==
PullStatus.OFFSET_ILLEGAL || offset < pullResult.getMinOffset()){
offset = pullResult.getMinOffset();
LOGGER.warn("MQ {}.{} pull message offset
illegal,status:{},offset:{},pullResult:{}",mq.getTopic(),mq.getQueueId(),pullResult.getPullStatus(),offset,pullResult);
pullResult = consumer.pull(mq,tag,offset,PULL_BATCH_SIZE);
}
if(pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG){
LOGGER.warn("MQ {}.{} pull message no match
message,status:{},offset:{},pullResult:{}",mq.getTopic(),tag,pullResult.getPullStatus(),offset,pullResult);
synchronized (lock){
//minOffset < offset
但是获取的结果是NO_MATCHED_MSG。标识结果中没有这个tag的消息。此时直接将位点置为最新的,避免产生堆积。
if(pullResult.getNextBeginOffset() <=
pullResult.getMaxOffset()){
long nextOffset = pullResult.getNextBeginOffset();
putOffset(mq,nextOffset);
consumer.updateConsumeOffset(mq,nextOffset);
}
}
break;
}
//没有消息跳出并停顿制定时间。返回到调度线程中
if(pullResult.getPullStatus() != PullStatus.FOUND){
context.setPullNextDelayTimeMillis(DELAY_MSG_NOT_FOUND);
break;
}
//处理消息
List<MessageExt> messages = pullResult.getMsgFoundList();
for(MessageExt message : messages){
metrics.in(1);
long start = System.currentTimeMillis();
try{
OUT data = schema.deserializeKeyAndValue(message);
//内部有做加锁处理,此处就不
if(data != null){
ctx.collectWithTimestamp(data,message.getBornTimestamp());
metrics.out(1);
}
}catch (Throwable e){
LOGGER.warn("error in process message:" + topic,e);
metrics.failed(1);
LOGGER.error(String.format("rmq:%s source
exception",topic),e);
}
if(System.currentTimeMillis() - start > 100){
// LOGGER.warn("[SLOW-PROCESS] Rocket MQ:{}
collect,cost:{}",mq.getTopic() + mq.getQueueId(),
(System.currentTimeMillis() - start));
}
}
//逻辑上,此处可以不加锁,MessageQueue不会处于多线程消费的场景。
long start = System.currentTimeMillis();
synchronized (lock){
long nextOffset = pullResult.getNextBeginOffset();
putOffset(mq,nextOffset);
consumer.updateConsumeOffset(mq,nextOffset);
}
if(System.currentTimeMillis() - start > 100){
LOGGER.info("[SLOW-PROCESS] Rocket MQ:{} offset
update,cost:{}",mq.getTopic() + mq.getQueueId(),
(System.currentTimeMillis() - start));
}
if(pullResult.getMsgFoundList().size() < PULL_BATCH_SIZE){
context.setPullNextDelayTimeMillis(0);
break;
}
}
}catch (RemotingException e){
LOGGER.error(String.format("[Restart] RemotingException
rmq:%s source exception",topic),e);
throw new RuntimeException(e);
}catch (MQBrokerException e){
LOGGER.error(String.format("[Restart] MQBrokerException
rmq:%s source exception",topic),e);
throw new RuntimeException(e);
}catch (Exception e){
metrics.failed(1);
LOGGER.error(String.format("rmq:%s source exception",topic),e);
context.setPullNextDelayTimeMillis(DELAY_WHEN_EXCEPTION);
}
});
this.checkCanceled();
consumerScheduleService.start();
running.set(true);
awaitTermination();
LOGGER.warn("RocketMQ is complete in run method!(canceled or closed)");
}
private void awaitTermination() throws InterruptedException {
while (running.get()) {
Thread.sleep(100);
}
}
private long getMessageQueueOffset(MessageQueue mq) throws MQClientException{
Long offset = offsetTable.get(mq);
if(offset == null){
offset = restoreTable.get(mq);
}
if(offset == null){
offset = consumer.fetchConsumeOffset(mq,true);
LOGGER.warn("mq:{},fetch offset from remote offset={} ",mq,offset);
if(offset < 0){
//目前我们选择队列中最早的进行消费。补上最近3天的数据,用户可以自己到RocketMQ中重置位点来变更
offset = consumer.minOffset(mq);
}
putOffset(mq,offset);
}
return offset;
}
AtomicLong count = new AtomicLong(0);
private void putOffset(MessageQueue mq,long offset){
offsetTable.put(mq,offset);
updateTimeTable.put(mq,System.currentTimeMillis());
//2020-07-22 lizheng让我去掉
// if(count.get() % 100 == 0){
// LOGGER.warn("mq:{} offset changed to :{}",mq,offset);
// }
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if(!running.get()){
LOGGER.warn("mq source {} is closed,can not snapshotState",this.topic);
return;
}
offsetStates.clear();
Iterator<Map.Entry<MessageQueue,Long>> iterator =
offsetTable.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry<MessageQueue,Long> entry = iterator.next();
if((System.currentTimeMillis() -
updateTimeTable.get(entry.getKey())) > 10 * 60 * 1000){
LOGGER.warn("mq {} is out of date,will not snapshot it!
offset:{}",entry.getKey(),entry.getValue());
iterator.remove();
}else{
offsetStates.add(Tuple2.of(entry.getKey(),entry.getValue()));
}
}
//2020-07-22 lizheng让我去掉
// LOGGER.warn("snapshot {}.{} offsets:{}",topic,tag,offsetTable);
}
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
LOGGER.warn("RocketMQ Source:{} initialize state...",this.topic);
this.offsetStates = context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor<Tuple2<MessageQueue, Long>>(
"topic-partition-offset-state",
TypeInformation.of(new TypeHint<Tuple2<MessageQueue,
Long>>() {})
)
);
if(context.isRestored()){
if(restoreTable == null){
restoreTable = new ConcurrentHashMap<>();
}
for(Tuple2<MessageQueue,Long> queueOffset : offsetStates.get()){
restoreTable.put(queueOffset.f0,queueOffset.f1);
}
LOGGER.warn("Topic {} restore state:{}",this.topic,restoreTable);
}else{
LOGGER.warn("No restore state for topic:{}",this.topic);
}
}
@Override
public void close() throws Exception {
LOGGER.warn("RocketMQ topic:{} close...",this.topic);
this.cancel();
}
@Override
public void cancel() {
LOGGER.warn("RocketMQ topic:{} cancel...",this.topic);
canceled.set(true);
running.set(false);
if(consumerScheduleService != null){
consumerScheduleService.shutdown();
}
offsetTable.clear();
restoreTable.clear();
}
@Override
public TypeInformation<OUT> getProducedType() {
return this.schema.getProducedType();
}
}