Thanks, David. What you are saying makes sense. But I keep hearing that I shouldn't delete the topic externally, and I keep asking why Flink doesn't forget about the topic IF it has in fact been deleted externally (for whatever reason).
I think I will drop this now.

On Tue, Sep 14, 2021 at 5:50 PM David Morávek <d...@apache.org> wrote:

> We are basically describing the same thing as Fabian, just with different
> wording.
>
> The problem is that if you delete the topic externally, you're making an
> assumption that the downstream processor (Flink in this case) has already
> consumed and RELIABLY processed all of the data from that topic (which may
> not be true). This would effectively lead to AT_MOST_ONCE delivery
> guarantees (in other words, we are OK with losing data), which is a
> trade-off that _in_my_opinion_ we shouldn't make here.
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 4:37 PM Constantinos Papadopoulos <cpa...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thank you for the replies, they are much appreciated.
>>
>> I'm sure I'm missing something obvious here, so bear with me...
>>
>> Fabian, regarding:
>>
>> "Flink will try to recover from the previous checkpoint which is invalid
>> by now because the partition is not available anymore."
>>
>> The above would happen because the partition is not available anymore in
>> Kafka (right?), and not because Flink's partition discoverer has removed
>> it from its cache (i.e. even if Flink leaves it there, the topic doesn't
>> exist in Kafka anymore, so that's the source of the problem in the
>> scenario you outlined). In other words, what would be the *extra* harm
>> from Flink cleaning up the partition from its cache after it knows that
>> the partition is gone? This is the part I still don't understand.
>>
>> David, similarly:
>>
>> "actual topic deletion would need to be performed by Flink (not by the
>> 3rd party system as suggested in the original question)"
>>
>> The situation is that the topic has, for better or worse, already been
>> deleted. So my question is one of cleanup, i.e. how is it useful for
>> Flink to continue remembering the partition of an already-deleted topic?
>> (The checkpoint is invalid regardless, right?)
>>
>> On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 9/14/21 3:57 PM, David Morávek wrote:
>>>
>>> Hi Jan,
>>>
>>> The notion of completeness is just one part of the problem. The second
>>> part is that once you remove the Kafka topic, you are no longer able to
>>> replay the data in case of failure.
>>>
>>> So you basically need the following workflow to ensure correctness:
>>>
>>> 1) Wait until there are no more elements in the topic (this can be done
>>> by checking the watermark for that partition, as you're suggesting)
>>> 2) Take a checkpoint N
>>> 3) Delete the topic (this effectively makes all checkpoints < N invalid)
>>>
>>> Agree.
>>>
>>> If you switch the order of 2) and 3), you have no way to recover from
>>> failure.
>>>
>>> Also, for this to work properly, the actual topic deletion would need
>>> to be performed by Flink (not by the 3rd party system as suggested in
>>> the original question) in the second phase of 2PC (when you're sure that
>>> you've successfully taken a checkpoint that has seen all the data).
>>>
>>> Agree, the deletion would have to be preceded by something like a
>>> partition drain. What is needed is the watermark reaching the end of the
>>> global window (+inf) and a checkpoint. After that, the source can be
>>> removed and what happens with it is no concern any more. That applies to
>>> all sources in general.
>>> I don't know the implementation details, but it seems that the topic
>>> would have to be somehow marked as "draining"; it would then be the
>>> responsibility of the reader to shift the watermark belonging to
>>> partitions of that topic to +inf. It would then be the responsibility
>>> of Flink to verify that such a source is removed only after a checkpoint
>>> is taken. Otherwise there would be a possible risk of data loss.
>>>
>>> This definitely looks like quite a complex process.
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> just out of curiosity, would this problem be solvable by the ability
>>>> to remove partitions that declare that they do not contain more data
>>>> (watermark reaching the end of the global window)? There is probably
>>>> another problem in that the topic can be recreated after being deleted,
>>>> which could result in the watermark moving back in time, but this
>>>> problem might be there already.
>>>>
>>>> Jan
>>>>
>>>> On 9/14/21 3:08 PM, Fabian Paul wrote:
>>>> > Hi Constantinos,
>>>> >
>>>> > I agree with David that it is not easily possible to remove a
>>>> > partition while a Flink job is running. Imagine the following
>>>> > scenario:
>>>> >
>>>> > Your Flink job initially works on 2 partitions belonging to two
>>>> > different topics and you have checkpointing enabled to guarantee
>>>> > exactly-once delivery. It implies that on every checkpoint the
>>>> > offsets of the Kafka topic are stored in a Flink checkpoint to
>>>> > recover from them in case of a failure.
>>>> > Now you trigger the removal of one of the topics and the discovery
>>>> > detects that one of the partitions was removed. If the pipeline now
>>>> > fails before the next checkpoint was taken, Flink will try to recover
>>>> > from the previous checkpoint, which is invalid by now because the
>>>> > partition is not available anymore.
>>>> >
>>>> > Only if you do not care about losing data is it possible to simply
>>>> > ignore the removed partition.
>>>> >
>>>> > Best,
>>>> > Fabian
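For context on the partition discoverer discussed throughout the thread: it comes into play when the Kafka source subscribes by topic pattern with periodic discovery enabled, so topics created (or deleted) externally are noticed while the job runs. Below is a minimal sketch of such a setup, assuming the newer KafkaSource API; the broker address, topic pattern, group id, and intervals are placeholder values, not anything taken from the thread.

```java
import java.util.regex.Pattern;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TopicPatternJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing ties the consumed Kafka offsets to recoverable state,
        // which is why deleting a topic invalidates checkpoints that still
        // reference its partitions.
        env.enableCheckpointing(60_000L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")                // placeholder
                .setTopicPattern(Pattern.compile("events-.*"))     // placeholder pattern
                .setGroupId("example-group")                       // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Periodic partition discovery: topics/partitions matching the
                // pattern are discovered (or found missing) at runtime.
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();

        env.execute("topic-pattern-example");
    }
}
```

With a setup like this the discoverer will notice a deleted topic, but, as the replies above stress, the offsets for that topic remain part of earlier checkpoints; deleting the topic before a newer checkpoint completes means a failure and restore can no longer replay that data.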