Sorry, I forgot to include the user mail group.

@user Yeah, although KafkaIO (which exposes an offsets interface) is different from PubSubIO, committing the offsets to Kafka in finalizeCheckpoint is also a way to do it. You are welcome to contribute, and maybe @Raghu Angadi can add more details.

Best,
Jingsong Lee

------------------------------------------------------------------
From: Gwilym Evans <[email protected]>
Time: 2017 Jun 30 (Fri) 13:27
To: JingsongLee <[email protected]>
Subject: Re: Kafka offset management

Thanks for the info. I'll have a look and see if I can do anything similar.

I am very new to Beam internals, but I've been having a look at the KafkaIO code and comparing it to the PubSubIO code. I think that the finalizeCheckpoint implementation for KafkaIO should definitely be committing the offsets to Kafka, if possible. But perhaps only when a Kafka group.id is configured, as committing offsets for random or blank group IDs is kind of pointless.

I think I'll take a shot at making and contributing this, even if it's optional. Unless you can think of a reason to specifically not do this?

Though, looking at the KafkaIO source for this, there is even a comment there alluding to the fact that this should maybe be done to provide better restart options.

-Gwilym
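[Editor's note: the idea discussed above — finalizeCheckpoint committing offsets back to Kafka, but only when a group.id is configured — can be sketched with a small stdlib-only model. All class and field names here are hypothetical; a real implementation inside KafkaIO's checkpoint mark would call KafkaConsumer.commitSync with the per-partition offsets instead of writing to an in-memory map.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Minimal model of a checkpoint mark that commits offsets on finalize,
// but only when a consumer group id was configured. Hypothetical names;
// a real KafkaIO implementation would call KafkaConsumer.commitSync here.
public class CheckpointSketch {
    // Stand-in for the broker's committed-offset store: partition -> offset.
    static final Map<Integer, Long> committed = new HashMap<>();

    static class CheckpointMark {
        final Optional<String> groupId;
        final Map<Integer, Long> offsets;  // partition -> next offset to read

        CheckpointMark(Optional<String> groupId, Map<Integer, Long> offsets) {
            this.groupId = groupId;
            this.offsets = offsets;
        }

        // Called once the runner has durably processed everything up to
        // these offsets -- a safe point for an at-least-once commit.
        void finalizeCheckpoint() {
            if (groupId.isPresent()) {     // skip commit for anonymous groups
                committed.putAll(offsets);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 42L);

        new CheckpointMark(Optional.empty(), offsets).finalizeCheckpoint();
        System.out.println(committed.isEmpty());   // true: no group.id, nothing committed

        new CheckpointMark(Optional.of("my-group"), offsets).finalizeCheckpoint();
        System.out.println(committed.get(0));      // 42
    }
}
```

Because the commit happens only at checkpoint finalization, a restart from the committed offsets replays at most the records since the last finalized checkpoint — at-least-once, never skipping.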
On 30 June 2017 at 05:08, JingsongLee <[email protected]> wrote:

Oh, I see what you mean. In our production, if we need to re-run (the checkpoint and state are lost when a job crashes, is cancelled, or is drained), we set the KafkaIO startTime to start a new job, because we generally know the last consumed timestamp of the previous job. (It does not need to be precise; rewinding to a safe point is enough.) This feature was finished in the 2.1.0 version.
Jira: https://issues.apache.org/jira/browse/BEAM-2248

A more accurate way would be to re-run from Kafka offsets (not supported yet), but you would need to know the last snapshot of the job.

Best,
Jingsong Lee

------------------------------------------------------------------
From: Gwilym Evans <[email protected]>
Time: 2017 Jun 30 (Fri) 12:20
To: user <[email protected]>; JingsongLee <[email protected]>
Subject: Re: Kafka offset management

Hi JingsongLee,

Thanks for the reply. What I'm trying to avoid are lost / skipped messages due to two situations:

1. Lost offsets, or
2. Premature offset commits

I've researched snapshot checkpoints, and from what I understand these are only maintained in Dataflow when a job is updated. If a job crashes, is cancelled, or is drained, then the checkpoints are lost. This is situation (1) above.

From what I understand about auto-commit offsets, the Kafka client periodically and automatically commits the offsets it has polled. In the case of Beam and Dataflow, this happens even if the offsets being committed have not yet been fully processed by the pipeline. This is situation (2) above.

So far I'm not seeing a way to avoid data loss besides resetting to the earliest offset when a job starts. But, given we retain data in our Kafka topics for up to 7 days, that is not feasible from a performance point of view.

Can anyone confirm / deny my understanding here?
Cheers,
Gwilym

On 30 June 2017 at 02:59, JingsongLee <[email protected]> wrote:

Hi Gwilym,

KafkaIO saves the offsets to the snapshot (checkpoint) for restarting, instead of committing offsets to Kafka. You can use a Kafka client configuration to enable auto-commit of offsets:

    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true

Hope this helps.

Best,
JingsongLee

------------------------------------------------------------------
From: Gwilym Evans <[email protected]>
Time: 2017 Jun 29 (Thu) 15:32
To: user <[email protected]>
Subject: Kafka offset management

Hi list,

I was playing around with KafkaIO today to understand how it behaves in a failure or restart scenario (be it crash, cancel, or drain), and I found it "lost" (or skipped) Kafka messages in these cases. That is, it resumed from the latest offsets rather than the last successfully processed offsets, and a gap in messages was observed as a result.

My KafkaIO transform looks like:

    KafkaIO.<Long, String>readBytes()
        .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
        .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
        .withTopics(ImmutableList.of(KAFKA_TOPIC))

The consumer factory is used because of some runtime SSL key/truststore setup:

    final Map<String, Object> config = Maps.newHashMap();
    config.put("auto.offset.reset", "latest");
    config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
    config.put("group.id", KAFKA_GROUP_ID);
    config.put("key.deserializer", ByteArrayDeserializer.class.getName());
    config.put("security.protocol", "SSL");
    config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
    config.put("ssl.truststore.location", truststore.toPath().toString());
    config.put("ssl.truststore.password", kafkaTruststorePassword);
    config.put("value.deserializer", ByteArrayDeserializer.class.getName());
    return new KafkaConsumer<>(config);

So, I am setting a group.id, and I know KafkaIO is using the new consumer because our Zookeeper is not accessible from Dataflow.
When I look on the Kafka cluster, there is no record of the consumer group's offsets. So I take it KafkaIO is not committing offsets to Kafka.

Can this be changed to commit offsets once a "batch" of streaming messages is seen as processed OK? I am fine with at-least-once.

I am using Dataflow, Beam 2.0, Kafka 0.10.2.1

Cheers,
Gwilym
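[Editor's note: the auto-commit workaround JingsongLee suggests earlier in the thread trades processing-based commits for time-based ones, which is exactly Gwilym's situation (2). A stdlib-only simulation of why that can skip messages — the topic, offsets, and crash point here are made up for illustration:]

```java
import java.util.ArrayList;
import java.util.List;

// Simulates situation (2) from the thread: the Kafka client auto-commits
// offsets it has polled even though the pipeline has not finished processing
// them. After a crash (checkpoint lost), restarting from the broker-committed
// offset skips the polled-but-unprocessed records.
public class PrematureCommitDemo {
    // What a restarted job ends up having processed when it resumes from a
    // committed offset: everything finished before the crash, plus everything
    // from the committed offset onward.
    static List<Integer> resumeFrom(List<Integer> topic, int committedOffset,
                                    List<Integer> processedBeforeCrash) {
        List<Integer> processed = new ArrayList<>(processedBeforeCrash);
        for (int off = committedOffset; off < topic.size(); off++) {
            processed.add(topic.get(off));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> topic = List.of(0, 1, 2, 3, 4, 5); // record offsets in the topic

        // First run: the client polls offsets 0..3 and auto-commit fires
        // (next offset to read = 4), but the pipeline only finishes offsets
        // 0 and 1 before the job crashes and its checkpoint is lost.
        List<Integer> processedBeforeCrash = List.of(0, 1);

        // Restart resumes from the prematurely committed offset:
        System.out.println(resumeFrom(topic, 4, processedBeforeCrash));
        // [0, 1, 4, 5] -- offsets 2 and 3 were skipped
    }
}
```

Committing only from finalizeCheckpoint (as proposed above in the thread) avoids this, because the committed offset then never runs ahead of what the pipeline has durably processed.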
