Hi all,

I have written the sample app below to test out the exactly-once
semantics of Apache Beam's KafkaIO with the Flink runner.

pipeline
        .apply("read from Kafka", KafkaIO
                .<String, String>read()
                .withTopic("consumer-topic")
                .withConsumerConfigUpdates(consumerConfigs)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .commitOffsetsInFinalize())
        .apply("transform", ParDo.of(new MessageTransformer()))
        .apply("write to kafka", KafkaIO
                .<String, String>write()
                .withProducerConfigUpdates(producerConfigs)
                .withTopic("producer-topic")
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
                .withEOS(2, "publisher"));

I have noticed that once withEOS is added, the pipeline no longer
publishes any messages to the Kafka topic and fails with the following error.

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
60000ms expired before the last committed offset for partitions

It looks like the producer is no longer publishing any messages. I also
tried reading from the topic with isolation.level=read_uncommitted, but
still no messages appear on the topic.
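
For reference, this is roughly how I read the topic back with the stock
Kafka console consumer (broker address is just a placeholder for my setup):

```shell
# Read everything on the output topic, including uncommitted
# (non-transactional-read) records, to check whether anything was produced.
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic producer-topic \
  --from-beginning \
  --consumer-property isolation.level=read_uncommitted
```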
In the flinkOptions I have set the checkpointing interval to 10s and the
checkpointing mode to exactly-once.
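
Concretely, the options are set up roughly like this (a sketch based on
Beam's FlinkPipelineOptions; the exact setter signatures may vary by Beam
version):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch: configure the Flink runner for 10s exactly-once checkpoints.
// Method names taken from FlinkPipelineOptions; verify against your
// Beam version, as the checkpointing-mode type has changed across releases.
FlinkPipelineOptions flinkOptions =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
flinkOptions.setCheckpointingInterval(10_000L); // 10s, in milliseconds
flinkOptions.setCheckpointingMode("EXACTLY_ONCE");
```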

Any thoughts on why this is happening?

Thank you.
