Re: Kafka Partition Discovery

2021-09-24 Thread Roman Khachatryan
Hi,

It seems like a useful feature, but it's probably better to have it in
the Kafka consumer. There is a related KIP in progress:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients
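Once that lands, the backoff should be tunable via the client properties that the Flink consumer passes through. A rough sketch; the max-backoff key is the one proposed by the KIP and is not available in clients released at the time of this thread, so treat names and values as illustrative:

```java
import java.util.Properties;

public class Kip580BackoffSketch {
    // Sketch only: exponential-backoff settings as proposed in KIP-580.
    // "retry.backoff.max.ms" is the key the KIP proposes; it is not in
    // released clients yet, so this is illustrative rather than copy-paste.
    public static Properties kafkaClientProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder
        props.setProperty("retry.backoff.ms", "100");             // initial backoff
        props.setProperty("retry.backoff.max.ms", "10000");       // proposed upper bound
        return props;
    }
}
```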

I'd like to pull Arvid into the discussion, as he might be more
familiar with the subject.

Regards,
Roman




Kafka Partition Discovery

2021-09-22 Thread Mason Chen
Hi all,

We are sometimes facing a connection issue with Kafka when a broker restarts:

```
java.lang.RuntimeException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:846)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired
while fetching topic metadata
```
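In the meantime, a stop-gap we are considering is raising the client-side timeouts that the metadata fetch is subject to; roughly like this. The property names come from the Kafka consumer configuration, whether they fully cover the discovery path is an assumption on our side, and the broker, group, topic, and values are purely illustrative:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MetadataTimeoutWorkaround {
    // Illustrative only: give the client more time to fetch topic metadata so a
    // rolling broker restart does not immediately fail partition discovery.
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092");       // placeholder
        props.setProperty("group.id", "example-group");                 // placeholder
        props.setProperty("default.api.timeout.ms", "120000");          // governs metadata calls such as partitionsFor()
        props.setProperty("request.timeout.ms", "60000");
        // partition discovery stays enabled as before
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return new FlinkKafkaConsumer<>("example-topic", new SimpleStringSchema(), props);
    }
}
```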

Can a retry be added to the partition discovery mechanism?

Best,
Mason


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-15 Thread Seth Wiesman
I just want to chime in that if you really do need to drop a partition,
Flink already supports a solution.

If you manually stop the job with a savepoint and restart it with a new UID
on the source operator, along with passing the --allowNonRestoredState flag
to the client, the source will disregard existing state and start fresh.
You can then use the start position configuration to determine where to
begin when restarting on the existing partitions.
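Sketched out, the restart looks roughly like this; the operator UID, group, topic, and chosen start position are placeholders, not a prescription:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RestartWithFreshSourceState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder
        props.setProperty("group.id", "example-group");           // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("example-topic", new SimpleStringSchema(), props);
        // Pick where the existing partitions should start from after the restart.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer)
           // New UID so the savepoint's old source state is not matched on restore.
           .uid("kafka-source-v2")
           .print();

        env.execute("restart-with-fresh-source-state");
        // Submit against the old savepoint while skipping the unmatched source state:
        //   flink run -s <savepointPath> --allowNonRestoredState <jobJar>
    }
}
```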


https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/production_ready/#set-uuids-for-all-operators
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/cli/#starting-a-job-from-a-savepoint
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#starting-offset


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-15 Thread David Morávek
I'll try to be more direct with the answer, as you already have the context
on what the issue is.

When this happens, we basically have these options:

1) We can throw an exception (with good wording, so the user knows what's going
on) and fail the job. This forces the user to take immediate action and fix
the issue.
2) We can log a warning and keep the job running. Most users won't
notice this unless they are using tools such as Sentry / StackDriver with
automatic alerting. In most cases this will hide a real problem that could
be really hard to discover / repair in later stages.

So, in other words, Flink doesn't forget this in order to "proactively
guard the user against some serious trouble".

Can you please elaborate a little bit more on the use case and why it
needs to be implemented the way it is? Maybe there could be an alternative
solution to this.

Best,
D.



Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Thanks David. What you are saying makes sense. But I keep hearing that I
shouldn't delete the topic externally, and I keep asking why Flink doesn't
forget about the topic IF it has in fact been deleted externally (for
whatever reason).

I think I will drop this now.


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Fabian and I are basically describing the same thing, just with different
wording.

The problem is that if you delete the topic externally, you're making an
assumption that the downstream processor (Flink in this case) has already
consumed and RELIABLY processed all of the data from that topic (which may
not be true). This would effectively lead to AT_MOST_ONCE delivery
guarantees (in other words, we are OK with losing data), which is a
trade-off that _in_my_opinion_ we shouldn't make here.

Best,
D.


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Hi all,

Thank you for the replies, they are much appreciated.

I'm sure I'm missing something obvious here, so bear with me...

Fabian, regarding:

"Flink will try to recover from the previous checkpoint which is invalid by
now because the partition is not available anymore."

The above would happen because the partition is not available anymore in
Kafka (right?), and not because Flink's partition discoverer has removed it
from its cache (i.e. even if Flink leaves it there, the topic doesn't exist
in Kafka anymore, so that's the source of the problem in the scenario you
outlined). In other words, what would be the *extra* harm in Flink
cleaning up the partition from its cache after it knows that the partition
is gone? This is the part I still don't understand.

David, similarly:

"actual topic deletion would need to be performed by Flink (not by the 3rd
party system as suggested in the original question)"

The situation is that the topic has, for better or worse, already been
deleted. So my question is one of cleanup, i.e. how is it useful for Flink
to continue remembering the partition of an already-deleted topic? (the
checkpoint is invalid regardless, right?)





Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský

On 9/14/21 3:57 PM, David Morávek wrote:

Hi Jan,

The notion of completeness is just one part of the problem. The second
part is that once you remove the Kafka topic, you are no longer able
to replay the data in case of failure.

So you basically need the following workflow to ensure correctness:

1) Wait until there are no more elements in the topic (this can be
done by checking the watermark for that partition, as you're suggesting)
2) Take a checkpoint N
3) Delete the topic (this effectively makes all the checkpoints < N
invalid)

Agree.


If you switch the order of 2) and 3), you have no way to recover from failure.

Also, for this to work properly, the actual topic deletion would need to be
performed by Flink (not by the 3rd-party system, as suggested in the
original question) in the second phase of 2PC (when you're sure that
you've successfully taken a checkpoint that has seen all the data).


Agree, the deletion would have to be preceded by something like
partition drain. What is needed is the watermark reaching the end of the
global window (+inf) and a checkpoint. After that, the source can be removed
and what happens with it is no longer a concern. That applies to all
sources in general. I don't know the implementation details, but it
seems that the topic would have to be somehow marked as "draining"; it
would then be the responsibility of the reader to shift the watermark
belonging to partitions of that topic to +inf. It would then be Flink's
responsibility to verify that such a source is removed only after
a checkpoint is taken. Otherwise there would be a possible risk of data loss.


This definitely looks like quite a complex process.





Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Hi Jan,

The notion of completeness is just one part of the problem. The second part is
that once you remove the Kafka topic, you are no longer able to replay the
data in case of failure.

So you basically need the following workflow to ensure correctness:

1) Wait until there are no more elements in the topic (this can be done by
checking the watermark for that partition, as you're suggesting)
2) Take a checkpoint N
3) Delete the topic (this effectively makes all the checkpoints < N invalid)

If you switch the order of 2) and 3), you have no way to recover from failure.

Also, for this to work properly, the actual topic deletion would need to be
performed by Flink (not by the 3rd-party system, as suggested in the
original question) in the second phase of 2PC (when you're sure that you've
successfully taken a checkpoint that has seen all the data).
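Very roughly, the safe ordering would look like the sketch below. The first two steps are placeholders for whatever external coordination confirms the drain and the completed checkpoint (not existing Flink hooks), and the broker address is illustrative; only the Kafka AdminClient call is a real API:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class DeleteTopicAfterCheckpointSketch {
    // Sketch of the safe ordering: drain -> checkpoint -> delete.
    public static void deleteTopicSafely(String topic) throws Exception {
        waitUntilTopicIsDrained(topic);   // 1) watermark for the topic's partitions has reached the end
        waitForCompletedCheckpoint();     // 2) a checkpoint N that has seen all the data
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            admin.deleteTopics(Collections.singletonList(topic)) // 3) only now delete the topic
                 .all()
                 .get();
        }
    }

    private static void waitUntilTopicIsDrained(String topic) { /* placeholder */ }
    private static void waitForCompletedCheckpoint() { /* placeholder */ }
}
```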

Best,
D.



Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský

Hi,

just out of curiosity, would this problem be solvable by the ability to
remove partitions that declare that they contain no more data
(watermark reaching the end of the global window)? There is probably another
problem in that a topic can be recreated after being deleted, which
could result in the watermark moving back in time, but that problem might be
there already.
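In code terms, what I have in mind would amount to the source eventually emitting the maximal watermark for such a partition; purely illustrative, there is no such hook today:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class DrainedPartitionSketch {
    // Purely illustrative: when a partition is declared "drained", the reader
    // would push its watermark to +inf so downstream windows can close and a
    // final checkpoint can cover all of its data before the partition is dropped.
    public static void onPartitionDrained(SourceFunction.SourceContext<?> ctx) {
        ctx.emitWatermark(Watermark.MAX_WATERMARK);
    }
}
```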


 Jan



Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Fabian Paul
Hi Constantinos,

I agree with David that it is not easily possible to remove a partition while a
Flink job is running. Imagine the following scenario:

Your Flink job initially works on 2 partitions belonging to two different
topics, and you have checkpointing enabled to guarantee exactly-once delivery.
This implies that on every checkpoint, the offsets of the Kafka topics are
stored in a Flink checkpoint so they can be recovered from in case of a failure.
Now you trigger the removal of one of the topics, and the discovery detects that
one of the partitions was removed. If the pipeline now fails before the next
checkpoint is taken, Flink will try to recover from the previous checkpoint,
which is by now invalid because the partition is not available anymore.

Only if you do not care about losing data is it possible to simply ignore the
removed partition.
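For reference, the checkpointing setup assumed in this scenario is just the standard periodic exactly-once configuration; a minimal sketch, with the interval being illustrative:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    // The scenario above assumes periodic exactly-once checkpoints, e.g.:
    public static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE); // interval is illustrative
    }
}
```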

Best,
Fabian

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Thank you for your answer, David; it confirms what we see in
the Flink code.

A few thoughts below:


"as this may easily lead to a data loss"

Removing a topic/partition can indeed lead to data loss if not done
carefully. However, *after* the topic has been deleted, I believe it would
be safe for the partition discoverer to forget it, unless I am missing
something.


"partition removal is not even supported by Kafka"

Topic removal is supported, however, and that is our use case here as well.


"You should only be adding new topics / partitions"

Unfortunately, in some cloud environments, partitions are a precious
commodity and it is often unaffordable to scale them up without
subsequently scaling them down.


In my humble view, forgetting a topic's partitions after the topic's
removal should be supported by the partition discoverer (even if it's an
opt-in capability). Would the Flink community be open to a contribution
that does this?
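To make the shape of the proposal concrete, from the user's side it could be as small as an extra consumer property; the second key below does not exist in Flink today and is purely hypothetical:

```java
import java.util.Properties;

public class HypotheticalOptInSketch {
    public static Properties props() {
        Properties props = new Properties();
        // Existing setting: enables periodic partition discovery.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        // Hypothetical opt-in: does NOT exist today, shown only to illustrate the proposal.
        props.setProperty("flink.partition-discovery.forget-deleted-partitions", "true");
        return props;
    }
}
```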


Best regards,

Constantinos Papadopoulos

>


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Hi Constantinos,

The partition discovery doesn't support topic / partition removal as this
may easily lead to a data loss (partition removal is not even supported by
Kafka, for the same reason).

Dynamically adding and removing partitions as part of business logic is
just not how Kafka is designed to work. You should only be adding new
topics / partitions for scale-out reasons, and even that should be done
super carefully because it breaks data partitioning.

Best,
D.



Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
We are on Flink 1.12.1; we initialize our FlinkKafkaConsumer with a topic
name *pattern*, and we have partition discovery enabled.
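For context, the consumer is set up roughly like this (simplified); the broker address, pattern, and discovery interval are illustrative, and the group id is the one that appears in the log line below:

```java
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PatternConsumerSetup {
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder
        props.setProperty("group.id", "metric-processor-consumer-group");
        // Enables periodic discovery of topics/partitions matching the pattern.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return new FlinkKafkaConsumer<>(
                Pattern.compile("metric-.*"), new SimpleStringSchema(), props); // pattern illustrative
    }
}
```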

When our product scales up, it adds new topics. When it scales down, it
removes topics.

The problem is that the FlinkKafkaConsumer never seems to forget partitions
that don't exist anymore.

As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors:

*[Consumer clientId=consumer-metric-processor-consumer-group-2,
groupId=metric-processor-consumer-group] Error while fetching metadata with
correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*

Over time, the problem becomes worse as scale-ups and scale-downs continue
happening (and thus the number of deleted partitions keeps increasing).

Is this a bug, or are we missing how to get the FlinkKafkaConsumer to
forget partitions that don't exist anymore?

The deleted topics are not returned by the "listTopics" API which the
KafkaPartitionDiscoverer calls under the covers, so it's unclear why the
KafkaPartitionDiscoverer doesn't then proceed to forget about these topics
and their partitions.