Hi list,

I was playing around with KafkaIO today to understand how it behaves in
failure and restart scenarios (crash, cancel, or drain), and I found that it
"lost" (skipped) Kafka messages in these cases. That is, it resumed from the
latest offsets rather than from the last successfully processed offsets,
leaving a gap in the messages.

My KafkaIO transform looks like:

KafkaIO.readBytes()
                .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
                .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
                .withTopics(ImmutableList.of(KAFKA_TOPIC))
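
For reference, readBytes() emits KafkaRecord<byte[], byte[]> elements, so
downstream access looks roughly like this (a simplified stand-in for my real
code, just to show how I watch partition/offset and spotted the gaps):

                .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        // Each record carries its topic, partition and offset.
                        KafkaRecord<byte[], byte[]> record = c.element();
                        c.output(record.getTopic() + "/" + record.getPartition()
                                + "@" + record.getOffset());
                    }
                }))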

The consumer factory is used because the SSL key/truststore files have to be
set up at runtime:

            final Map<String, Object> config = Maps.newHashMap();
            config.put("auto.offset.reset", "latest");
            config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
            config.put("group.id", KAFKA_GROUP_ID);
            config.put("key.deserializer", ByteArrayDeserializer.class.getName());
            config.put("security.protocol", "SSL");
            config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
            config.put("ssl.truststore.location", truststore.toPath().toString());
            config.put("ssl.truststore.password", kafkaTruststorePassword);
            config.put("value.deserializer", ByteArrayDeserializer.class.getName());

            return new KafkaConsumer<>(config);
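
(For completeness: that snippet is the body of our factory, which is shaped
roughly like the sketch below. The class itself is simplified here, and the
runtime SSL file setup is elided.)

    private static class KafkaConsumerFactoryFn
            implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
        @Override
        public Consumer<byte[], byte[]> apply(Map<String, Object> ignoredDefaults) {
            // KafkaIO passes in its own config map; we build ours from scratch
            // instead, after materialising the SSL key/truststore files (elided).
            final Map<String, Object> config = Maps.newHashMap();
            // ... the config.put(...) calls shown above ...
            return new KafkaConsumer<>(config);
        }
    }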

So I am setting a group.id, and I know KafkaIO is using the new consumer
API, since our ZooKeeper is not reachable from Dataflow (the old consumer
would need it).

When I look on the Kafka cluster, there is no record of the consumer
group's offsets, so I take it KafkaIO is not committing offsets to Kafka.
That would also explain the gap: with no committed offsets,
auto.offset.reset=latest means a restarted consumer starts from the end of
the log.

Can this be changed so that offsets are committed once a "batch" of
streaming messages is seen as successfully processed? I am fine with
at-least-once semantics.
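
The only stopgap I can think of on my side is Kafka's own auto-commit, e.g.
adding the lines below to the factory config. But as I understand it that
commits on a timer during poll(), not when Beam has actually processed the
records, so it could commit offsets for messages that are later lost:

            // Stopgap only: time-based auto-commit, not tied to Beam
            // finishing a bundle, so not the "processed OK" semantics I want.
            config.put("enable.auto.commit", "true");
            config.put("auto.commit.interval.ms", "5000");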

I am using Dataflow, Beam 2.0, and Kafka 0.10.2.1.

Cheers,
Gwilym
