Hi Beam team,
We're seeing checkpoint timeouts when running an Apache Beam pipeline on
Flink. They happen when the pipeline has a slow step and we send a burst of
messages on Kafka.
I have set up a similar pipeline on my laptop that reproduces the problem.
Pipeline details:
-----------------
* Python, running a Beam pipeline on Flink via PortableRunner
* Streaming
* Read from Kafka
* A slow beam.Map() call
* WindowInto, Write to files
The pipeline calls beam.Map() on a function that sleeps for 20 seconds per
element (a simplified sketch of the pipeline is included below).
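In case it's useful, here is a simplified sketch of what the pipeline looks
like. The topic name, bootstrap servers, window size, and output path are
placeholders, not our exact values:

    import time

    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.window import FixedWindows

    def slow_step(record):
        # Simulates our slow step: roughly 20 seconds per element.
        time.sleep(20)
        return record

    # The Flink / portable runner options are shown below under
    # "Our pipeline configuration".
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadKafka' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'localhost:9092'},
             topics=['echo-input'])
         | 'SlowStep' >> beam.Map(slow_step)
         | 'Window' >> beam.WindowInto(FixedWindows(60))
         | 'Format' >> beam.Map(lambda kv: str(kv))
         | 'Write' >> fileio.WriteToFiles(
             path='/tmp/beam-output',
             sink=lambda dest: fileio.TextSink()))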
Our pipeline configuration (a sketch of how we pass these options follows
this list):
* Checkpoint interval = 30s
* Checkpoint timeout = 60s
* Fail on checkpointing errors = false
* Max bundle size = 2
* Parallelism = 1
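To be concrete, in the sketch above the options are built roughly like this.
The job endpoint and environment type are placeholders, and the flag names
are our best reconstruction of the Flink runner options; the checkpoint
timeout and fail-on-errors names in particular are our assumption and may
not be exact:

    from apache_beam.options.pipeline_options import PipelineOptions

    # Flag names mirror what we believe the Flink runner accepts; the
    # checkpoint timeout and fail-on-errors names are our assumption.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',         # placeholder
        '--environment_type=LOOPBACK',           # placeholder
        '--streaming',
        '--checkpointing_interval=30000',        # 30s
        '--checkpoint_timeout_millis=60000',     # 60s (name assumed)
        '--fail_on_checkpointing_errors=false',  # (name assumed)
        '--max_bundle_size=2',
        '--parallelism=1',
    ])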
Steps to reproduce:
-------------------
After the pipeline is running, we send 10 Kafka messages from a file as
follows:
seq 1 10 > msgs.txt
kcat -b localhost:9092 -P -k "testMessage" -t echo-input -D '\n' -l < msgs.txt
What we expected to see:
------------------------
Because we set max bundle size = 2, we expected Beam to pick up 2 Kafka
records at a time. Processing such a bundle would only take about 40 seconds
(2 x 20s), which is within the 60-second checkpoint timeout, so we expected
all checkpoints to succeed.
What we see instead:
--------------------
When we send the messages, every checkpoint that starts during the next
~200 seconds (10 messages x 20s each) fails with a timeout. After that,
checkpoints start succeeding again.
Based on the Flink TaskManager logs, it seems that the Kafka consumer has
read up to the latest offset (the backlog, which used to be 0, became 96
bytes):
2022-04-27 13:12:32,359 DEBUG
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0:
backlog 96
This backlog eventually drops back to 0 after about 200 seconds. Only then
does the task attempt to acknowledge the checkpoint, but by that point it is
too late:
2022-04-27 13:15:53,300 WARN
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl
[] - Time from receiving all checkpoint barriers/RPC to executing it
exceeded threshold: 198275ms
By this time, the Flink JobManager has already marked that checkpoint as
failed and started a new one.
Questions on Beam:
------------------
We don't fully understand this Beam/Flink behavior, but with regard to Beam
we wanted to ask:
* Why does Beam seem to process all the Kafka records, even when we have
set max bundle size = 2?
* Alternatively, is there any way to limit how many records are read by
beam.io.kafka.ReadFromKafka()? (The snippet after this list shows the kind
of knob we have in mind.)
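For example, our guess (and it is only a guess) is that a standard Kafka
consumer property such as max.poll.records could be passed through
consumer_config, but it is not clear to us whether that has any effect on
how many records Beam buffers per bundle:

    from apache_beam.io.kafka import ReadFromKafka

    # Our guess only: pass a standard Kafka consumer property through
    # consumer_config. We don't know whether this limits what Beam
    # actually reads or buffers per bundle.
    ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'localhost:9092',
            'max.poll.records': '2',  # standard Kafka consumer setting
        },
        topics=['echo-input'])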
I'm happy to share additional logs or any other details I missed here.
Many thanks,
Deepak