Hi Jan,

The notion of completeness is just one part of the problem. The second part
is that once you remove the Kafka topic, you are no longer able to replay the
data in case of a failure.

So you basically need the following workflow to ensure correctness:

1) Wait until there are no more elements in the topic (this can be done by
checking the watermark for that partition, as you're suggesting)
2) Take checkpoint N
3) Delete the topic (this effectively makes all checkpoints < N invalid)

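If you switch the order of 2) and 3), you have no way to recover from a
failure: the latest completed checkpoint still references offsets in the
deleted topic, so there is nothing left to replay from.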
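To make the ordering argument concrete, here is a toy simulation in plain Python. It does not use any Flink or Kafka API: `Checkpoint`, `can_restore` and `run` are hypothetical names, and the model simply assumes a restore has to seek back into every topic the checkpoint is still reading, while topics marked as drained before the checkpoint need no replay.

```python
from dataclasses import dataclass


@dataclass
class Checkpoint:
    """Toy checkpoint (hypothetical): offsets of topics still being read,
    plus the topics that were fully drained before it was taken."""
    checkpoint_id: int
    offsets: dict        # topic -> next offset to read
    finished: frozenset  # drained topics, no replay needed


def can_restore(existing_topics: set, cp: Checkpoint) -> bool:
    # A restore seeks back into every topic that is still being read,
    # so all of those topics must still exist.
    return all(topic in existing_topics for topic in cp.offsets)


def run(delete_before_checkpoint: bool) -> bool:
    """Simulate steps 1) to 3) and report whether a failure right after
    them can be recovered from the latest completed checkpoint."""
    existing = {"a", "b"}
    # Checkpoint N-1 was taken while topic "b" still had unread records.
    latest = Checkpoint(1, {"a": 10, "b": 7}, frozenset())

    # 1) Topic "b" is fully consumed (e.g. its watermark reached the end),
    #    so the source stops tracking it and marks it as finished.
    offsets, finished = {"a": 12}, frozenset({"b"})

    if delete_before_checkpoint:
        existing.discard("b")                      # 3) delete the topic
        # The job fails before checkpoint N completes; latest is still N-1.
    else:
        latest = Checkpoint(2, offsets, finished)  # 2) take checkpoint N
        existing.discard("b")                      # 3) delete the topic

    # A failure now restores from the latest completed checkpoint.
    return can_restore(existing, latest)


print(run(delete_before_checkpoint=False))  # True
print(run(delete_before_checkpoint=True))   # False
```

In this model, checkpoint N-1 also becomes unusable once "b" is gone, which is the "checkpoints < N are invalid" remark in step 3).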

Also, for this to work properly, the actual topic deletion would need to be
performed by Flink (not by the third-party system, as suggested in the
original question) in the second phase of 2PC, i.e. only once you're sure
that you've successfully taken a checkpoint that has seen all the data.
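A rough sketch of what "delete in the second phase" could look like, again in plain Python rather than real Flink code. The method names loosely mirror the idea behind Flink's `CheckpointListener#notifyCheckpointComplete` callback, but `DeferredTopicDeleter`, its methods and the injected `delete_topic` callable are all hypothetical (in practice the callable would wrap something like a Kafka admin client's topic deletion).

```python
from typing import Callable, Dict, Set


class DeferredTopicDeleter:
    """Hypothetical helper that deletes drained topics in two phases.

    Phase 1 (snapshot): remember which drained topics checkpoint N covers.
    Phase 2 (checkpoint N confirmed complete): only now delete them.
    """

    def __init__(self, delete_topic: Callable[[str], None]):
        self._delete_topic = delete_topic        # assumed admin-client wrapper
        self._drained: Set[str] = set()          # drained, not yet checkpointed
        self._pending: Dict[int, Set[str]] = {}  # checkpoint id -> topics

    def mark_drained(self, topic: str) -> None:
        # Step 1): the topic's watermark says no more data will arrive.
        self._drained.add(topic)

    def snapshot_state(self, checkpoint_id: int) -> None:
        # Phase 1: this checkpoint has seen all data of the drained topics,
        # but it may still fail, so nothing is deleted yet.
        if self._drained:
            self._pending[checkpoint_id] = set(self._drained)
            self._drained.clear()

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Phase 2: a restore will now start from this checkpoint or a later
        # one, so topics covered by it (or by older ones) can be deleted.
        for cp_id in sorted(self._pending):
            if cp_id > checkpoint_id:
                break
            for topic in sorted(self._pending.pop(cp_id)):
                self._delete_topic(topic)

    def notify_checkpoint_aborted(self, checkpoint_id: int) -> None:
        # The checkpoint failed: keep the topics, retry with the next one.
        self._drained |= self._pending.pop(checkpoint_id, set())


deleted = []
deleter = DeferredTopicDeleter(deleted.append)
deleter.mark_drained("topic-b")
deleter.snapshot_state(checkpoint_id=5)
print(deleted)  # []  (checkpoint 5 not confirmed yet)
deleter.notify_checkpoint_complete(5)
print(deleted)  # ['topic-b']
```

Pending deletions for older checkpoint ids are flushed as well, on the assumption that a later completed checkpoint subsumes earlier ones even if their own completion notification never arrived.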

Best,
D.

On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> just out of curiosity, would this problem be solvable by the ability to
> remove partitions that declare they do not contain any more data
> (watermark reaching the end of the global window)? There is probably
> another problem: a topic can be recreated after being deleted, which
> could result in the watermark moving back in time, but this problem might
> be there already.
>
>   Jan
>
> On 9/14/21 3:08 PM, Fabian Paul wrote:
> > Hi Constantinos,
> >
> > I agree with David that it is not easily possible to remove a partition
> > while a Flink job is running. Imagine the following scenario:
> >
> > Your Flink job initially works on 2 partitions belonging to two
> > different topics, and you have checkpointing enabled to guarantee
> > exactly-once delivery. This implies that on every checkpoint the Kafka
> > offsets are stored in the Flink checkpoint, so the job can recover from
> > them in case of a failure.
> > Now you trigger the removal of one of the topics, and partition discovery
> > detects that one of the partitions was removed. If the pipeline now fails
> > before the next checkpoint is taken, Flink will try to recover from the
> > previous checkpoint, which is invalid by now because the partition is no
> > longer available.
> >
> > Only if you do not care about losing data is it possible to simply
> > ignore the removed partition.
> >
> > Best,
> > Fabian
>