Hey Pavel,

Thanks for the quick reply. Pardon me as I cannot copy/paste straight from
the IDE, copying by hand:

KafkaIO.<Pojo>read()
.withBootStrapServer("address")
.withTopic("topic")

.withKeyDeserializer(StringDeserializer.class)

.withValueDeserializer(ConfluentSchemaRegistryDesProvider.of(...))

.withConsumerConfigUpdates(map)
.withReadCommitted()
.commitOffsetInFinalize()

.withProcessingTime();


The config map is:
enable.auto.commit -> false
group.id -> some group
auto.offset.reset -> earliest
specific.avro.reader -> false


On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <[email protected]> wrote:

> Hello Christian,
>
> Thanks for posting here the detailed scenario of your experiments. I think
> it may be important to share your KafkaIO configuration here too. For
> example, are you setting this config anyhow?
> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <[email protected]>
> wrote:
>
>> Hi everyone,
>>
>> I'm trying to figure out how pipeline state works with Beam running on
>> Flink Classic. Would appreciate some help with the below.
>>
>> My understanding is that on recovery (whether from a checkpoint or
>> savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
>> with whatever state they had when the pipeline crashed. For example the
>> Kafka operator might contain the latest *safe* offset to restart from. But
>> I'm not seeing this when I introduce exceptions in the pipeline.
>>
>> My pipeline is as follows:
>> 1. Read a Kafka topic from start
>> 2. Have a DoFn that stores all incoming messages in a BagState
>> 3. Above DoFn triggers a timer set in such a way that it triggers after
>> there are a few checkpoints created and kept because of
>> --externalizeCheckpointsEnabled = true. This timer handler then outputs the
>> elements to the next operator, in this case KafkaIo.Write.
>> 4. Before the timer in #3 is executed manually trigger an exception
>> (listen to another kafka topic, and throw any time a new message comes in)
>>
>> What I observe:
>> 1. In #4 above Flink tries to process the exception twice then stops the
>> pipeline (because numberOfExecutionRetries =2 )
>> 2. After the pipeline is stopped, I see the checkpoints are kept in the
>> configured directory
>> 3. If I restart the pipeline (with --savepointPath = <path to latest
>> checkpoint from first run>):
>> 3a. No messages are read from kafka, because the Kafka reader reached the
>> end of the topic during the first run
>> 3b. StartBundles are not executed for my DoFn. Indicating that the DoFn
>> isn't even started
>> 3c. The timer in #3 is never executed, hence there is data loss as the
>> elements I had in my DoFn state are never processed
>> 4. If I manually reset the offset to the start of the topic and restart
>> the pipeline (with --savepointPath = <path to latest checkpoint from first
>> run>):
>> 4a. StartBundle methods are called
>> 4b. In ProcessElement, the BagState is empty on the first received
>> message. If I'm restoring from a checkpoint/savepoint, I would expect this
>> state to be filled.
>>
>> Is this correct behaviour? Am I doing something wrong?
>>
>> Thanks,
>> Cristian
>>
>> Other quirks I found:
>> a. If KafkaIO.Read is configured to read from the latest offset, and
>> there is an exception thrown in the pipeline before the first checkpoint
>> happens (let's say on the first message that comes in), when Flink retries
>> KafkaIO reads from the latest offset again. That means that the message
>> that caused the exception is not reprocessed. On the other hand, if the
>> exception is thrown after the first checkpoint, that message will be tried
>> twice (because numberOfExecutionRetries =2 ), and then the pipeline will
>> exit. I think this is working as designed but it feels a little weird
>> that the behaviour is different depending if there's a checkpoint or not.
>>
>> b. When KafkaIO.Write is configured with .withEOS(number, "group"), and
>> there is an exception thrown in the pipeline, the Flink job doesn't exit. I
>> think there is a kafka producer in KafkaExactlyOnceSink that is not closed
>> correctly.
>>
>

Reply via email to