Hi all,
I have written the sample app below to test the exactly-once
semantics of Apache Beam's KafkaIO with the Flink runner.
pipeline
    .apply("read from Kafka", KafkaIO
        .<String, String>read()
        .withTopic("consumer-topic")
        .withConsumerConfigUpdates(consumerConfigs)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize())
    .apply("transform", ParDo.of(new MessageTransformer()))
    .apply("write to kafka", KafkaIO
        .<String, String>write()
        .withProducerConfigUpdates(producerConfigs)
        .withTopic("producer-topic")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(2, "publisher"));
I have noticed that once withEOS is added, the pipeline no longer publishes
any messages to the Kafka topic and fails with the following error:
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
60000ms expired before the last committed offset for partitions
It looks like the producer is no longer publishing any messages. I also
tried reading from the topic with isolation.level=read_uncommitted, but
there are still no messages on the topic.
In the Flink pipeline options I have set the checkpointing interval to 10s
and the checkpointing mode to exactly-once.
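Concretely, the options are built along these lines (a sketch; the interval is milliseconds, and I am assuming the option names from FlinkPipelineOptions and the EXACTLY_ONCE mode value):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Construct Flink runner options: 10s checkpoints, exactly-once checkpointing.
FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(
        "--runner=FlinkRunner",
        "--checkpointingInterval=10000",      // 10s, in milliseconds
        "--checkpointingMode=EXACTLY_ONCE")   // assumed value; may also be the default
    .as(FlinkPipelineOptions.class);
```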
Any thoughts on why this is happening?
Thank you.