Hi list, I was playing around with KafkaIO today to understand how it behaves in a failure or restart scenario (be it crash, cancel, or drain), and I found it "lost" (or skipped) Kafka messages in these cases. That is, it resumed from the latest offsets rather than the last successfully processed offsets, and a gap in messages was observed as a result.
My KafkaIO transform looks like:

    KafkaIO.<Long, String>readBytes()
        .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
        .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
        .withTopics(ImmutableList.of(KAFKA_TOPIC))

The consumer factory is used because of some runtime SSL key/truststore setup:

    final Map<String, Object> config = Maps.newHashMap();
    config.put("auto.offset.reset", "latest");
    config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
    config.put("group.id", KAFKA_GROUP_ID);
    config.put("key.deserializer", ByteArrayDeserializer.class.getName());
    config.put("security.protocol", "SSL");
    config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
    config.put("ssl.truststore.location", truststore.toPath().toString());
    config.put("ssl.truststore.password", kafkaTruststorePassword);
    config.put("value.deserializer", ByteArrayDeserializer.class.getName());
    return new KafkaConsumer<>(config);

So I am setting a group.id, and I know KafkaIO is using the new consumer, because our Zookeeper is not accessible from Dataflow. When I look at the Kafka cluster, there is no record of the consumer group's offsets, so I take it KafkaIO is not committing offsets to Kafka. Could this be changed to commit offsets once a "batch" of streaming messages is seen as processed OK? I am fine with at-least-once semantics.

I am using Dataflow, Beam 2.0, and Kafka 0.10.2.1.

Cheers,
Gwilym
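For what it's worth, here is a minimal sketch of the kind of workaround I have been considering inside the factory: turning on the Kafka consumer's own offset commits via the standard `enable.auto.commit` / `auto.commit.interval.ms` properties, and switching `auto.offset.reset` to "earliest" so a restart with no committed offsets replays rather than skips. I realise these commits would not be tied to Beam's checkpoints, so this only approximates at-least-once; the class and group-id names below are placeholders, not anything from my real pipeline.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: extra consumer properties for Kafka-side offset commits.
// These are stock Kafka 0.10 consumer settings; whether committing this
// way is safe depends on how KafkaIO checkpoints, which is my question.
public class ConsumerConfigSketch {
    static Map<String, Object> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("group.id", "my-beam-pipeline");       // placeholder group id
        config.put("enable.auto.commit", "true");          // consumer commits offsets itself
        config.put("auto.commit.interval.ms", "5000");     // commit roughly every 5 seconds
        // With no committed offsets, "earliest" replays from the start of
        // retention instead of leaving the gap that "latest" produces.
        config.put("auto.offset.reset", "earliest");
        return config;
    }
}
```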