Hello Cristian,

Thanks for posting the detailed scenario of your experiments here. I think it may be important to share your KafkaIO configuration too. For example, are you setting this option by any chance?
https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
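For illustration, here is a minimal sketch of how that option is typically set (the broker address, topic and group id below are placeholders, not taken from your setup):

    import java.util.Collections;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    KafkaIO.Read<String, String> read =
        KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")       // placeholder
            .withTopic("input-topic")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // group.id is needed so there is a consumer group to commit to:
            .withConsumerConfigUpdates(
                Collections.singletonMap("group.id", "my-group"))  // placeholder
            // commit consumed offsets back to Kafka when each checkpoint
            // finalizes, instead of relying only on Flink state:
            .commitOffsetsInFinalize();

Whether or not this is set changes where the restart position comes from after a failure, which seems relevant to what you observed in 3a below.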
Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <[email protected]> wrote:

> Hi everyone,
>
> I'm trying to figure out how pipeline state works with Beam running on
> Flink Classic. I would appreciate some help with the below.
>
> My understanding is that on recovery (whether from a checkpoint or a
> savepoint), Flink recreates the operators (I guess those are DoFns in
> Beam) with whatever state they had when the pipeline crashed. For
> example, the Kafka operator might contain the latest *safe* offset to
> restart from. But I'm not seeing this when I introduce exceptions in the
> pipeline.
>
> My pipeline is as follows:
> 1. Read a Kafka topic from the start.
> 2. Have a DoFn that stores all incoming messages in a BagState.
> 3. The above DoFn sets a timer in such a way that it fires after a few
> checkpoints have been created and kept (because of
> --externalizeCheckpointsEnabled = true). The timer handler then outputs
> the buffered elements to the next operator, in this case KafkaIO.Write.
> 4. Before the timer in #3 fires, I manually trigger an exception (I
> listen to another Kafka topic and throw any time a new message comes in).
>
> What I observe:
> 1. In #4 above, Flink tries to process the exception twice, then stops
> the pipeline (because numberOfExecutionRetries = 2).
> 2. After the pipeline is stopped, the checkpoints are kept in the
> configured directory.
> 3. If I restart the pipeline (with --savepointPath = <path to latest
> checkpoint from first run>):
> 3a. No messages are read from Kafka, because the Kafka reader reached
> the end of the topic during the first run.
> 3b. StartBundle is not executed for my DoFn, indicating that the DoFn
> isn't even started.
> 3c. The timer in #3 is never executed, hence there is data loss, as the
> elements I had in my DoFn state are never processed.
> 4. If I manually reset the offset to the start of the topic and restart
> the pipeline (with --savepointPath = <path to latest checkpoint from
> first run>):
> 4a. StartBundle methods are called.
> 4b. In ProcessElement, the BagState is empty on the first received
> message. If I'm restoring from a checkpoint/savepoint, I would expect
> this state to be filled.
>
> Is this correct behaviour? Am I doing something wrong?
>
> Thanks,
> Cristian
>
> Other quirks I found:
> a. If KafkaIO.Read is configured to read from the latest offset, and
> there is an exception thrown in the pipeline before the first checkpoint
> happens (let's say on the first message that comes in), then on retry
> KafkaIO reads from the latest offset again. That means the message that
> caused the exception is not reprocessed. On the other hand, if the
> exception is thrown after the first checkpoint, that message will be
> tried twice (because numberOfExecutionRetries = 2), and then the
> pipeline will exit. I think this is working as designed, but it feels a
> little weird that the behaviour differs depending on whether there's a
> checkpoint or not.
>
> b. When KafkaIO.Write is configured with .withEOS(number, "group"), and
> there is an exception thrown in the pipeline, the Flink job doesn't
> exit. I think there is a Kafka producer in KafkaExactlyOnceSink that is
> not closed correctly.
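P.S. If I understood your steps 2 and 3 correctly, your buffering DoFn looks roughly like the sketch below. The coder, timer delay and element types are my assumptions, not your actual code:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // Buffers every incoming element in BagState and flushes the buffer
    // from a processing-time timer set far enough ahead that several
    // checkpoints complete first.
    class BufferingFn extends DoFn<KV<String, String>, String> {

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec =
          StateSpecs.bag(StringUtf8Coder.of());

      @TimerId("flush")
      private final TimerSpec flushSpec =
          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void processElement(
          ProcessContext ctx,
          @StateId("buffer") BagState<String> buffer,
          @TimerId("flush") Timer flush) {
        buffer.add(ctx.element().getValue());
        // Placeholder delay; each new element pushes the flush further out.
        flush.offset(Duration.standardMinutes(5)).setRelative();
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext ctx,
          @StateId("buffer") BagState<String> buffer) {
        // Emit everything buffered so far, then clear the state.
        for (String element : buffer.read()) {
          ctx.output(element);
        }
        buffer.clear();
      }
    }

If your DoFn differs from this in any significant way (event-time timers, different state declarations), that detail would also help narrow things down.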

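Also, regarding quirk (b): I assume you mean a sink configured along these lines (shard count, topic and sink group id are placeholders):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringSerializer;

    KafkaIO.Write<String, String> write =
        KafkaIO.<String, String>write()
            .withBootstrapServers("kafka:9092")   // placeholder
            .withTopic("output-topic")            // placeholder
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            // exactly-once sink: 5 producer shards under this sink group id
            .withEOS(5, "group");

If the job only hangs on failure with .withEOS(...) enabled, and exits cleanly without it, that would support your suspicion of an unclosed producer in KafkaExactlyOnceSink.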