Re: Need help on implementing custom `snapshotState` in KafkaSource & KafkaSourceReader

2022-02-14 Thread Niklas Semmler
Hi Santosh,

It’s best to avoid cross-posting. Let’s keep the discussion to SO.

Best regards,
Niklas

> On 12. Feb 2022, at 16:39, santosh joshi wrote:
> 
> We are migrating from FlinkKafkaConsumer to KafkaSource. We have disabled
> auto-commit of offsets and instead commit them manually to an external
> store.
> 
> We subclass FlinkKafkaConsumer and, in an overridden KafkaFetcher, store
> the offsets in the external store by overriding
> doCommitInternalOffsetsToKafka:
> 
>  protected void doCommitInternalOffsetsToKafka(
>          Map<KafkaTopicPartition, Long> offsets,
>          @Nonnull KafkaCommitCallback commitCallback) throws Exception {
>      // Store offsets in S3
>  }
> 
> To migrate, we tried copying/overriding KafkaSource, KafkaSourceBuilder
> and KafkaSourceReader, but that results in a lot of redundant code, which
> does not look correct.
> 
> In a custom KafkaSourceReader I tried overriding snapshotState:
> 
> @Override
> public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
>     // custom logic to store offsets in S3
>     return super.snapshotState(checkpointId);
> }
> 
> Is this correct, or is there another way to achieve the same?
> 
> I have asked a similar question on Stack Overflow.
> 
> 
> Regards,
> Santosh 



Need help on implementing custom `snapshotState` in KafkaSource & KafkaSourceReader

2022-02-12 Thread santosh joshi
We are migrating from FlinkKafkaConsumer to KafkaSource. We have disabled
auto-commit of offsets and instead commit them manually to an external store.

We subclass FlinkKafkaConsumer and, in an overridden KafkaFetcher, store the
*offsets* in the external store by overriding doCommitInternalOffsetsToKafka:

 protected void doCommitInternalOffsetsToKafka(
         Map<KafkaTopicPartition, Long> offsets,
         @Nonnull KafkaCommitCallback commitCallback) throws Exception {
     // Store offsets in S3
 }

To migrate, we tried copying/overriding KafkaSource, KafkaSourceBuilder and
KafkaSourceReader, but that results in a lot of redundant code, which does
not look correct.

In a custom KafkaSourceReader I tried overriding snapshotState:

@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    // custom logic to store offsets in S3
    return super.snapshotState(checkpointId);
}

Is this correct, or is there another way to achieve the same?
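
One thing to weigh with the override above is that snapshotState runs when a
checkpoint is triggered, so the checkpoint can still fail after the offsets
have been written. A possible alternative is to only remember the offsets in
snapshotState and publish them once the checkpoint is reported complete. The
sketch below illustrates that pattern in plain Java. It is not Flink code:
OffsetSnapshotSketch, OffsetStore, Reader, advance and pendingCount are
hypothetical stand-ins, and partitions are simplified to strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class OffsetSnapshotSketch {

    /** Hypothetical external store, e.g. a thin wrapper around an S3 client. */
    interface OffsetStore {
        void save(long checkpointId, Map<String, Long> offsets);
    }

    /** Stand-in for a source reader; this is NOT Flink's KafkaSourceReader. */
    static class Reader {
        private final OffsetStore store;
        private final Map<String, Long> currentOffsets = new HashMap<>();
        // Offsets recorded per checkpoint that has not completed yet.
        private final SortedMap<Long, Map<String, Long>> pending = new TreeMap<>();

        Reader(OffsetStore store) {
            this.store = store;
        }

        /** Simulates consuming records up to the given offset. */
        void advance(String partition, long offset) {
            currentOffsets.put(partition, offset);
        }

        /** Checkpoint triggered: remember the offsets, do not publish yet. */
        Map<String, Long> snapshotState(long checkpointId) {
            Map<String, Long> copy = new HashMap<>(currentOffsets);
            pending.put(checkpointId, copy);
            return copy;
        }

        /** Checkpoint completed: publish its offsets to the external store. */
        void notifyCheckpointComplete(long checkpointId) {
            Map<String, Long> offsets = pending.get(checkpointId);
            if (offsets != null) {
                store.save(checkpointId, offsets);
            }
            // Drop this checkpoint and any older ones that are still pending.
            pending.headMap(checkpointId + 1).clear();
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Long>> saved = new HashMap<>();
        Reader reader = new Reader((id, offsets) -> saved.put(id, offsets));

        reader.advance("topic-0", 42L);
        reader.snapshotState(1L);
        System.out.println("saved before completion: " + saved.size());

        reader.notifyCheckpointComplete(1L);
        System.out.println("saved after completion: " + saved.size());
    }
}
```

In a real job the equivalent hooks would be the reader's snapshotState and
notifyCheckpointComplete methods; whether subclassing KafkaSourceReader is the
right place for this is exactly the open question here.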


I have asked a similar question on Stack Overflow.

Regards,
Santosh