Thanks David. What you are saying makes sense. But I keep hearing that I
shouldn't delete the topic externally, and I keep asking why Flink doesn't
forget about the topic IF it has in fact been deleted externally (for
whatever reason).

I think I will drop this now.

On Tue, Sep 14, 2021 at 5:50 PM David Morávek <d...@apache.org> wrote:

> Fabian and I are basically describing the same thing, just with different
> wording.
>
> The problem is that if you delete the topic externally, you're making the
> assumption that the downstream processor (Flink in this case) has already
> consumed and RELIABLY processed all of the data from that topic (which may
> not be true). This would effectively lead to AT_MOST_ONCE delivery
> guarantees (in other words, we are OK with losing data), which is a
> trade-off that _in_my_opinion_ we shouldn't make here.
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 4:37 PM Constantinos Papadopoulos <
> cpa...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thank you for the replies, they are much appreciated.
>>
>> I'm sure I'm missing something obvious here, so bear with me...
>>
>> Fabian, regarding:
>>
>> "Flink will try to recover from the previous checkpoint which is invalid
>> by now because the partition is not available anymore."
>>
>> The above would happen because the partition is not available anymore in
>> Kafka (right?), and not because Flink's partition discoverer has removed it
>> from its cache (i.e. even if Flink leaves it there, the topic doesn't exist
>> in Kafka anymore, so that's the source of the problem in the scenario you
>> outlined). In other words, what would be the *extra* harm in Flink
>> cleaning up the partition from its cache after it knows that the partition
>> is gone? That is the part I still don't understand.
>>
>> David, similarly:
>>
>> "actual topic deletion would need to be performed by Flink (not by the
>> 3rd party system as suggested in the original question)"
>>
>> The situation is that the topic has, for better or worse, already been
>> deleted. So my question is one of cleanup, i.e. how is it useful for Flink
>> to continue remembering the partition of an already-deleted topic? (the
>> checkpoint is invalid regardless, right?)
>>
>>
>>
>> On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 9/14/21 3:57 PM, David Morávek wrote:
>>>
>>> Hi Jan,
>>>
>>> The notion of completeness is just one part of the problem. The second part
>>> is that once you remove the Kafka topic, you are no longer able to replay
>>> the data in case of failure.
>>>
>>> So you basically need the following workflow to ensure correctness:
>>>
>>> 1) Wait until there are no more elements in the topic (this can be done
>>> by checking the watermark for that partition, as you're suggesting)
>>> 2) Take a checkpoint N
>>> 3) Delete the topic (this effectively makes all the checkpoints < N invalid)
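>>>
>>> A minimal sketch of how 2) and 3) could be tied together, assuming
>>> Kafka's AdminClient and Flink's CheckpointListener (class and topic names
>>> are placeholders, and it glosses over verifying that the completed
>>> checkpoint was actually triggered after the drain):
>>>
>>> import java.util.Collections;
>>> import java.util.Properties;
>>>
>>> import org.apache.flink.api.common.state.CheckpointListener;
>>> import org.apache.kafka.clients.admin.AdminClient;
>>>
>>> // Deletes a drained topic only after a checkpoint has completed, so that
>>> // step 3) never happens before step 2).
>>> public class DeleteTopicAfterCheckpoint implements CheckpointListener {
>>>
>>>     private final Properties adminProps;    // e.g. bootstrap.servers
>>>     private final String drainedTopic;      // topic believed to be fully consumed
>>>     private volatile boolean drained;       // set once step 1) has been verified
>>>
>>>     public DeleteTopicAfterCheckpoint(Properties adminProps, String drainedTopic) {
>>>         this.adminProps = adminProps;
>>>         this.drainedTopic = drainedTopic;
>>>     }
>>>
>>>     public void markDrained() {
>>>         this.drained = true;
>>>     }
>>>
>>>     @Override
>>>     public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>>         if (drained) {
>>>             // A checkpoint taken after the drain is durable, so deleting
>>>             // the topic no longer risks losing replayable data.
>>>             try (AdminClient admin = AdminClient.create(adminProps)) {
>>>                 admin.deleteTopics(Collections.singleton(drainedTopic)).all().get();
>>>             }
>>>         }
>>>     }
>>> }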
>>>
>>> Agree.
>>>
>>>
>>> If you switch the order of 2) and 3), you have no way to recover from failure.
>>>
>>> Also, for this to work properly, the actual topic deletion would need to
>>> be performed by Flink (not by the 3rd party system, as suggested in the
>>> original question) in the second phase of a 2PC (when you're sure that
>>> you've successfully taken a checkpoint that has seen all the data).
>>>
>>> Agree, the deletion would have to be preceded by something like a
>>> partition drain. What is needed is the watermark reaching the end of the
>>> global window (+inf) and a checkpoint. After that, the source can be
>>> removed and what happens with it is of no concern any more. That applies
>>> to all sources in general. I don't know the implementation details, but it
>>> seems the topic would have to be somehow marked as "draining"; it would
>>> then be the responsibility of the reader to shift the watermark belonging
>>> to partitions of that topic to +inf. It would then be the responsibility
>>> of Flink to verify that such a source is removed only after a checkpoint
>>> is taken. Otherwise there would be a possible risk of data loss.
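>>>
>>> To make the invariant concrete (a hypothetical guard for illustration
>>> only, not an existing Flink API):
>>>
>>> // A partition may only be dropped once its watermark has reached +inf
>>> // and a checkpoint triggered after that point has completed.
>>> final class PartitionRemovalGuard {
>>>     static boolean safeToRemove(
>>>             long partitionWatermark,         // current watermark of the partition
>>>             long firstCheckpointAfterDrain,  // id of first checkpoint triggered after the drain
>>>             long lastCompletedCheckpoint) {  // id of the latest completed checkpoint
>>>         return partitionWatermark == Long.MAX_VALUE
>>>                 && lastCompletedCheckpoint >= firstCheckpointAfterDrain;
>>>     }
>>> }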
>>>
>>> This definitely looks like quite a complex process.
>>>
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> just out of curiosity, would this problem be solvable by the ability to
>>>> remove partitions that declare they do not contain any more data
>>>> (watermark reaching the end of the global window)? There is probably
>>>> another problem in that the topic can be recreated after being deleted,
>>>> which could result in the watermark moving back in time, but that
>>>> problem might be there already.
>>>>
>>>>   Jan
>>>>
>>>> On 9/14/21 3:08 PM, Fabian Paul wrote:
>>>> > Hi Constantinos,
>>>> >
>>>> > I agree with David that it is not easily possible to remove a
>>>> > partition while a Flink job is running. Imagine the following scenario:
>>>> >
>>>> > Your Flink job initially works on 2 partitions belonging to two
>>>> > different topics and you have checkpointing enabled to guarantee
>>>> > exactly-once delivery. This implies that on every checkpoint the
>>>> > offsets of the Kafka topics are stored in a Flink checkpoint to
>>>> > recover from them in case of a failure.
>>>> > Now you trigger the removal of one of the topics and the discovery
>>>> > detects that one of the partitions was removed. If the pipeline now
>>>> > fails before the next checkpoint is taken, Flink will try to recover
>>>> > from the previous checkpoint, which is invalid by now because the
>>>> > partition is not available anymore.
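>>>> >
>>>> > A sketch of the setup above, assuming the legacy FlinkKafkaConsumer
>>>> > (topic names, intervals and addresses are just examples):
>>>> >
>>>> > import java.util.Properties;
>>>> > import java.util.regex.Pattern;
>>>> >
>>>> > import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>>> >
>>>> > public class TwoTopicJob {
>>>> >     public static void main(String[] args) throws Exception {
>>>> >         StreamExecutionEnvironment env =
>>>> >                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>> >         // offsets of both topics go into every checkpoint
>>>> >         env.enableCheckpointing(60_000);
>>>> >
>>>> >         Properties props = new Properties();
>>>> >         props.setProperty("bootstrap.servers", "broker:9092");
>>>> >         props.setProperty("group.id", "my-group");
>>>> >         // periodic topic/partition discovery
>>>> >         props.setProperty("flink.partition-discovery.interval-millis", "30000");
>>>> >
>>>> >         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
>>>> >                 Pattern.compile("topic-a|topic-b"),
>>>> >                 new SimpleStringSchema(),
>>>> >                 props);
>>>> >
>>>> >         env.addSource(consumer).name("kafka-source").print();
>>>> >         env.execute("two-topic-job");
>>>> >     }
>>>> > }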
>>>> >
>>>> > Only if you do not care about losing data is it possible to simply
>>>> > ignore the removed partition.
>>>> >
>>>> > Best,
>>>> > Fabian
>>>>
>>>
