Re: Kafka Source Recovery Behavior

2021-11-18 Thread Steven Wu
Qingsheng,

For the scenario Mason described in the original email, I think it is
safe to remove splits/topics upon recovery without worrying about data loss,
since switching to a different set of topics is a conscious choice by the
user.

I thought the problem was that KafkaSourceReader just restores the
splits/partitions and reads from them without checking whether they are
still valid (i.e., still belong to the subscribed topics). I am not sure
whether this requires a change in the Kafka source checkpointing.
Currently, I believe both the enumerator and the readers checkpoint their
own state. If readers didn't checkpoint and always waited for the
enumerator to assign splits/partitions upon recovery, this might be easier,
as the filter/check could be done by the enumerator alone.
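
To make the filter/check concrete, here is a minimal sketch in plain Java.
It is not Flink's actual code: RestoredSplitFilter, PartitionSplit and
filterRestored are hypothetical names made up for illustration, and the
split is reduced to a topic name plus a partition id. It only shows the
idea of dropping restored splits whose topic is no longer subscribed.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class RestoredSplitFilter {

    /** Hypothetical stand-in for a Kafka partition split (not a Flink class). */
    public static final class PartitionSplit {
        private final String topic;
        private final int partition;

        public PartitionSplit(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        public String topic() {
            return topic;
        }

        public int partition() {
            return partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Keeps only the restored splits whose topic is still subscribed. */
    public static List<PartitionSplit> filterRestored(
            List<PartitionSplit> restored, Set<String> subscribedTopics) {
        return restored.stream()
                .filter(split -> subscribedTopics.contains(split.topic()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // State restored from a savepoint taken while reading topic set A.
        List<PartitionSplit> restored = List.of(
                new PartitionSplit("topic-a", 0),
                new PartitionSplit("topic-b", 0));
        // The job was then changed to subscribe to topic set B only.
        System.out.println(filterRestored(restored, Set.of("topic-b")));
    }
}
```

If only the enumerator restored state, a check like this would run in one
place before splits are handed to the readers.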

Thanks,
Steven


On Tue, Nov 16, 2021 at 7:17 PM Qingsheng Ren wrote:

> Hi Mason,
>
> Sorry for my late response!
>
> “there was no logic to filter/remove splits”
>
>
> Yes, we are indeed missing a split removal mechanism. This is quite a
> tricky one considering exactly-once semantics: there is a risk of losing
> data if we remove a partition / topic from Kafka. There was a discussion
> about this topic on the user mailing list:
> https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>
> An immature solution I have in mind is to remove a split with the help of
> watermarks. Once the watermark of a split has been pushed to the end of
> the global window, we can assume that there are no more new records in the
> split and remove it safely. However, this would invalidate all previous
> checkpoints, because these splits might no longer exist in the external
> system (e.g., the topic has been removed in Kafka).
>
> Hope this could answer your question and looking forward to your inspiring
> ideas!
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Nov 10, 2021, 11:32 PM +0800, Mason Chen wrote:
>
> there was no logic to filter/remove splits
>


Re: Kafka Source Recovery Behavior

2021-11-16 Thread Qingsheng Ren
Hi Mason,

Sorry for my late response!
> “there was no logic to filter/remove splits”

Yes, we are indeed missing a split removal mechanism. This is quite a tricky
one considering exactly-once semantics: there is a risk of losing data if we
remove a partition / topic from Kafka. There was a discussion about this topic
on the user mailing list:
https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt

An immature solution I have in mind is to remove a split with the help of
watermarks. Once the watermark of a split has been pushed to the end of the
global window, we can assume that there are no more new records in the split
and remove it safely. However, this would invalidate all previous checkpoints,
because these splits might no longer exist in the external system (e.g., the
topic has been removed in Kafka).
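
As a rough illustration of the watermark idea only, here is a minimal sketch
in plain Java. Nothing here is an existing Flink API: WatermarkSplitRemoval
and removableSplits are hypothetical names, splits are identified by plain
strings, and "end of global window" is assumed to be a single timestamp in
milliseconds.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class WatermarkSplitRemoval {

    /**
     * Returns the ids of splits whose watermark has reached the window end.
     * Assumption: such a split will emit no further records for the window.
     */
    public static Set<String> removableSplits(
            Map<String, Long> watermarkBySplit, long windowEndMs) {
        Set<String> removable = new TreeSet<>();
        for (Map.Entry<String, Long> entry : watermarkBySplit.entrySet()) {
            if (entry.getValue() >= windowEndMs) {
                removable.add(entry.getKey());
            }
        }
        return removable;
    }

    public static void main(String[] args) {
        // Hypothetical per-split watermarks in epoch milliseconds.
        Map<String, Long> watermarks = Map.of(
                "topic-a-0", 2_000L,
                "topic-a-1", 500L);
        System.out.println(removableSplits(watermarks, 1_000L));
    }
}
```

The sketch says nothing about the checkpoint problem: a split removed this
way would still be referenced by earlier checkpoints.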

Hope this could answer your question and looking forward to your inspiring 
ideas!

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Nov 10, 2021, 11:32 PM +0800, Mason Chen wrote:
>
> there was no logic to filter/remove splits


Re: Kafka Source Recovery Behavior

2021-11-10 Thread Mason Chen
Hi all,

Any update on this?

Best,
Mason

On Sat, Oct 30, 2021 at 5:56 AM Arvid Heise wrote:

> This seems to be a valid concern, but I'm not deep enough in this code to
> say for certain that it is indeed a bug. @renqschn could you please
> double-check?
>
> On Thu, Oct 28, 2021 at 8:39 PM Mason Chen wrote:
>
>> Hi all,
>>
>> I noticed that the KafkaSourceReader did not have a pointer to the
>> KafkaSubscriber, so I was wondering if this could be a bug:
>>
>> 1. User has a Flink job reading topic set A and takes a savepoint.
>> 2. User modifies the Flink job to read from topic set B; however, splits
>> are still read from topic set A (since there’s no logic to filter/remove
>> splits).
>>
>> To clarify, I haven’t tested this scenario out myself, but just through
>> reading the code, there was no logic to filter/remove splits, only logic
>> to add and complete them.
>>
>> Best,
>> Mason
>>
>


Re: Kafka Source Recovery Behavior

2021-10-30 Thread Arvid Heise
This seems to be a valid concern, but I'm not deep enough in this code to
say for certain that it is indeed a bug. @renqschn could you please
double-check?

On Thu, Oct 28, 2021 at 8:39 PM Mason Chen wrote:

> Hi all,
>
> I noticed that the KafkaSourceReader did not have a pointer to the
> KafkaSubscriber, so I was wondering if this could be a bug:
>
> 1. User has a Flink job reading topic set A and takes a savepoint.
> 2. User modifies the Flink job to read from topic set B; however, splits
> are still read from topic set A (since there’s no logic to filter/remove
> splits).
>
> To clarify, I haven’t tested this scenario out myself, but just through
> reading the code, there was no logic to filter/remove splits, only logic
> to add and complete them.
>
> Best,
> Mason
>


Kafka Source Recovery Behavior

2021-10-28 Thread Mason Chen
Hi all,

I noticed that the KafkaSourceReader did not have a pointer to the
KafkaSubscriber, so I was wondering if this could be a bug:

1. User has a Flink job reading topic set A and takes a savepoint.
2. User modifies the Flink job to read from topic set B; however, splits
are still read from topic set A (since there’s no logic to filter/remove
splits).

To clarify, I haven’t tested this scenario out myself, but just through
reading the code, there was no logic to filter/remove splits, only logic
to add and complete them.

Best,
Mason