Hi Gwilym,

KafkaIO saves offsets to the snapshot (checkpoint) instead of committing
offsets to Kafka for restarting. You can use a Kafka client configuration to
enable auto commit of offsets:

ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true

Hope this helps.

Best, JingsongLee
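A minimal sketch of that change, assuming the same consumer-factory pattern as in the quoted message below. Only the map entries are shown (no KafkaConsumer is constructed, so this runs without the Kafka client on the classpath); the group id and commit interval are placeholder values:

```java
import java.util.HashMap;
import java.util.Map;

public class AutoCommitConfig {

    // Builds a consumer config with offset auto commit enabled.
    // "enable.auto.commit" is the literal key behind
    // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in the Kafka client.
    static Map<String, Object> buildConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("group.id", "my-group");           // placeholder group id
        config.put("enable.auto.commit", true);       // commit offsets back to Kafka
        config.put("auto.commit.interval.ms", 5000);  // how often to commit (ms)
        return config;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().get("enable.auto.commit"));
    }
}
```

In the consumer factory below, this amounts to one extra `config.put(...)` line before constructing the `KafkaConsumer`.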
------------------------------------------------------------------
From: Gwilym Evans <gwilym.ev...@bigcommerce.com>
Time: 2017 Jun 29 (Thu) 15:32
To: user <user@beam.apache.org>
Subject: Kafka offset management
Hi list,
I was playing around with KafkaIO today to understand how it behaves in a
failure or restart scenario (be it crash, cancel, or drain), and I found it
"lost" (or skipped) Kafka messages in these cases. That is, it resumed from the
latest offsets rather than the last successfully processed offsets, and a gap
in messages was observed as a result.
My KafkaIO transform looks like:
KafkaIO.<Long, String>readBytes()
    .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
    .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
    .withTopics(ImmutableList.of(KAFKA_TOPIC))
The consumer factory is used because of some runtime SSL key/truststore setup:
final Map<String, Object> config = Maps.newHashMap();
config.put("auto.offset.reset", "latest");
config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
config.put("group.id", KAFKA_GROUP_ID);
config.put("key.deserializer", ByteArrayDeserializer.class.getName());
config.put("security.protocol", "SSL");
config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
config.put("ssl.truststore.location", truststore.toPath().toString());
config.put("ssl.truststore.password", kafkaTruststorePassword);
config.put("value.deserializer", ByteArrayDeserializer.class.getName());
return new KafkaConsumer<>(config);
So, I am setting a group.id, and I know KafkaIO is using the new consumer
because our Zookeeper is not accessible from Dataflow.
When I look on the Kafka cluster, there is no record of the consumer group's
offsets. So I take it KafkaIO is not committing offsets to Kafka.
Can this be changed to commit offsets when a "batch" of streaming messages are
seen as processed OK? I am fine with at-least-once.
I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
Cheers,
Gwilym