Gwilym, I think your understanding is correct, with one caveat as noted below. As Jingsong suggested, committing the offsets in 'KafkaCheckpointMark.finalizeCheckpoint() <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L48>' is what is needed. I had left a comment there to that effect. It is fairly straightforward to add this as an option.
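To make the idea concrete, here is a rough, hypothetical sketch of what an opt-in commit from finalizeCheckpoint() could look like. The PartitionMark shape and the consumer handle are assumptions for illustration, not the actual KafkaCheckpointMark internals:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    /** Hypothetical helper: commit checkpointed offsets back to Kafka. */
    class CheckpointOffsetCommitter {

      /** Assumed shape of one checkpointed partition: topic, partition, next offset to read. */
      static class PartitionMark {
        final String topic;
        final int partition;
        final long nextOffset;

        PartitionMark(String topic, int partition, long nextOffset) {
          this.topic = topic;
          this.partition = partition;
          this.nextOffset = nextOffset;
        }
      }

      /**
       * Could be invoked from finalizeCheckpoint() once the runner has durably
       * checkpointed the reader, and only when a real group.id is configured.
       */
      static void commit(Consumer<?, ?> consumer, List<PartitionMark> marks) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (PartitionMark mark : marks) {
          // nextOffset is the first offset not yet read, which is what Kafka
          // expects in a committed offset.
          offsets.put(
              new TopicPartition(mark.topic, mark.partition),
              new OffsetAndMetadata(mark.nextOffset));
        }
        consumer.commitSync(offsets); // synchronous commit is fine for at-least-once
      }
    }

With something like that in place, a restarted pipeline using the same group.id could resume from the committed offsets rather than "latest", at the cost of possibly reprocessing records that were read but not yet committed.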
Note that finalizing would give at-least-once semantics only if you drain a pipeline before restarting it. If a pipeline is killed or crashes, you can still miss some records: in Dataflow, finalizeCheckpoint() is called once the messages are checkpointed for the current stage, but downstream stages might not have processed them yet. Draining a pipeline ensures that all of the input is processed through the pipeline.

> The consumer factory is used because of some runtime SSL key/truststore setup:

Btw, KafkaIO includes an API to set <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L207> consumer configs. Wasn't that enough? Did you get the truststore config working with Dataflow? Last I remember, "ssl.truststore.location" had to be a local file on the worker, and it was not easy to make that accessible.
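For illustration only, here is roughly what that would look like using KafkaIO's own config hook instead of a custom consumer factory. I'm assuming the method is updateConsumerProperties(Map<String, Object>) (the one linked above); the broker list, topic, group ID, and truststore path are placeholders:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.io.kafka.KafkaIO;

    // Sketch: pass the SSL consumer settings through KafkaIO rather than
    // constructing the KafkaConsumer yourself. All values are placeholders.
    KafkaIO.Read<byte[], byte[]> read =
        KafkaIO.readBytes()
            .withBootstrapServers("broker-1:9093,broker-2:9093")
            .withTopics(ImmutableList.of("my-topic"))
            .updateConsumerProperties(
                ImmutableMap.<String, Object>of(
                    "group.id", "my-group",
                    "security.protocol", "SSL",
                    "ssl.truststore.location", "/local/path/on/worker/truststore.jks",
                    "ssl.truststore.password", "changeit"));

The catch, as mentioned, is that the truststore file still has to exist at that local path on every worker.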
On Thu, Jun 29, 2017 at 10:47 PM, JingsongLee <[email protected]> wrote:

> Sorry, forgot the user mail group. + @user
>
> Yeah, although KafkaIO (exposed offsets interface) is different from PubSubIO,
> committing the offsets to Kafka in finalizeCheckpoint is also a way.
> Welcome to contribute, and maybe @Raghu Angadi can share more details.
>
> Best, Jingsong Lee
>
> ------------------------------------------------------------------
> From: Gwilym Evans <[email protected]>
> Time: 2017 Jun 30 (Fri) 13:27
> To: JingsongLee <[email protected]>
> Subject: Re: Kafka offset management
>
> Thanks for the info. I'll have a look and see if I can do anything similar.
>
> I am very new to Beam internals, but I've been having a look at the KafkaIO
> code and comparing it to the PubSubIO code.
>
> I think that the finalizeCheckpoint implementation for KafkaIO should
> definitely be committing the offsets to Kafka, if possible. But perhaps only
> when a Kafka group.id is configured, as committing offsets for random or
> blank group IDs is kind of pointless.
>
> I think I'll take a shot at making and contributing this, even if it's
> optional. Unless you can think of a reason to specifically not do this?
>
> Though, looking at the KafkaIO source for this, there is even a comment there
> alluding to the fact that this should maybe be done to provide better restart
> options.
>
> -Gwilym
>
>
> On 30 June 2017 at 05:08, JingsongLee <[email protected]> wrote:
> Oh, I know what you mean.
>
> In our production, if we need to re-run (we lose the checkpoint and state
> when a job crashes, is canceled, or is drained), we set the KafkaIO startTime
> to start a new job, because we generally know the last consumed timestamp of
> the previous job. (It does not have to be too precise; going back to a safe
> point is enough.)
> This feature is finished in the 2.1.0 version.
> Jira: https://issues.apache.org/jira/browse/BEAM-2248
>
> A more accurate way is to re-run from Kafka offsets (not supported yet), but
> you would need to know the last snapshot of the job.
>
> Best, Jingsong Lee
>
> ------------------------------------------------------------------
> From: Gwilym Evans <[email protected]>
> Time: 2017 Jun 30 (Fri) 12:20
> To: user <[email protected]>; JingsongLee <[email protected]>
> Subject: Re: Kafka offset management
>
> Hi JingsongLee,
>
> Thanks for the reply.
>
> What I'm trying to avoid are lost / skipped messages due to two situations:
>
> 1. Lost offsets, or
> 2. Premature offset commits
>
> I've researched snapshot checkpoints, and from what I understand these are
> only maintained in Dataflow when a job is updated. If a job crashes, is
> cancelled, or is drained, then the checkpoints are lost. This is situation
> (1) above.
>
> From what I understand about auto-commit offsets, it's where the Kafka client
> periodically commits the offsets it has polled, automatically. In the case of
> Beam and Dataflow, this would happen even if the offsets it is committing
> have not yet been fully processed by the pipeline. This is situation (2)
> above.
>
> So far I'm not seeing a way to avoid data loss besides resetting to the
> earliest offset when a job starts. But, given we retain data in our Kafka
> topics for up to 7 days, that is not feasible from a performance point of
> view.
>
> Can anyone confirm / deny my understanding here?
>
> Cheers,
> Gwilym
>
> On 30 June 2017 at 02:59, JingsongLee <[email protected]> wrote:
> Hi Gwilym,
> KafkaIO saves the offsets to the snapshot (checkpoint) instead of committing
> offsets to Kafka for restarting.
> You can use a Kafka client configuration to enable auto-commit of offsets:
>
>     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
>
> Hope this helps.
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From: Gwilym Evans <[email protected]>
> Time: 2017 Jun 29 (Thu) 15:32
> To: user <[email protected]>
> Subject: Kafka offset management
>
> Hi list,
>
> I was playing around with KafkaIO today to understand how it behaves in a
> failure or restart scenario (be it crash, cancel, or drain), and I found it
> "lost" (or skipped) Kafka messages in these cases. That is, it resumed from
> the latest offsets rather than the last successfully processed offsets, and a
> gap in messages was observed as a result.
>
> My KafkaIO transform looks like:
>
>     KafkaIO.<Long, String>readBytes()
>         .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
>         .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
>         .withTopics(ImmutableList.of(KAFKA_TOPIC))
>
> The consumer factory is used because of some runtime SSL key/truststore
> setup:
>
>     final Map<String, Object> config = Maps.newHashMap();
>     config.put("auto.offset.reset", "latest");
>     config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
>     config.put("group.id", KAFKA_GROUP_ID);
>     config.put("key.deserializer", ByteArrayDeserializer.class.getName());
>     config.put("security.protocol", "SSL");
>     config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
>     config.put("ssl.truststore.location", truststore.toPath().toString());
>     config.put("ssl.truststore.password", kafkaTruststorePassword);
>     config.put("value.deserializer", ByteArrayDeserializer.class.getName());
>
>     return new KafkaConsumer<>(config);
>
> So, I am setting a group.id, and I know KafkaIO is using the new consumer
> because our Zookeeper is not accessible from Dataflow.
>
> When I look on the Kafka cluster, there is no record of the consumer group's
> offsets. So I take it KafkaIO is not committing offsets to Kafka.
>
> Can this be changed to commit offsets when a "batch" of streaming messages is
> seen as processed OK? I am fine with at-least-once.
>
> I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
>
> Cheers,
> Gwilym
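P.S. For completeness, the startTime-based restart JingsongLee describes in the quoted thread (BEAM-2248, available from Beam 2.1.0) would look roughly like the sketch below. I'm assuming the method that JIRA added is withStartReadTime(Instant); the timestamp and rollback window are placeholders:

    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Sketch: start a new job a little before the last timestamp known to have
    // been consumed by the previous job, trading some reprocessing for no loss.
    Instant lastConsumed = Instant.parse("2017-06-29T22:00:00.000Z"); // placeholder
    KafkaIO.Read<byte[], byte[]> read =
        KafkaIO.readBytes()
            .withBootstrapServers("broker-1:9093")
            .withTopics(ImmutableList.of("my-topic"))
            .withStartReadTime(lastConsumed.minus(Duration.standardMinutes(10)));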
