We are migrating from FlinkKafkaConsumer <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java> to KafkaSource <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java>. We have disabled auto-commit of offsets and instead commit them manually to an external store.
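
To make that setup concrete, here is a minimal sketch of the consumer properties involved. It is an illustration under assumptions, not our exact configuration: the class and method names are made up for this example, enable.auto.commit is the standard Kafka consumer setting, and commit.offsets.on.checkpoint is, as far as I understand the Flink documentation, the KafkaSource option that controls committing offsets to Kafka on checkpoint.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties with Kafka-side offset commits turned off.
public class OffsetCommitConfig {

    static Properties consumerProperties() {
        Properties props = new Properties();
        // Standard Kafka consumer setting: no periodic auto-commit by the client.
        props.setProperty("enable.auto.commit", "false");
        // Assumed KafkaSource option: do not commit offsets to Kafka on checkpoint.
        props.setProperty("commit.offsets.on.checkpoint", "false");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties for inspection.
        System.out.println(consumerProperties());
    }
}
```

With both settings off, nothing is committed back to Kafka, so storing the offsets elsewhere is entirely up to our own code.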

We subclass FlinkKafkaConsumer <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java> and, on an overridden instance of KafkaFetcher <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java>, store the offsets in an external store by overriding doCommitInternalOffsetsToKafka:

    protected void doCommitInternalOffsetsToKafka(
            Map<KafkaTopicPartition, Long> offsets,
            @Nonnull KafkaCommitCallback commitCallback) throws Exception {
        // Store offsets in S3
    }

To migrate, we tried copying and overriding KafkaSource, KafkaSourceBuilder and KafkaSourceReader, but that duplicates a lot of code and does not look like the right approach.

In a custom KafkaSourceReader I tried overriding snapshotState:

    @Override
    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        // custom logic to store offsets in S3
        return super.snapshotState(checkpointId);
    }

Is this correct, or is there another way to achieve the same thing?

I have asked a similar question on Stack Overflow <https://stackoverflow.com/questions/71092656/how-to-implement-customsnapshotstate-in-kafkasource-kafkasourcereader>.

Regards,
Santosh