Hi JingsongLee,

Thanks for the reply.
What I'm trying to avoid are lost / skipped messages due to two situations:

1. Lost offsets, or
2. Premature offset commits

I've researched snapshot checkpoints, and from what I understand these are only
maintained in Dataflow when a job is updated. If a job crashes, is cancelled, or
is drained, then the checkpoints are lost. This is situation (1) above.

From what I understand about auto-commit offsets, the Kafka client periodically
and automatically commits the offsets it has polled. In the case of Beam and
Dataflow, this happens even if the offsets being committed have not yet been
fully processed by the pipeline. This is situation (2) above.

So far I'm not seeing a way to avoid data loss besides resetting to the earliest
offset when a job starts. But, given we retain data in our Kafka topics for up to
7 days, that is not feasible from a performance point of view.

Can anyone confirm / deny my understanding here?

Cheers,
Gwilym

On 30 June 2017 at 02:59, JingsongLee <[email protected]> wrote:

> Hi Gwilym
>
> KafkaIO saves offsets to the snapshot (checkpoint) for restarting, rather than
> committing offsets to Kafka.
> You can use a Kafka client configuration to enable auto-commit of offsets:
>
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
>
> Hope this helps.
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From: Gwilym Evans <[email protected]>
> Time: 2017 Jun 29 (Thu) 15:32
> To: user <[email protected]>
> Subject: Kafka offset management
>
> Hi list,
>
> I was playing around with KafkaIO today to understand how it behaves in a
> failure or restart scenario (be it crash, cancel, or drain), and I found it
> "lost" (or skipped) Kafka messages in these cases. That is, it resumed from
> the latest offsets rather than the last successfully processed offsets, and
> a gap in messages was observed as a result.
>
> My KafkaIO transform looks like:
>
> KafkaIO.<Long, String>readBytes()
>     .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
>     .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
>     .withTopics(ImmutableList.of(KAFKA_TOPIC))
>
> The consumer factory is used because of some runtime SSL key/truststore setup:
>
> final Map<String, Object> config = Maps.newHashMap();
> config.put("auto.offset.reset", "latest");
> config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
> config.put("group.id", KAFKA_GROUP_ID);
> config.put("key.deserializer", ByteArrayDeserializer.class.getName());
> config.put("security.protocol", "SSL");
> config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
> config.put("ssl.truststore.location", truststore.toPath().toString());
> config.put("ssl.truststore.password", kafkaTruststorePassword);
> config.put("value.deserializer", ByteArrayDeserializer.class.getName());
>
> return new KafkaConsumer<>(config);
>
> So, I am setting a group.id, and I know KafkaIO is using the new consumer
> because our Zookeeper is not accessible from Dataflow.
>
> When I look on the Kafka cluster, there is no record of the consumer group's
> offsets. So I take it KafkaIO is not committing offsets to Kafka.
>
> Can this be changed to commit offsets when a "batch" of streaming messages is
> seen as processed OK? I am fine with at-least-once.
>
> I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
>
> Cheers,
> Gwilym
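
For anyone who finds this thread later: a minimal sketch of what JingsongLee's
suggestion could look like in a consumer factory like the one quoted above.
This is untested here; the class name AutoCommitConsumerFactoryFn, the broker
address, and the group id are placeholders, the SSL settings are omitted, and
the SerializableFunction signature is assumed from what withConsumerFactoryFn
expects when using readBytes().

    import java.util.Map;

    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import com.google.common.collect.Maps;

    // Consumer factory that lets the Kafka client auto-commit polled offsets
    // back to Kafka. Note: auto-commit fires on a timer after poll(), not after
    // the pipeline has processed the records, so it trades skipped messages on
    // restart for possible loss of in-flight messages (situation 2 above).
    public class AutoCommitConsumerFactoryFn
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

      @Override
      public Consumer<byte[], byte[]> apply(Map<String, Object> ignored) {
        final Map<String, Object> config = Maps.newHashMap();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093");   // placeholder
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-pipeline-group");       // placeholder
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // The setting from JingsongLee's reply: commit polled offsets periodically.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        // The SSL truststore settings from the original factory would go here as well.
        return new KafkaConsumer<>(config);
      }
    }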
