Re: Kafka offset management

2017-06-29 Thread JingsongLee
Sorry, I forgot the user mail group. + @user
Yes, although KafkaIO (which exposes an offsets interface) is different from PubSubIO,
committing the offsets to Kafka in finalizeCheckpoint is also a reasonable approach. You are
welcome to contribute, and maybe @Raghu Angadi can provide more details.
Best, Jingsong Lee
------------------------------------------------------------------
From: Gwilym Evans
Time: 2017 Jun 30 (Fri) 13:27
To: JingsongLee
Subject: Re: Kafka offset management
Thanks for the info. I'll have a look and see if I can do anything similar.
I am very new to Beam internals, but I've been having a look at the KafkaIO 
code and comparing it to the PubSubIO code.
I think that the finalizeCheckpoint implementation for KafkaIO should 
definitely be committing the offsets to Kafka, if possible. But perhaps only 
when a Kafka group.id is configured, as committing offsets for random or blank 
group IDs is kind of pointless.
I think I'll take a shot at making and contributing this, even if it's 
optional. Unless you can think of a reason to specifically not do this?
Though, looking at the KafkaIO source for this, there is even a comment there 
alluding to the fact that this should maybe be done to provide better restart 
options. A rough sketch of the idea is below.
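(For illustration only: a minimal, hypothetical sketch of committing the checkpointed
offsets back to Kafka in finalizeCheckpoint. The HypotheticalKafkaCheckpointMark class and
its fields are invented for this sketch and are not KafkaIO's actual internals; only the
Kafka consumer commitSync API and Beam's UnboundedSource.CheckpointMark interface are real.)

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical checkpoint mark that commits the checkpointed offsets back to Kafka
// when the runner finalizes the checkpoint (good enough for at-least-once).
class HypotheticalKafkaCheckpointMark implements UnboundedSource.CheckpointMark {

  // Simple value holder for one partition's checkpointed position (invented for this sketch).
  static class PartitionOffset {
    final String topic;
    final int partition;
    final long nextOffset;  // offset of the next record to read, which is what a commit expects

    PartitionOffset(String topic, int partition, long nextOffset) {
      this.topic = topic;
      this.partition = partition;
      this.nextOffset = nextOffset;
    }
  }

  private final List<PartitionOffset> partitions;
  private final Consumer<byte[], byte[]> consumer;  // assumed to be reachable from the reader
  private final String groupId;

  HypotheticalKafkaCheckpointMark(
      List<PartitionOffset> partitions, Consumer<byte[], byte[]> consumer, String groupId) {
    this.partitions = partitions;
    this.consumer = consumer;
    this.groupId = groupId;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Only commit when an explicit group.id was configured; committing offsets for a
    // random or blank group id would be pointless, as mentioned above.
    if (groupId == null || groupId.isEmpty() || consumer == null) {
      return;
    }
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (PartitionOffset p : partitions) {
      offsets.put(new TopicPartition(p.topic, p.partition), new OffsetAndMetadata(p.nextOffset));
    }
    consumer.commitSync(offsets);
  }
}

In a real implementation the mark would also need a Coder and could not simply hold a live
consumer reference, so treat this purely as the shape of the idea, not working code.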
-Gwilym

On 30 June 2017 at 05:08, JingsongLee  wrote:
Oh. I know what you mean.
In our production environment, if we need to re-run (the checkpoint and state are lost
when a job crashes, is canceled, or is drained), we set the KafkaIO start time when
starting the new job, because we generally know the last consumed timestamp of the
previous job. (It does not need to be precise; rewinding to a safe point is enough.)
This feature is finished in the 2.1.0 release. Jira: 
https://issues.apache.org/jira/browse/BEAM-2248
A more accurate way would be to re-run from Kafka offsets (not supported yet), but you
would need to know the last snapshot of the job.
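(A minimal sketch of that timestamp-based restart, assuming Beam 2.1.0+ where KafkaIO.Read
exposes withStartReadTime and assuming the brokers support time-based offset lookup; the
servers, topic, and timestamp below are placeholders.)

import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.joda.time.Instant;

// Start the new job roughly where the previous job left off: each partition is
// positioned at the first offset whose record timestamp is >= the given instant.
KafkaIO.Read<byte[], byte[]> read =
    KafkaIO.readBytes()
        .withBootstrapServers("kafka-1:9093")                      // placeholder
        .withTopics(ImmutableList.of("my-topic"))                  // placeholder
        .withStartReadTime(Instant.parse("2017-06-29T00:00:00Z")); // last known safe point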
Best, Jingsong Lee
------------------------------------------------------------------
From: Gwilym Evans
Time: 2017 Jun 30 (Fri) 12:20
To: user; JingsongLee
Subject: Re: Kafka offset management
Hi JingsongLee,
Thanks for the reply.
What I'm trying to avoid are lost / skipped messages due to two situations:
1. Lost offsets, or
2. Premature offset commits
I've researched snapshot checkpoints, and from what I understand these are only 
maintained in Dataflow when a job is updated. If a job crashes, is cancelled, 
or is drained, then the checkpoints are lost. This is situation (1) above.
From what I understand about auto-commit offsets, the Kafka client periodically and 
automatically commits the offsets it has polled. In the case of Beam and Dataflow, this 
would happen even if the offsets being committed have not yet been fully processed by 
the pipeline. This is situation (2) above.
So far I'm not seeing a way to avoid data loss besides resetting to the 
earliest offset when a job starts. But, given we retain data in our Kafka 
topics for up to 7 days, that is not feasible from a performance point of view.
Can anyone confirm / deny my understanding here?
Cheers,
Gwilym
On 30 June 2017 at 02:59, JingsongLee  wrote:
Hi Gwilym,
KafkaIO saves offsets to the snapshot (checkpoint) for restarting, instead of committing
offsets to Kafka.
You can use a Kafka client configuration to enable auto commit of offsets:
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
Hope this helps.
Best, JingsongLee
------------------------------------------------------------------
From: Gwilym Evans
Time: 2017 Jun 29 (Thu) 15:32
To: user
Subject: Kafka offset management
Hi list,
I was playing around with KafkaIO today to understand how it behaves in a 
failure or restart scenario (be it crash, cancel, or drain), and I found it 
"lost" (or skipped) Kafka messages in these cases. That is, it resumed from the 
latest offsets rather than the last successfully processed offsets, and a gap 
in messages was observed as a result.
My KafkaIO transform looks like:
KafkaIO.readBytes()
        .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
        .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
        .withTopics(ImmutableList.of(KAFKA_TOPIC))
The consumer factory is used because of some runtime SSL key/truststore setup:
final Map<String, Object> config = Maps.newHashMap();
config.put("auto.offset.reset", "latest");
config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
config.put("group.id", KAFKA_GROUP_ID);
config.put("key.deserializer", ByteArrayDeserializer.class.getName());
config.put("security.protocol", "SSL");
config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
config.put("ssl.truststore.location", truststore.toPath().toString());
config.put("ssl.truststore.password", kafkaTruststorePassword);

Re: Kafka offset management

2017-06-29 Thread Gwilym Evans
Hi JingsongLee,

Thanks for the reply.

What I'm trying to avoid are lost / skipped messages due to two situations:

1. Lost offsets, or
2. Premature offset commits

I've researched snapshot checkpoints, and from what I understand these are
only maintained in Dataflow when a job is updated. If a job crashes, is
cancelled, or is drained, then the checkpoints are lost. This is situation
(1) above.

From what I understand about auto-commit offsets, the Kafka client
periodically and automatically commits the offsets it has polled. In the
case of Beam and Dataflow, this would happen even if the offsets being
committed have not yet been fully processed by the pipeline. This is
situation (2) above.

So far I'm not seeing a way to avoid data loss besides resetting to the
earliest offset when a job starts. But, given we retain data in our Kafka
topics for up to 7 days, that is not feasible from a performance point of
view.

Can anyone confirm / deny my understanding here?

Cheers,
Gwilym

On 30 June 2017 at 02:59, JingsongLee  wrote:

> Hi Gwilym
> KafkaIO saves offsets to the snapshot (checkpoint) for restarting, instead of
> committing offsets to Kafka.
> You can use a Kafka client configuration to enable auto commit of offsets:
>
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
>
> Hope this helps.
>
> Best, JingsongLee
>
> --
> From:Gwilym Evans 
> Time:2017 Jun 29 (Thu) 15:32
> To:user 
> Subject:Kafka offset management
>
> Hi list,
>
> I was playing around with KafkaIO today to understand how it behaves in a
> failure or restart scenario (be it crash, cancel, or drain), and I found it
> "lost" (or skipped) Kafka messages in these cases. That is, it resumed from
> the latest offsets rather than the last successfully processed offsets, and
> a gap in messages was observed as a result.
>
> My KafkaIO transform looks like:
>
> KafkaIO.readBytes()
> .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
> .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
> .withTopics(ImmutableList.of(KAFKA_TOPIC))
>
> The consumer factory is used because of some runtime SSL key/truststore
> setup:
>
> final Map<String, Object> config = Maps.newHashMap();
> config.put("auto.offset.reset", "latest");
> config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
> config.put("group.id", KAFKA_GROUP_ID);
> config.put("key.deserializer", ByteArrayDeserializer.class.getName());
> config.put("security.protocol", "SSL");
> config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
> config.put("ssl.truststore.location", truststore.toPath().toString());
> config.put("ssl.truststore.password", kafkaTruststorePassword);
> config.put("value.deserializer", ByteArrayDeserializer.class.getName());
>
> return new KafkaConsumer<>(config);
>
> So, I am setting a group.id, and I know KafkaIO is using the new consumer
> because our Zookeeper is not accessible from Dataflow.
>
> When I look on the Kafka cluster, there is no record of the consumer
> group's offsets. So I take it KafkaIO is not committing offsets to Kafka.
>
> Can this be changed to commit offsets when a "batch" of streaming messages
> are seen as processed OK? I am fine with at-least-once.
>
> I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
>
> Cheers,
> Gwilym
>
>
>


Re: Kafka offset management

2017-06-29 Thread JingsongLee
Hi Gwilym,
KafkaIO saves offsets to the snapshot (checkpoint) for restarting, instead of committing
offsets to Kafka.
You can use a Kafka client configuration to enable auto commit of offsets:
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
Hope this helps.
Best, JingsongLee
------------------------------------------------------------------
From: Gwilym Evans
Time: 2017 Jun 29 (Thu) 15:32
To: user
Subject: Kafka offset management
Hi list,
I was playing around with KafkaIO today to understand how it behaves in a 
failure or restart scenario (be it crash, cancel, or drain), and I found it 
"lost" (or skipped) Kafka messages in these cases. That is, it resumed from the 
latest offsets rather than the last successfully processed offsets, and a gap 
in messages was observed as a result.
My KafkaIO transform looks like:
KafkaIO.readBytes()
        .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
        .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
        .withTopics(ImmutableList.of(KAFKA_TOPIC))
The consumer factory is used because of some runtime SSL key/truststore setup:
final Map<String, Object> config = Maps.newHashMap();
config.put("auto.offset.reset", "latest");
config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
config.put("group.id", KAFKA_GROUP_ID);
config.put("key.deserializer", ByteArrayDeserializer.class.getName());
config.put("security.protocol", "SSL");
config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
config.put("ssl.truststore.location", truststore.toPath().toString());
config.put("ssl.truststore.password", kafkaTruststorePassword);
config.put("value.deserializer", ByteArrayDeserializer.class.getName());

return new KafkaConsumer<>(config);
So, I am setting a group.id, and I know KafkaIO is using the new consumer 
because our Zookeeper is not accessible from Dataflow.
When I look on the Kafka cluster, there is no record of the consumer group's 
offsets. So I take it KafkaIO is not committing offsets to Kafka.
Can this be changed to commit offsets when a "batch" of streaming messages are 
seen as processed OK? I am fine with at-least-once.
I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
Cheers,
Gwilym



Re: Recommended Flink Version

2017-06-29 Thread Will Walters
As for creating a fat jar, I've encountered difficulty with this as well. When 
attempting to compile the Beam master to a jar (with Maven, using the package command), 
it instead creates several separate jars, one in each subfolder. And without messing 
with the pom file, it fails because of missing dependencies. I've tried to get around 
this by altering the pom file to include all possible dependencies, but even after 
doing this, the job submission still fails, citing some missing dependency.
Any advice on how to create a working fat jar?
Thanks,
Will.
On Thursday, June 29, 2017 1:28 AM, Jean-Baptiste Onofré wrote:

 Good point, fair enough.

Regards
JB

On 06/29/2017 10:26 AM, Aljoscha Krettek wrote:
> I think it’s a bug because if you start a Flink cluster on bare-metal it 
> works, just when it’s started in YARN it doesn’t. And I feel that the way you 
> start your cluster should not affect how you can submit jobs to it.
> 
> Best,
> Aljoscha
> 
>> On 29. Jun 2017, at 10:15, Jean-Baptiste Onofré  wrote:
>>
>> Yes, it's the same with the spark runner using bin/spark-submit. From my 
>> standpoint, it's not a bug, it's a feature request.
>>
>> Regards
>> JB
>>
>> On 06/29/2017 10:12 AM, Aljoscha Krettek wrote:
>>> I also responded to a separate mail by Will. The problem is that currently 
>>> we cannot submit a job using the remote client to a Flink cluster that was 
>>> started on YARN. (It’s a bug or “feature” of how communication with a Flink 
>>> cluster from a client works.)
>>> The workaround for that is to use the bin/flink command to submit a Beam 
>>> fat-jar on a Flink YARN cluster.
>>> Best,
>>> Aljoscha
 On 29. Jun 2017, at 07:23, Jean-Baptiste Onofré  wrote:

 Hi Will,

 assuming you are using Beam 2.0.0, the Flink runner uses Flink 1.2.1 by 
 default. So, I would recommend this version or 1.2.x.

 Regards
 JB

 On 06/28/2017 10:39 PM, Will Walters wrote:
> Hello,
> I've been attempting to run Beam through Flink on a Yarn cluster and have 
> run into trouble with getting a job to submit, partly because of 
> incompatibility between versions. Does anyone know what versions of Beam 
> and Flink I should be using to give myself the best chance of finding 
> compatibility?
> Thank you,
> Will.

 -- 
 Jean-Baptiste Onofré
 jbono...@apache.org
 http://blog.nanthrax.net
 Talend - http://www.talend.com
>>
>> -- 
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


   

Re: Beam Slack channel

2017-06-29 Thread James
Invite sent.
On Thu, 29 Jun 2017 at 6:07 PM Tolsa, Camille 
wrote:

> Hello,
>
>
> Could you also invite me at camille.to...@gmail.com thanks
>
> On 29 June 2017 at 09:29, Ismaël Mejía  wrote:
>
>> Invitation sent!
>>
>> On Thu, Jun 29, 2017 at 9:16 AM, Patrick Reames
>>  wrote:
>> > Can i also get an invite?
>> >
>> > On 2017-06-25 08:51 (-0500), Aleksandr  wrote:
>> >> Hello,
>> >> Can someone please add me to the slack channel?
>> >>
>> >> Best regards
>> >> Aleksandr Gortujev.
>> >>
>> >
>>
>
>
>
>


Re: Recommended Flink Version

2017-06-29 Thread Aljoscha Krettek
I think it’s a bug because if you start a Flink cluster on bare-metal it works, 
just when it’s started in YARN it doesn’t. And I feel that the way you start 
your cluster should not affect how you can submit jobs to it.

Best,
Aljoscha

> On 29. Jun 2017, at 10:15, Jean-Baptiste Onofré  wrote:
> 
> Yes, it's the same with the spark runner using bin/spark-submit. From my 
> standpoint, it's not a bug, it's a feature request.
> 
> Regards
> JB
> 
> On 06/29/2017 10:12 AM, Aljoscha Krettek wrote:
>> I also responded to a separate mail by Will. The problem is that currently 
>> we cannot submit a job using the remote client to a Flink cluster that was 
>> started on YARN. (It’s a bug or “feature” of how communication with a Flink 
>> cluster from a client works.)
>> The workaround for that is to use the bin/flink command to submit a Beam 
>> fat-jar on a Flink YARN cluster.
>> Best,
>> Aljoscha
>>> On 29. Jun 2017, at 07:23, Jean-Baptiste Onofré  wrote:
>>> 
>>> Hi Will,
>>> 
>>> assuming you are using Beam 2.0.0, the Flink runner uses Flink 1.2.1 by 
>>> default. So, I would recommend this version or 1.2.x.
>>> 
>>> Regards
>>> JB
>>> 
>>> On 06/28/2017 10:39 PM, Will Walters wrote:
 Hello,
 I've been attempting to run Beam through Flink on a Yarn cluster and have 
 run into trouble with getting a job to submit, partly because of 
 incompatibility between versions. Does anyone know what versions of Beam 
 and Flink I should be using to give myself the best chance of finding 
 compatibility?
 Thank you,
 Will.
>>> 
>>> -- 
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com



Re: Recommended Flink Version

2017-06-29 Thread Jean-Baptiste Onofré
Yes, it's the same with the spark runner using bin/spark-submit. From my 
standpoint, it's not a bug, it's a feature request.


Regards
JB

On 06/29/2017 10:12 AM, Aljoscha Krettek wrote:

I also responded to a separate mail by Will. The problem is that currently we 
cannot submit a job using the remote client to a Flink cluster that was started 
on YARN. (It’s a bug or “feature” of how communication with a Flink cluster 
from a client works.)

The workaround for that is to use the bin/flink command to submit a Beam 
fat-jar on a Flink YARN cluster.

Best,
Aljoscha


On 29. Jun 2017, at 07:23, Jean-Baptiste Onofré  wrote:

Hi Will,

assuming you are using Beam 2.0.0, the Flink runner uses Flink 1.2.1 by 
default. So, I would recommend this version or 1.2.x.

Regards
JB

On 06/28/2017 10:39 PM, Will Walters wrote:

Hello,
I've been attempting to run Beam through Flink on a Yarn cluster and have run 
into trouble with getting a job to submit, partly because of incompatibility 
between versions. Does anyone know what versions of Beam and Flink I should be 
using to give myself the best chance of finding compatibility?
Thank you,
Will.


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Recommended Flink Version

2017-06-29 Thread Aljoscha Krettek
I also responded to a separate mail by Will. The problem is that currently we 
cannot submit a job using the remote client to a Flink cluster that was started 
on YARN. (It’s a bug or “feature” of how communication with a Flink cluster 
from a client works.)

The workaround for that is to use the bin/flink command to submit a Beam 
fat-jar on a Flink YARN cluster.

Best,
Aljoscha

> On 29. Jun 2017, at 07:23, Jean-Baptiste Onofré  wrote:
> 
> Hi Will,
> 
> assuming you are using Beam 2.0.0, the Flink runner uses Flink 1.2.1 by 
> default. So, I would recommend this version or 1.2.x.
> 
> Regards
> JB
> 
> On 06/28/2017 10:39 PM, Will Walters wrote:
>> Hello,
>> I've been attempting to run Beam through Flink on a Yarn cluster and have 
>> run into trouble with getting a job to submit, partly because of 
>> incompatibility between versions. Does anyone know what versions of Beam and 
>> Flink I should be using to give myself the best chance of finding 
>> compatibility?
>> Thank you,
>> Will.
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com



Kafka offset management

2017-06-29 Thread Gwilym Evans
Hi list,

I was playing around with KafkaIO today to understand how it behaves in a
failure or restart scenario (be it crash, cancel, or drain), and I found it
"lost" (or skipped) Kafka messages in these cases. That is, it resumed from
the latest offsets rather than the last successfully processed offsets, and
a gap in messages was observed as a result.

My KafkaIO transform looks like:

KafkaIO.readBytes()
.withConsumerFactoryFn(new KafkaConsumerFactoryFn())
.withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
.withTopics(ImmutableList.of(KAFKA_TOPIC))

The consumer factory is used because of some runtime SSL key/truststore
setup:

final Map<String, Object> config = Maps.newHashMap();
config.put("auto.offset.reset", "latest");
config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
config.put("group.id", KAFKA_GROUP_ID);
config.put("key.deserializer", ByteArrayDeserializer.class.getName());
config.put("security.protocol", "SSL");
config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
config.put("ssl.truststore.location", truststore.toPath().toString());
config.put("ssl.truststore.password", kafkaTruststorePassword);
config.put("value.deserializer", ByteArrayDeserializer.class.getName());

return new KafkaConsumer<>(config);

So, I am setting a group.id, and I know KafkaIO is using the new consumer
because our Zookeeper is not accessible from Dataflow.

When I look on the Kafka cluster, there is no record of the consumer
group's offsets. So I take it KafkaIO is not committing offsets to Kafka.

Can this be changed to commit offsets when a "batch" of streaming messages
are seen as processed OK? I am fine with at-least-once.

I am using Dataflow, Beam 2.0, Kafka 0.10.2.1

Cheers,
Gwilym


Re: Beam Slack channel

2017-06-29 Thread Ismaël Mejía
Invitation sent!

On Thu, Jun 29, 2017 at 9:16 AM, Patrick Reames
 wrote:
> Can i also get an invite?
>
> On 2017-06-25 08:51 (-0500), Aleksandr  wrote:
>> Hello,
>> Can someone please add me to the slack channel?
>>
>> Best regards
>> Aleksandr Gortujev.
>>
>