Re: Kafka Source Recovery Behavior
Qingsheng,

For the scenario Mason described in the original email, I think it is safe to remove splits/topics upon recovery without worrying about data loss, since switching to a different set of topics is a conscious choice by the user.

I thought the problem is that KafkaSourceReader just restores the splits/partitions and reads from them without checking whether they are still valid (i.e., belong to the subscribed topics).

I'm not sure whether this requires a change in the Kafka source checkpointing. Currently, I believe both the enumerator and the readers checkpoint their own state. If readers didn't checkpoint and always waited for the enumerator to assign splits/partitions upon recovery, this might be easier, as the filter/check could be done by the enumerator alone.

Thanks,
Steven

On Tue, Nov 16, 2021 at 7:17 PM Qingsheng Ren wrote:

> Hi Mason,
>
> Sorry for my late response!
>
> > “there was no logic to filter/remove splits”
>
> Yes we indeed miss a split removal mechanism. Actually this is quite a
> tricky one considering exactly-once semantic: there’s risk of losing data
> if we remove a partition / topic from Kafka. There was a discussion about
> this topic in the user mailing list:
> https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>
> An immature solution in my mind is that we can remove a split with the
> help of watermark. Once the watermark in a split has been pushed to end of
> global window, then we can assume that there’s no more new records in the
> split and we can remove it safely. But, this will invalidate all previous
> checkpoints because these split might not exist anymore in the external
> system (like topic has been removed in Kafka).
>
> Hope this could answer your question and looking forward to your inspiring
> ideas!
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
>
> On Nov 10, 2021, 11:32 PM +0800, Mason Chen wrote:
> > there was no logic to filter/remove splits
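[Editorial sketch, not part of the original message.] The enumerator-side filter suggested above could look roughly like the following. This is only an illustration under stated assumptions: it does not use the Flink API, partitions are represented as plain "topic-partition" strings, and RecoveryAssignment, planAssignments, and USE_CONFIGURED_START are hypothetical names. It assumes readers hold no state of their own, so the enumerator decides everything on recovery.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class RecoveryAssignment {
    /** Hypothetical marker: no checkpointed offset, fall back to the job's configured starting offset. */
    static final long USE_CONFIGURED_START = -1L;

    /**
     * Plans what to assign after recovery.
     *
     * @param checkpointedOffsets offsets restored from the checkpoint, keyed by "topic-partition"
     * @param currentPartitions   partitions that belong to the currently subscribed topics
     * @return offsets to start from, containing only currently subscribed partitions
     */
    static Map<String, Long> planAssignments(
            Map<String, Long> checkpointedOffsets, Set<String> currentPartitions) {
        Map<String, Long> plan = new LinkedHashMap<>();
        for (String partition : currentPartitions) {
            Long restored = checkpointedOffsets.get(partition);
            // Resume from the checkpoint where possible; new partitions start fresh.
            plan.put(partition, restored != null ? restored : USE_CONFIGURED_START);
        }
        // Checkpointed partitions that are no longer subscribed are never added to the plan.
        return plan;
    }
}
```

Because the plan is built only from the current subscription, splits of topics the user has dropped are filtered out implicitly rather than by a separate removal step.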
Re: Kafka Source Recovery Behavior
Hi Mason,

Sorry for my late response!

> “there was no logic to filter/remove splits”

Yes, we are indeed missing a split removal mechanism. This is actually quite tricky considering exactly-once semantics: there is a risk of losing data if we remove a partition/topic from Kafka. There was a discussion about this in the user mailing list:
https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt

An immature solution in my mind is to remove a split with the help of watermarks. Once the watermark of a split has been pushed to the end of the global window, we can assume that there are no more new records in the split and remove it safely. However, this would invalidate all previous checkpoints, because these splits might no longer exist in the external system (for example, if the topic has been removed in Kafka).

I hope this answers your question, and I look forward to your inspiring ideas!

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com

On Nov 10, 2021, 11:32 PM +0800, Mason Chen wrote:

> there was no logic to filter/remove splits
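[Editorial sketch, not part of the original message.] The watermark idea above might be expressed as the check below. It is a rough illustration only: it does not use the Flink API, it assumes that "end of global window" can be treated as a single timestamp (windowEndMillis), and WatermarkSplitRemoval and removableSplits are hypothetical names. It does not address the checkpoint invalidation problem mentioned in the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WatermarkSplitRemoval {
    /**
     * Returns the splits whose watermark has reached the end of the window,
     * i.e. the splits assumed to produce no further relevant records.
     *
     * @param splitWatermarks current watermark per split (epoch millis), keyed by split id
     * @param windowEndMillis assumed end timestamp of the global window
     */
    static List<String> removableSplits(Map<String, Long> splitWatermarks, long windowEndMillis) {
        List<String> removable = new ArrayList<>();
        for (Map.Entry<String, Long> entry : splitWatermarks.entrySet()) {
            if (entry.getValue() >= windowEndMillis) {
                removable.add(entry.getKey());
            }
        }
        return removable;
    }
}
```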
Re: Kafka Source Recovery Behavior
Hi all,

Any update on this?

Best,
Mason

On Sat, Oct 30, 2021 at 5:56 AM Arvid Heise wrote:

> This seems to be a valid concern but I'm not deep enough to clearly say
> that this is indeed a bug. @renqschn could you please double-check?
>
> On Thu, Oct 28, 2021 at 8:39 PM Mason Chen wrote:
>
>> Hi all,
>>
>> I noticed that the KafkaSourceReader did not have a pointer to the
>> KafkaSubscriber, so I was wondering if this could be a bug:
>>
>> 1. User has a flink job with topic set A and takes savepoint
>> 2. User modifies flink job to read from topic set B; however, splits are
>> still read from topic set A (since there’s no logic to filter/remove
>> splits).
>>
>> To clarify, I haven’t tested this scenario out myself, but just through
>> reading the code, there was no logic to filter/remove splits. Just to add
>> and complete.
>>
>> Best,
>> Mason
Re: Kafka Source Recovery Behavior
This seems to be a valid concern, but I'm not deep enough in the code to say clearly whether this is indeed a bug. @renqschn could you please double-check?

On Thu, Oct 28, 2021 at 8:39 PM Mason Chen wrote:

> Hi all,
>
> I noticed that the KafkaSourceReader did not have a pointer to the
> KafkaSubscriber, so I was wondering if this could be a bug:
>
> 1. User has a flink job with topic set A and takes savepoint
> 2. User modifies flink job to read from topic set B; however, splits are
> still read from topic set A (since there’s no logic to filter/remove
> splits).
>
> To clarify, I haven’t tested this scenario out myself, but just through
> reading the code, there was no logic to filter/remove splits. Just to add
> and complete.
>
> Best,
> Mason
Kafka Source Recovery Behavior
Hi all,

I noticed that the KafkaSourceReader does not have a pointer to the KafkaSubscriber, so I was wondering if this could be a bug:

1. User has a Flink job with topic set A and takes a savepoint.
2. User modifies the Flink job to read from topic set B and restores from that savepoint; however, splits are still read from topic set A (since there’s no logic to filter/remove splits).

For completeness, I should add that I haven’t tested this scenario myself; just from reading the code, I found no logic to filter/remove splits.

Best,
Mason
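[Editorial sketch, not part of the original message.] The two-step scenario above can be illustrated with a small check that separates restored splits into those still covered by the subscription and those left over from the old topic set. This is a minimal sketch that does not use the Flink API: Split is a hypothetical stand-in for a restored Kafka partition split, and StaleSplitCheck and findStaleSplits are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class StaleSplitCheck {
    /** Hypothetical stand-in for a restored Kafka partition split. */
    static final class Split {
        final String topic;
        final int partition;

        Split(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Returns the restored splits whose topic is not in the current subscription. */
    static List<Split> findStaleSplits(List<Split> restored, Set<String> subscribedTopics) {
        List<Split> stale = new ArrayList<>();
        for (Split split : restored) {
            if (!subscribedTopics.contains(split.topic)) {
                stale.add(split);
            }
        }
        return stale;
    }
}
```

With a savepoint taken on topic set A and a job now subscribed to topic set B, every split restored from A would show up in the returned list; without a check of this kind, those splits would simply continue to be read.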