Sorry, I forgot the user mail group. +@user
Yeah, although KafkaIO (which exposes an offsets interface) is different from 
PubSubIO, committing the offsets to Kafka in finalizeCheckpoint is also a way 
to do it. You are welcome to contribute, and maybe @Raghu Angadi can give more 
details.
Best, Jingsong Lee
------------------------------------------------------------------
From: Gwilym Evans <[email protected]>
Time: 2017 Jun 30 (Fri) 13:27
To: JingsongLee <[email protected]>
Subject: Re: Kafka offset management
Thanks for the info. I'll have a look and see if I can do anything similar.
I am very new to Beam internals, but I've been having a look at the KafkaIO 
code and comparing it to the PubSubIO code.
I think that the finalizeCheckpoint implementation for KafkaIO should 
definitely be committing the offsets to Kafka, if possible. But perhaps only 
when a Kafka group.id is configured, as committing offsets for random or blank 
group IDs is kind of pointless.
I think I'll take a shot at making and contributing this, even if it's 
optional. Unless you can think of a reason to specifically not do this?
Though, looking at the KafkaIO source for this, there is even a comment there 
alluding to the fact that this should maybe be done to provide better restart 
options.
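As a rough illustration of the bookkeeping such a finalizeCheckpoint-style commit would involve (a sketch only; the names below are illustrative, not KafkaIO's actual internals): Kafka expects the committed offset for each partition to be the offset of the *next* record to read, i.e. last processed offset + 1.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetsToCommit {
    // Kafka interprets a committed offset as "the next record to consume",
    // so a commit on checkpoint finalization would send
    // lastProcessedOffset + 1 per partition. Map<Integer, Long>
    // (partition -> offset) stands in for TopicPartition/OffsetAndMetadata.
    static Map<Integer, Long> offsetsToCommit(Map<Integer, Long> lastProcessed) {
        Map<Integer, Long> commit = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> commit.put(partition, offset + 1));
        return commit;
    }

    public static void main(String[] args) {
        Map<Integer, Long> lastProcessed = new HashMap<>();
        lastProcessed.put(0, 41L); // partition 0, last processed offset 41
        System.out.println(offsetsToCommit(lastProcessed)); // {0=42}
    }
}
```

Committing these values after a checkpoint is finalized (rather than on poll) is what gives the at-least-once behavior discussed here: on restart you re-read at most the records after the last finalized checkpoint.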
-Gwilym

On 30 June 2017 at 05:08, JingsongLee <[email protected]> wrote:
Oh. I know what you mean.
In our production, if we need to re-run (the checkpoint and state are lost when 
a job crashes, is canceled, or is drained), we set the KafkaIO startTime to 
start a new job, because we generally know the last consumed timestamp of the 
previous job. (It does not need to be too precise; going back to a safe point 
is enough.) This feature was finished in version 2.1.0. Jira: 
https://issues.apache.org/jira/browse/BEAM-2248
A more accurate way would be to re-run from Kafka offsets (not supported yet), 
but then you would need to know the last snapshot of the job.
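The "back to a safe point" idea can be sketched with a small helper (hypothetical, not part of Beam or KafkaIO): subtract a safety margin from the last consumed timestamp before using it as the new job's start time, accepting some re-reading under at-least-once semantics.

```java
import java.time.Duration;
import java.time.Instant;

public class SafeRestartTime {
    // Back off from the last consumed timestamp by a safety margin, so a job
    // restarted from a timestamp re-reads a little rather than risking a gap.
    // Duplicates are acceptable under at-least-once; skipped records are not.
    static Instant safeRestartTime(Instant lastConsumed, Duration margin) {
        return lastConsumed.minus(margin);
    }

    public static void main(String[] args) {
        Instant lastConsumed = Instant.parse("2017-06-30T05:00:00Z");
        // Start the new job 10 minutes before the last known consumed record.
        System.out.println(safeRestartTime(lastConsumed, Duration.ofMinutes(10)));
    }
}
```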
Best, Jingsong Lee
------------------------------------------------------------------
From: Gwilym Evans <[email protected]>
Time: 2017 Jun 30 (Fri) 12:20
To: user <[email protected]>; JingsongLee <[email protected]>
Subject: Re: Kafka offset management
Hi JingsongLee,
Thanks for the reply.
What I'm trying to avoid are lost / skipped messages due to two situations:
1. Lost offsets, or
2. Premature offset commits
I've researched snapshot checkpoints, and from what I understand these are only 
maintained in Dataflow when a job is updated. If a job crashes, is cancelled, 
or is drained, then the checkpoints are lost. This is situation (1) above.
From what I understand about auto-commit offsets, the Kafka client 
periodically and automatically commits the offsets it has polled. In the case 
of Beam and Dataflow, this would happen even if the messages behind those 
offsets have not yet been fully processed by the pipeline. This is situation 
(2) above.
So far I'm not seeing a way to avoid data loss besides resetting to the 
earliest offset when a job starts. But, given we retain data in our Kafka 
topics for up to 7 days, that is not feasible from a performance point of view.
Can anyone confirm / deny my understanding here?
Cheers,
Gwilym
On 30 June 2017 at 02:59, JingsongLee <[email protected]> wrote:
Hi Gwilym,
KafkaIO saves offsets to the snapshot (checkpoint) instead of committing 
offsets to Kafka, and restores from that snapshot on restart. You can use a 
Kafka client configuration to enable auto-commit of offsets: 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true.
Hope this helps.
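For reference, a minimal consumer configuration with auto-commit enabled might look like the following, using the raw property strings ("enable.auto.commit" is the value behind ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in kafka-clients). The broker address and group id are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

public class AutoCommitConfig {
    // Plain-string Kafka consumer properties. With auto-commit on, the client
    // commits polled offsets on a timer, regardless of whether the pipeline
    // has finished processing them (so this weakens delivery guarantees).
    static Map<String, Object> autoCommitConfig(String bootstrapServers, String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "5000"); // Kafka's default is 5000 ms
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = autoCommitConfig("broker:9092", "my-group");
        System.out.println(config.get("enable.auto.commit")); // true
    }
}
```

This map would be passed to the KafkaConsumer constructed inside a consumer factory, as in the snippet further down this thread.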
Best, JingsongLee
------------------------------------------------------------------
From: Gwilym Evans <[email protected]>
Time: 2017 Jun 29 (Thu) 15:32
To: user <[email protected]>
Subject: Kafka offset management
Hi list,
I was playing around with KafkaIO today to understand how it behaves in a 
failure or restart scenario (be it crash, cancel, or drain), and I found it 
"lost" (or skipped) Kafka messages in these cases. That is, it resumed from the 
latest offsets rather than the last successfully processed offsets, and a gap 
in messages was observed as a result.
My KafkaIO transform looks like:
KafkaIO.<Long, String>readBytes()
        .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
        .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
        .withTopics(ImmutableList.of(KAFKA_TOPIC))
The consumer factory is used because of some runtime SSL key/truststore setup:
    final Map<String, Object> config = Maps.newHashMap();
    config.put("auto.offset.reset", "latest");
    config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
    config.put("group.id", KAFKA_GROUP_ID);
    config.put("key.deserializer", ByteArrayDeserializer.class.getName());
    config.put("security.protocol", "SSL");
    config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
    config.put("ssl.truststore.location", truststore.toPath().toString());
    config.put("ssl.truststore.password", kafkaTruststorePassword);
    config.put("value.deserializer", ByteArrayDeserializer.class.getName());

    return new KafkaConsumer<>(config);
So, I am setting a group.id, and I know KafkaIO is using the new consumer 
because our Zookeeper is not accessible from Dataflow.
When I look on the Kafka cluster, there is no record of the consumer group's 
offsets. So I take it KafkaIO is not committing offsets to Kafka.
Can this be changed to commit offsets when a "batch" of streaming messages is 
seen as processed OK? I am fine with at-least-once.
I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
Cheers,
Gwilym