Hello Cristian,

Thanks for posting the detailed scenario of your experiments here. I think
it would also help to share your KafkaIO configuration. For example, are you
setting this option at all?
https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
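
For reference, a minimal sketch of the kind of configuration I mean (the
broker, topic, and group names are placeholders, and String payloads are
assumed):

    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Pipeline p = Pipeline.create();
    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")   // placeholder
        .withTopic("input-topic")                 // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // commitOffsetsInFinalize requires a consumer group id:
        .withConsumerConfigUpdates(
            ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
        // Commit consumed offsets back to Kafka when the checkpoint
        // that contains them is finalized:
        .commitOffsetsInFinalize());

Whether or not this is set can change where reading resumes after a restart,
so it would help to know your exact settings.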

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <[email protected]>
wrote:

> Hi everyone,
>
> I'm trying to figure out how pipeline state works with Beam running on
> Flink Classic. Would appreciate some help with the below.
>
> My understanding is that on recovery (whether from a checkpoint or a
> savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
> with whatever state they had when the pipeline crashed. For example, the
> Kafka operator might contain the latest *safe* offset to restart from. But
> I'm not seeing this when I introduce exceptions in the pipeline.
>
> My pipeline is as follows (a sketch of steps 2 and 3 follows the list):
> 1. Read a Kafka topic from start
> 2. Have a DoFn that stores all incoming messages in a BagState
> 3. The above DoFn sets a timer such that it fires only after a few
> checkpoints have been created and retained (because of
> --externalizedCheckpointsEnabled=true). The timer handler then outputs the
> buffered elements to the next operator, in this case KafkaIO.Write.
> 4. Before the timer in #3 fires, I manually trigger an exception (I listen
> to another Kafka topic and throw whenever a new message comes in).
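>
> A minimal sketch of steps 2 and 3, with placeholder names and a placeholder
> timer delay (my real delay is tuned to span a few checkpoint intervals):
>
>     import org.apache.beam.sdk.state.BagState;
>     import org.apache.beam.sdk.state.StateSpec;
>     import org.apache.beam.sdk.state.StateSpecs;
>     import org.apache.beam.sdk.state.TimeDomain;
>     import org.apache.beam.sdk.state.Timer;
>     import org.apache.beam.sdk.state.TimerSpec;
>     import org.apache.beam.sdk.state.TimerSpecs;
>     import org.apache.beam.sdk.transforms.DoFn;
>     import org.apache.beam.sdk.values.KV;
>     import org.joda.time.Duration;
>
>     class BufferFn extends DoFn<KV<String, String>, KV<String, String>> {
>       @StateId("buffer")
>       private final StateSpec<BagState<KV<String, String>>> bufferSpec =
>           StateSpecs.bag();
>
>       @TimerId("flush")
>       private final TimerSpec flushSpec =
>           TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>       @ProcessElement
>       public void process(
>           ProcessContext ctx,
>           @StateId("buffer") BagState<KV<String, String>> buffer,
>           @TimerId("flush") Timer flush) {
>         // Step 2: buffer every incoming message in state.
>         buffer.add(ctx.element());
>         // Step 3: (re)arm a timer far enough out to outlive a few
>         // retained checkpoints (placeholder delay).
>         flush.offset(Duration.standardMinutes(5)).setRelative();
>       }
>
>       @OnTimer("flush")
>       public void onFlush(
>           OnTimerContext ctx,
>           @StateId("buffer") BagState<KV<String, String>> buffer) {
>         // Emit the buffered elements to the next transform
>         // (KafkaIO.Write in my case) and clear the buffer.
>         for (KV<String, String> kv : buffer.read()) {
>           ctx.output(kv);
>         }
>         buffer.clear();
>       }
>     }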
>
> What I observe:
> 1. In #4 above, after the exception Flink retries processing twice and then
> stops the pipeline (because numberOfExecutionRetries=2).
> 2. After the pipeline is stopped, I see the checkpoints are kept in the
> configured directory
> 3. If I restart the pipeline (with --savepointPath = <path to latest
> checkpoint from first run>):
> 3a. No messages are read from Kafka, because the Kafka reader reached the
> end of the topic during the first run.
> 3b. StartBundle is not executed for my DoFn, indicating that the DoFn
> isn't even started.
> 3c. The timer in #3 is never executed, hence there is data loss as the
> elements I had in my DoFn state are never processed
> 4. If I manually reset the offset to the start of the topic and restart
> the pipeline (with --savepointPath = <path to latest checkpoint from first
> run>):
> 4a. StartBundle methods are called
> 4b. In ProcessElement, the BagState is empty on the first received
> message. Since I'm restoring from a checkpoint/savepoint, I would expect
> this state to be populated.
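>
> For completeness, a sketch of the Flink runner options this scenario
> assumes (the values are illustrative, not necessarily what you should use):
>
>     --runner=FlinkRunner \
>     --checkpointingInterval=10000 \
>     --externalizedCheckpointsEnabled=true \
>     --numberOfExecutionRetries=2 \
>     --savepointPath=<path to latest checkpoint from first run>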
>
> Is this correct behaviour? Am I doing something wrong?
>
> Thanks,
> Cristian
>
> Other quirks I found:
> a. If KafkaIO.Read is configured to read from the latest offset, and an
> exception is thrown in the pipeline before the first checkpoint happens
> (say, on the first message that comes in), then when Flink retries, KafkaIO
> reads from the latest offset again. That means the message that caused the
> exception is not reprocessed. On the other hand, if the exception is thrown
> after the first checkpoint, that message will be retried twice (because
> numberOfExecutionRetries=2), and then the pipeline will exit. I think this
> is working as designed, but it feels a little weird that the behaviour
> differs depending on whether there's a checkpoint or not.
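>
> For context, by "configured to read from the latest offset" I mean roughly
> the following sketch (broker/topic names are placeholders):
>
>     import com.google.common.collect.ImmutableMap;
>     import org.apache.beam.sdk.io.kafka.KafkaIO;
>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>     import org.apache.kafka.common.serialization.StringDeserializer;
>
>     KafkaIO.<String, String>read()
>         .withBootstrapServers("localhost:9092")   // placeholder
>         .withTopic("input-topic")                 // placeholder
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         // Start from the end of the topic when there is no
>         // checkpointed position to restore:
>         .withConsumerConfigUpdates(ImmutableMap.of(
>             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));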
>
> b. When KafkaIO.Write is configured with .withEOS(number, "group") and an
> exception is thrown in the pipeline, the Flink job doesn't exit. I think
> there is a Kafka producer in KafkaExactlyOnceSink that is not closed
> correctly.
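>
> The sink configuration I mean is roughly the following sketch (names and
> the shard count are placeholders):
>
>     import org.apache.beam.sdk.io.kafka.KafkaIO;
>     import org.apache.kafka.common.serialization.StringSerializer;
>
>     KafkaIO.<String, String>write()
>         .withBootstrapServers("localhost:9092")   // placeholder
>         .withTopic("output-topic")                // placeholder
>         .withKeySerializer(StringSerializer.class)
>         .withValueSerializer(StringSerializer.class)
>         // Exactly-once mode; writes go through KafkaExactlyOnceSink.
>         .withEOS(1, "my-sink-group");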
>
