Hi Gwilym,

KafkaIO saves offsets to the snapshot (checkpoint) instead of committing offsets to Kafka, and restores from that checkpoint on restart. You can use a Kafka client configuration option to turn on auto-commit of offsets:

    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
Hope this helps.

Best,
JingsongLee
------------------------------------------------------------------
From: Gwilym Evans <gwilym.ev...@bigcommerce.com>
Time: 2017 Jun 29 (Thu) 15:32
To: user <user@beam.apache.org>
Subject: Kafka offset management
Hi list,
I was playing around with KafkaIO today to understand how it behaves in a 
failure or restart scenario (be it crash, cancel, or drain), and I found it 
"lost" (or skipped) Kafka messages in these cases. That is, it resumed from the 
latest offsets rather than the last successfully processed offsets, and a gap 
in messages was observed as a result.
My KafkaIO transform looks like:
    KafkaIO.<Long, String>readBytes()
            .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
            .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
            .withTopics(ImmutableList.of(KAFKA_TOPIC))
The consumer factory is used because of some runtime SSL key/truststore setup:
    final Map<String, Object> config = Maps.newHashMap();
    config.put("auto.offset.reset", "latest");
    config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
    config.put("group.id", KAFKA_GROUP_ID);
    config.put("key.deserializer", ByteArrayDeserializer.class.getName());
    config.put("security.protocol", "SSL");
    config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
    config.put("ssl.truststore.location", truststore.toPath().toString());
    config.put("ssl.truststore.password", kafkaTruststorePassword);
    config.put("value.deserializer", ByteArrayDeserializer.class.getName());

    return new KafkaConsumer<>(config);
So, I am setting a group.id, and I know KafkaIO is using the new consumer 
because our Zookeeper is not accessible from Dataflow.
When I look at the Kafka cluster, there is no record of the consumer group's offsets. So I take it KafkaIO is not committing offsets to Kafka.
Can this be changed to commit offsets once a "batch" of streaming messages is seen as processed OK? I am fine with at-least-once semantics.
I am using Dataflow, Beam 2.0, and Kafka 0.10.2.1.
Cheers,
Gwilym