We are migrating from FlinkKafkaConsumer
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java>
to KafkaSource
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java>.
We have disabled auto commit of offsets and are instead committing them
manually to an external store.
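
For reference, on the KafkaSource side we would make sure both Kafka's own
auto commit and Flink's commit-on-checkpoint are off. A minimal sketch,
assuming the current KafkaSource builder API; broker, topic and group id
are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")                   // placeholder
        .setTopics("input-topic")                             // placeholder
        .setGroupId("my-consumer-group")                      // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("enable.auto.commit", "false")           // no Kafka auto commit
        .setProperty("commit.offsets.on.checkpoint", "false") // no commit on checkpoint
        .build();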

We override FlinkKafkaConsumer
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java>
and, on an overridden instance of KafkaFetcher
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java>,
we store the *offsets* in the external store by overriding
doCommitInternalOffsetsToKafka:

@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {
    // Store the per-partition offsets in S3 instead of
    // committing them back to Kafka
}

Now, in order to migrate, we tried copying/overriding KafkaSource,
KafkaSourceBuilder and KafkaSourceReader, but that leads to a lot of
redundant code, which does not look correct.

In the custom KafkaSourceReader I tried overriding snapshotState:

@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
   // custom logic to store offsets in S3 (fuller sketch below)
   return super.snapshotState(checkpointId);
}
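
For illustration, the fuller version would look roughly like this. This is
a minimal sketch, assuming a subclass of KafkaSourceReader whose constructor
simply delegates to super; writeOffsetsToS3 is a hypothetical helper for our
external store:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.common.TopicPartition;

@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
    // The starting offset of each snapshotted split is the reader's
    // current consuming position for that partition.
    Map<TopicPartition, Long> currentOffsets = new HashMap<>();
    for (KafkaPartitionSplit split : splits) {
        currentOffsets.put(split.getTopicPartition(), split.getStartingOffset());
    }
    writeOffsetsToS3(checkpointId, currentOffsets); // hypothetical helper
    return splits;
}

(We are aware that snapshotState runs when the checkpoint is taken, not when
it completes, so this writes offsets before the checkpoint is confirmed.)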

Is this correct, or is there another way to achieve the same?


I have asked a similar question on Stack Overflow:
<https://stackoverflow.com/questions/71092656/how-to-implement-customsnapshotstate-in-kafkasource-kafkasourcereader>



Regards,

Santosh
