Re: Kafka offset management

2017-06-30 Thread Gwilym Evans
Hi Raghu,

Thanks for the checkpointing info. I'll have to think about this more, but
I'm wondering if there's some custom work I can do to have the output of
the final stage perform some offset commits back to Kafka. There was
another thread here I commented on about giving Write transforms an output
so that further processing could be done if and only if a Write is
confirmed, and that would be very handy here. Is there perhaps some way to
wrap my PubSubIO Write stage in a custom class so that I can override
certain lifecycle stages to issue a commit back to Kafka?

Regarding the truststore on Dataflow: I required the Factory because of the
local file problem you mention, so I put the rest of the configs in there
too. My solution to the local file problem was to make the keystore /
truststore files a Java class resource, since those are distributed to
workers, and the resource is written to a temporary location on the worker
during consumer creation:

private static class KafkaConsumerFactoryFn
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

    @Override
    public Consumer<byte[], byte[]> apply(Map<String, Object> stringObjectMap) {
        // Kafka's truststore is embedded as a static class resource, but Kafka
        // wants it as a file on disk, so write it to a temp file and use that.
        final File truststore;
        try {
            truststore = File.createTempFile("truststore", ".jks");
            truststore.deleteOnExit();

            InputStream truststoreStream =
                    JOB_CLASS_HERE.class.getResourceAsStream("RESOURCE_NAME_HERE");
            Files.copy(truststoreStream, truststore.toPath(),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
        ...
        config.put("ssl.truststore.location", truststore.toPath().toString());
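
In case it helps, this is roughly how the factory plugs into the read
transform (just a sketch; the broker list, topic, and deserializers below are
placeholders for our actual values):

pipeline.apply(KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers("BROKER_1:9093,BROKER_2:9093")   // placeholder
    .withTopic("TOPIC_NAME_HERE")                          // placeholder
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
    .withoutMetadata());

The factory then rewrites "ssl.truststore.location" to point at the temp file
before creating the consumer.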

Cheers,
Gwilym



On 30 June 2017 at 16:36, Raghu Angadi  wrote:

> Gwilym,
>
> I think your understanding is correct, with one caveat as noted below. As
> Jingsong suggested, committing offsets in
> 'KafkaCheckpointMark.finalizeCheckpoint()' is required. I had left a
> comment there. It is fairly straightforward to add this as an option.
>
> Note that finalizing would give at least once semantics only if you drain
> a pipeline before restarting it. If a pipeline is killed or crashes, you
> can still miss some records. Finalize checkpoint is called in Dataflow once
> the messages are checkpointed for the current stage. The downstream stages
> might not have processed them. Draining a pipeline ensures that all the
> input is processed through the pipeline.
>
> > The consumer factory is used because of some runtime SSL key/truststore
> setup:
>
> btw, KafkaIO includes an API to set consumer configs. Wasn't it enough?
> Did you get trust store config working with Dataflow? Last I remember
> "ssl.truststore.location" had to be a local file on the worker and it was
> not easy to make that accessible.
>
> On Thu, Jun 29, 2017 at 10:47 PM, JingsongLee 
> wrote:
>
>> Sorry, forgot the user mail group. + @user
>>
>> Yeah, although KafkaIO (which exposes an offsets interface) is different
>> from PubSubIO, committing the offsets to Kafka in finalizeCheckpoint is
>> also a way to do it.
>> You're welcome to contribute, and maybe @Raghu Angadi can give more details.
>>
>> Best, Jingsong Lee
>>
>> --
>> From:Gwilym Evans 
>> Time:2017 Jun 30 (Fri) 13:27
>> To:JingsongLee 
>> Subject:Re: Kafka offset management
>>
>> Thanks for the info. I'll have a look and see if I can do anything
>> similar.
>>
>> I am very new to Beam internals, but I've been having a look at the
>> KafkaIO code and comparing it to the PubSubIO code.
>>
>> I think that the finalizeCheckpoint implementation for KafkaIO should
>> definitely be committing the offsets to Kafka, if possible. But perhaps
>> only when a Kafka group.id is configured, as committing offsets for
>> random or blank group IDs is kind of pointless.
>>
>> I think I'll take a shot at making and contributing this, even if it's
>> optional. Unless you can think of a reason to specifically not do this?
>>
>> Though, looking at the KafkaIO source for this, there is even a comment
>> there alluding to the fact that this should maybe be done to provide better
>> restart options.
>>
>> -Gwilym
>>
>>
>> On 30 June 2017 at 05:08, JingsongLee  wrote:
>> Oh. I know what you mean.
>>
>> In our production, if we need to re-run (we lose the checkpoint and state
>> when a job crashes, is canceled, or is drained), we set the KafkaIO
>> startTime to start a new job, because we generally know the last
>> consumer timestamp of the previous job. (Do no

Re: Kafka offset management

2017-06-30 Thread Raghu Angadi
Gwilym,

I think your understanding is correct, with one caveat as noted below. As
Jingsong suggested, committing offsets in
'KafkaCheckpointMark.finalizeCheckpoint()' is required. I had left a comment
there. It is fairly straightforward to add this as an option.
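
Roughly, the option would amount to something like this in the checkpoint
mark (a sketch of the idea only; the partition fields and the consumer handle
below are illustrative, not the actual KafkaCheckpointMark members):

  // Sketch only: commit the checkpointed offsets back to Kafka when the
  // runner finalizes the checkpoint. Only meaningful when a group.id is set.
  @Override
  public void finalizeCheckpoint() throws IOException {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (PartitionMark mark : partitions) {            // illustrative field
      offsets.put(
          new TopicPartition(mark.getTopic(), mark.getPartition()),
          new OffsetAndMetadata(mark.getNextOffset()));
    }
    consumer.commitSync(offsets);                       // illustrative handle
  }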

Note that finalizing would give at least once semantics only if you drain a
pipeline before restarting it. If a pipeline is killed or crashes, you can
still miss some records. Finalize checkpoint is called in Dataflow once the
messages are checkpointed for the current stage. The downstream stages
might not have processed them. Draining a pipeline ensures that all the
input is processed through the pipeline.

> The consumer factory is used because of some runtime SSL key/truststore
setup:

btw, KafkaIO includes an API to set consumer configs. Wasn't it enough?
Did you get trust store config working with Dataflow? Last I remember
"ssl.truststore.location" had to be a local file on the worker and it was
not easy to make that accessible.
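
(The API I mean is updateConsumerProperties. A minimal sketch, with
placeholder broker, topic, and paths:)

  KafkaIO.<byte[], byte[]>read()
      .withBootstrapServers("BROKER:9093")                // placeholder
      .withTopic("TOPIC_NAME_HERE")                       // placeholder
      .withKeyDeserializer(ByteArrayDeserializer.class)
      .withValueDeserializer(ByteArrayDeserializer.class)
      .updateConsumerProperties(ImmutableMap.<String, Object>of(
          "security.protocol", "SSL",
          "ssl.truststore.location", "/local/path/on/worker/truststore.jks",
          "ssl.truststore.password", "TRUSTSTORE_PASSWORD_HERE"));

The catch, of course, is that "ssl.truststore.location" still has to point at
a file that already exists on the worker, which is what your factory works
around.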

On Thu, Jun 29, 2017 at 10:47 PM, JingsongLee 
wrote:

> Sorry, forgot the user mail group. + @user
>
> Yeah, although KafkaIO (which exposes an offsets interface) is different
> from PubSubIO, committing the offsets to Kafka in finalizeCheckpoint is
> also a way to do it.
> You're welcome to contribute, and maybe @Raghu Angadi can give more details.
>
> Best, Jingsong Lee
>
> --
> From:Gwilym Evans 
> Time:2017 Jun 30 (Fri) 13:27
> To:JingsongLee 
> Subject:Re: Kafka offset management
>
> Thanks for the info. I'll have a look and see if I can do anything similar.
>
> I am very new to Beam internals, but I've been having a look at the
> KafkaIO code and comparing it to the PubSubIO code.
>
> I think that the finalizeCheckpoint implementation for KafkaIO should
> definitely be committing the offsets to Kafka, if possible. But perhaps
> only when a Kafka group.id is configured, as committing offsets for
> random or blank group IDs is kind of pointless.
>
> I think I'll take a shot at making and contributing this, even if it's
> optional. Unless you can think of a reason to specifically not do this?
>
> Though, looking at the KafkaIO source for this, there is even a comment
> there alluding to the fact that this should maybe be done to provide better
> restart options.
>
> -Gwilym
>
>
> On 30 June 2017 at 05:08, JingsongLee  wrote:
> Oh. I know what you mean.
>
> In our production, if we need to re-run (we lose the checkpoint and state
> when a job crashes, is canceled, or is drained), we set the KafkaIO
> startTime to start a new job, because we generally know the last consumer
> timestamp of the previous job. (It doesn't need to be too precise; going
> back to a safe point is enough.)
> This feature is finished for the 2.1.0 release.
> Jira: https://issues.apache.org/jira/browse/BEAM-2248
>
> A more accurate way would be to re-run from Kafka offsets (not supported
> yet), but you would need to know the last snapshot of the job.
>
> Best, Jingsong Lee
>
> --
> From:Gwilym Evans 
> Time:2017 Jun 30 (Fri) 12:20
> To:user ; JingsongLee 
> Subject:Re: Kafka offset management
>
> Hi JingsongLee,
>
> Thanks for the reply.
>
> What I'm trying to avoid are lost / skipped messages due to two situations:
>
> 1. Lost offsets, or
> 2. Premature offset commits
>
> I've researched snapshot checkpoints, and from what I understand these are
> only maintained in Dataflow when a job is updated. If a job crashes, is
> cancelled, or is drained, then the checkpoints are lost. This is situation
> (1) above.
>
> From what I understand about auto-commit offsets, it's where the Kafka
> client periodically commits offsets it has polled automatically. In the
> case of Beam and Dataflow, this would happen even if the offsets it is
> committing have not yet been fully processed by the pipeline. This is
> situation (2) above.
>
> So far I'm not seeing a way to avoid data loss besides resetting to the
> earliest offset when a job starts. But, given we retain data in our Kafka
> topics for up to 7 days, that is not feasible from a performance point of
> view.
>
> Can anyone confirm / deny my understanding here?
>
> Cheers,
> Gwilym
>
> On 30 June 2017 at 02:59, JingsongLee  wrote:
> Hi Gwilym,
> KafkaIO saves offsets to its snapshot (checkpoint) for restarting, instead
> of committing offsets to Kafka.
> You can use a Kafka client configuration to enable auto-commit of offsets:
>
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
>
> Hope this helps.
>
> Best, JingsongLee
>
> --
> From:Gwilym Evans 
> Time:2017 Jun 29 (Thu) 15:32
> To:user 
> Subject:Kafka offset management

Re: Beam Slack channel

2017-06-30 Thread Jean-Baptiste Onofré

Done, you should have received an invite.

Welcome !

Regards
JB

On 06/30/2017 12:02 PM, AndrasNagy wrote:

Hi,

I would also like to join the Slack channel :)
Shall I ask for the Dev Slack separately, or is there only a single one?

Andras

2017-06-29 12:17 GMT+02:00 James :


Invite sent.

On Thu, 29 Jun 2017 at 6:07 PM Tolsa, Camille  wrote:

Hello,


Could you also invite me at camille.to...@gmail.com thanks

On 29 June 2017 at 09:29, Ismaël Mejía  wrote:

Invitation sent!

On Thu, Jun 29, 2017 at 9:16 AM, Patrick Reames  wrote:
 > Can i also get an invite?
 >
 > On 2017-06-25 08:51 (-0500), Aleksandr  wrote:
 >> Hello,
 >> Can someone  please add me to the slack channel?
 >>
 >> Best regards
 >> Aleksandr Gortujev.
 >>
 >

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Beam Slack channel

2017-06-30 Thread AndrasNagy
Hi,

I would also like to join the Slack channel :)
Shall I ask for the Dev Slack separately, or is there only a single one?

Andras

2017-06-29 12:17 GMT+02:00 James :

> Invite sent.
>
> On Thu, 29 Jun 2017 at 6:07 PM Tolsa, Camille <
> camille.tolsa-...@veolia.com> wrote:
>
>> Hello,
>>
>>
>> Could you also invite me at camille.to...@gmail.com thanks
>>
>> On 29 June 2017 at 09:29, Ismaël Mejía  wrote:
>>
>>> Invitation sent!
>>>
>>> On Thu, Jun 29, 2017 at 9:16 AM, Patrick Reames
>>>  wrote:
>>> > Can i also get an invite?
>>> >
>>> > On 2017-06-25 08:51 (-0500), Aleksandr  wrote:
>>> >> Hello,
>>> >> Can someone  please add me to the slack channel?
>>> >>
>>> >> Best regards
>>> >> Aleksandr Gortujev.
>>> >>
>>> >
>>>
>>
>>
>>
>>
>


Re: Recommended Flink Version

2017-06-30 Thread Aljoscha Krettek
I just tried it with the Beam Quickstart from the website and a fresh Flink
1.2.1 download. This is the command I used for building the Quickstart:

mvn package -DskipTests -Pflink-runner

(The important bit here is -Pflink-runner which brings in the Flink Runner and 
its dependencies)

For running I did this:

bin/start-cluster.sh
bin/flink run -c org.apache.beam.examples.WordCount \
  ../word-count-beam/target/word-count-beam-0.1.jar \
  --inputFile=/Users/aljoscha/Downloads/word-count-beam/pom.xml \
  --output=/tmp/counts --runner=FlinkRunner

Best,
Aljoscha


> On 30. Jun 2017, at 01:02, Will Walters  wrote:
> 
> As for creating a fat jar, I've encountered difficulty with this as well. 
> When attempting to compile (with Maven, using the package command) the Beam 
> master to a jar, it instead creates several separate jars in each subfolder. 
> And without messing with the pom file, it fails because of missing 
> dependencies. I've tried to get around this by altering the pom file to 
> include all possible dependencies, but even after doing this, the job 
> submission will still fail, citing some missing dependency.
> 
> Any advice on how to create a working fat jar?
> 
> Thanks,
> Will.
> 
> On Thursday, June 29, 2017 1:28 AM, Jean-Baptiste Onofré  
> wrote:
> 
> 
> Good point, fair enough.
> 
> Regards
> JB
> 
> On 06/29/2017 10:26 AM, Aljoscha Krettek wrote:
> > I think it’s a bug because if you start a Flink cluster on bare-metal it 
> > works, just when it’s started in YARN it doesn’t. And I feel that the way 
> > you start your cluster should not affect how you can submit jobs to it.
> > 
> > Best,
> > Aljoscha
> > 
> >> On 29. Jun 2017, at 10:15, Jean-Baptiste Onofré  wrote:
> >>
> >> Yes, it's the same with the spark runner using bin/spark-submit. From my 
> >> standpoint, it's not a bug, it's a feature request.
> >>
> >> Regards
> >> JB
> >>
> >> On 06/29/2017 10:12 AM, Aljoscha Krettek wrote:
> >>> I also responded to a separate mail by Will. The problem is that 
> >>> currently we cannot submit a job using the remote client to a Flink 
> >>> cluster that was started on YARN. (It’s a bug or “feature” of how 
> >>> communication with a Flink cluster from a client works.)
> >>> The workaround for that is to use the bin/flink command to submit a Beam 
> >>> fat-jar on a Flink YARN cluster.
> >>> Best,
> >>> Aljoscha
>  On 29. Jun 2017, at 07:23, Jean-Baptiste Onofré  wrote:
> 
>  Hi Will,
> 
>  assuming you are using Beam 2.0.0, the Flink runner uses Flink 1.2.1 by 
>  default. So, I would recommend this version or 1.2.x.
> 
>  Regards
>  JB
> 
>  On 06/28/2017 10:39 PM, Will Walters wrote:
> > Hello,
> > I've been attempting to run Beam through Flink on a Yarn cluster and 
> > have run into trouble with getting a job to submit, partly because of 
> > incompatibility between versions. Does anyone know what versions of 
> > Beam and Flink I should be using to give myself the best chance of 
> > finding compatibility?
> > Thank you,
> > Will.
> 
>  -- 
>  Jean-Baptiste Onofré
>  jbono...@apache.org 
>  http://blog.nanthrax.net 
>  Talend - http://www.talend.com 
> 
> >>
> >> -- 
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org 
> >> http://blog.nanthrax.net 
> >> Talend - http://www.talend.com 
> > 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org 
> http://blog.nanthrax.net 
> Talend - http://www.talend.com 
> 
>