Re: Kafka offset management
Sorry, I forgot the user mail group. + @user

Yeah, although KafkaIO's exposed offsets interface is different from PubSubIO's, committing the offsets to Kafka in finalizeCheckpoint is also a way. You're welcome to contribute, and maybe @Raghu Angadi can offer more detail.

Best, Jingsong Lee

--
From: Gwilym Evans
Time: 2017 Jun 30 (Fri) 13:27
To: JingsongLee
Subject: Re: Kafka offset management

Thanks for the info. I'll have a look and see if I can do anything similar.

I am very new to Beam internals, but I've been having a look at the KafkaIO code and comparing it to the PubSubIO code. I think that the finalizeCheckpoint implementation for KafkaIO should definitely be committing the offsets to Kafka, if possible. But perhaps only when a Kafka group.id is configured, as committing offsets for random or blank group IDs is kind of pointless.

I think I'll take a shot at making and contributing this, even if it's optional. Unless you can think of a reason to specifically not do this? Looking at the KafkaIO source, there is even a comment there alluding to the fact that this should maybe be done to provide better restart options.

-Gwilym

On 30 June 2017 at 05:08, JingsongLee wrote:

Oh, I know what you mean. In our production, if we need to re-run (we lose the checkpoint and state when a job crashes, is cancelled, or is drained), we set the KafkaIO startTime to start a new job, because we generally know the last consumed timestamp of the previous job. (It does not have to be precise; going back to a safe point is enough.) This feature was finished in the 2.1.0 version. Jira: https://issues.apache.org/jira/browse/BEAM-2248

A more accurate way is to re-run from the Kafka offsets (not supported yet), but for that you need the last snapshot of the job.

Best, Jingsong Lee
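The trade-off discussed in this thread — resuming from "latest" versus from a saved position — can be sketched as a toy model in plain Java. This is illustrative only; none of the names below are Beam or Kafka API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the restart behaviour discussed above: a job that had
// processed through some offset but lost its checkpoint resumes either
// from "latest" (skipping messages) or from a saved position (no gap).
public class RestartModel {

    // Offsets 0..9 exist in the "topic"; the job had fully processed 0..4
    // before it crashed and lost its checkpoint.
    static final int LOG_END_OFFSET = 10;
    static final int LAST_PROCESSED = 4;

    // Resuming from "latest": start at the log end, so offsets 5..9 are
    // never consumed -- the gap Gwilym observed.
    static List<Integer> resumeFromLatest() {
        return consume(LOG_END_OFFSET);
    }

    // Resuming from a saved position (last processed offset + 1): no gap,
    // at-least-once semantics.
    static List<Integer> resumeFromCommitted() {
        return consume(LAST_PROCESSED + 1);
    }

    static List<Integer> consume(int startOffset) {
        List<Integer> seen = new ArrayList<>();
        for (int offset = startOffset; offset < LOG_END_OFFSET; offset++) {
            seen.add(offset);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("latest:    " + resumeFromLatest());    // [] -- offsets 5..9 lost
        System.out.println("committed: " + resumeFromCommitted()); // [5, 6, 7, 8, 9]
    }
}
```

The timestamp-based restart from BEAM-2248 approximates resumeFromCommitted by rewinding to a known-safe point in time rather than an exact offset.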
Re: Kafka offset management
Hi JingsongLee,

Thanks for the reply. What I'm trying to avoid are lost / skipped messages due to two situations:

1. Lost offsets, or
2. Premature offset commits

I've researched snapshot checkpoints, and from what I understand these are only maintained in Dataflow when a job is updated. If a job crashes, is cancelled, or is drained, then the checkpoints are lost. This is situation (1) above.

From what I understand about auto-commit offsets, the Kafka client periodically commits the offsets it has polled, automatically. In the case of Beam and Dataflow, this happens even if the offsets being committed have not yet been fully processed by the pipeline. This is situation (2) above.

So far I'm not seeing a way to avoid data loss besides resetting to the earliest offset when a job starts. But, given we retain data in our Kafka topics for up to 7 days, that is not feasible from a performance point of view.

Can anyone confirm / deny my understanding here?

Cheers,
Gwilym
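Situation (2) described in this message — auto-commit advancing past messages the pipeline has not yet processed — can be sketched as a toy model in plain Java (illustrative names only, not the Kafka client API):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of premature offset commits: auto-commit records the position
// of a polled batch right away, so a crash mid-batch means the unprocessed
// remainder is skipped after restart.
public class PrematureCommitModel {

    static List<Integer> runWithAutoCommit() {
        List<Integer> processed = new ArrayList<>();

        // One poll returns offsets 0..4; auto-commit immediately advances
        // the committed position to 5, before anything is processed.
        List<Integer> batch = List.of(0, 1, 2, 3, 4);
        int committedPosition = 5;

        // The pipeline processes 0..2, then the worker crashes.
        for (int offset : batch) {
            if (offset == 3) {
                break; // crash: offsets 3 and 4 were never processed
            }
            processed.add(offset);
        }

        // Restart resumes from the committed position (5): offsets 3 and 4
        // are silently skipped -- nothing is left in this toy log.
        for (int offset = committedPosition; offset < 5; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runWithAutoCommit()); // [0, 1, 2] -- 3 and 4 lost
    }
}
```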
Re: Kafka offset management
Hi Gwilym,

KafkaIO saves offsets to the snapshot (checkpoint) instead of committing offsets to Kafka for restarting. You can use a Kafka client configuration to turn on auto-commit of offsets:

    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true

Hope this helps.

Best, JingsongLee
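JingsongLee's suggestion amounts to one extra entry in the consumer config map that Gwilym's factory already builds. A minimal sketch in plain Java, with the ConsumerConfig constants written out as their string values so the snippet stands alone (the 5000 ms interval is the client's documented default, shown here for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: enabling the Kafka client's auto-commit in a consumer config map.
// "enable.auto.commit" is the string behind
// ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in kafka-clients.
public class AutoCommitConfig {

    static Map<String, Object> withAutoCommit(Map<String, Object> base) {
        Map<String, Object> config = new HashMap<>(base);
        config.put("enable.auto.commit", "true");
        // How often the client commits the offsets it has polled (ms).
        config.put("auto.commit.interval.ms", "5000");
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("group.id", "my-group"); // hypothetical group id
        Map<String, Object> config = withAutoCommit(base);
        System.out.println(config.get("enable.auto.commit")); // prints "true"
    }
}
```

Note that, as Gwilym's reply points out, auto-commit records offsets the client has merely polled, not offsets the pipeline has fully processed, so this trades lost checkpoints for possible premature commits.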
Re: Recommended Flink Version
As for creating a fat jar, I've encountered difficulty with this as well. When attempting to compile the Beam master branch to a jar (with Maven, using the package command), it instead creates several separate jars, one in each subfolder. And without modifying the pom file, it fails because of missing dependencies. I've tried to get around this by altering the pom file to include all possible dependencies, but even after doing this, the job submission still fails, citing some missing dependency.

Any advice on how to create a working fat jar?

Thanks,
Will.

On Thursday, June 29, 2017 1:28 AM, Jean-Baptiste Onofré wrote:

Good point, fair enough.

Regards
JB

On 06/29/2017 10:26 AM, Aljoscha Krettek wrote:

I think it’s a bug because if you start a Flink cluster on bare metal it works; just when it’s started in YARN it doesn’t. And I feel that the way you start your cluster should not affect how you can submit jobs to it.

Best,
Aljoscha
Re: Beam Slack channel
Invite sent.

On Thu, 29 Jun 2017 at 6:07 PM Tolsa, Camille wrote:

Hello,

Could you also invite me at camille.to...@gmail.com? Thanks.

On 29 June 2017 at 09:29, Ismaël Mejía wrote:

Invitation sent!

On Thu, Jun 29, 2017 at 9:16 AM, Patrick Reames wrote:

Can I also get an invite?

On 2017-06-25 08:51 (-0500), Aleksandr wrote:

Hello,
Can someone please add me to the slack channel?

Best regards,
Aleksandr Gortujev.
Re: Recommended Flink Version
I think it’s a bug because if you start a Flink cluster on bare metal it works; just when it’s started in YARN it doesn’t. And I feel that the way you start your cluster should not affect how you can submit jobs to it.

Best,
Aljoscha
Re: Recommended Flink Version
Yes, it's the same with the Spark runner using bin/spark-submit. From my standpoint, it's not a bug, it's a feature request.

Regards
JB

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: Recommended Flink Version
I also responded to a separate mail by Will. The problem is that currently we cannot submit a job using the remote client to a Flink cluster that was started on YARN. (It’s a bug or “feature” of how communication with a Flink cluster from a client works.) The workaround is to use the bin/flink command to submit a Beam fat jar on a Flink YARN cluster.

Best,
Aljoscha

On 29. Jun 2017, at 07:23, Jean-Baptiste Onofré wrote:

Hi Will,

Assuming you are using Beam 2.0.0, the Flink runner uses Flink 1.2.1 by default. So, I would recommend this version or 1.2.x.

Regards
JB

On 06/28/2017 10:39 PM, Will Walters wrote:

Hello,

I've been attempting to run Beam through Flink on a YARN cluster and have run into trouble getting a job to submit, partly because of incompatibility between versions. Does anyone know what versions of Beam and Flink I should be using to give myself the best chance of compatibility?

Thank you,
Will.
Kafka offset management
Hi list,

I was playing around with KafkaIO today to understand how it behaves in a failure or restart scenario (be it crash, cancel, or drain), and I found it "lost" (or skipped) Kafka messages in these cases. That is, it resumed from the latest offsets rather than the last successfully processed offsets, and a gap in messages was observed as a result.

My KafkaIO transform looks like:

    KafkaIO.readBytes()
        .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
        .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
        .withTopics(ImmutableList.of(KAFKA_TOPIC))

The consumer factory is used because of some runtime SSL key/truststore setup:

    final Map<String, Object> config = Maps.newHashMap();
    config.put("auto.offset.reset", "latest");
    config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
    config.put("group.id", KAFKA_GROUP_ID);
    config.put("key.deserializer", ByteArrayDeserializer.class.getName());
    config.put("security.protocol", "SSL");
    config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
    config.put("ssl.truststore.location", truststore.toPath().toString());
    config.put("ssl.truststore.password", kafkaTruststorePassword);
    config.put("value.deserializer", ByteArrayDeserializer.class.getName());
    return new KafkaConsumer<>(config);

So, I am setting a group.id, and I know KafkaIO is using the new consumer because our Zookeeper is not accessible from Dataflow.

When I look on the Kafka cluster, there is no record of the consumer group's offsets. So I take it KafkaIO is not committing offsets to Kafka.

Can this be changed to commit offsets when a "batch" of streaming messages is seen as processed OK? I am fine with at-least-once.

I am using Dataflow, Beam 2.0, Kafka 0.10.2.1.

Cheers,
Gwilym
Re: Beam Slack channel
Invitation sent!

On Thu, Jun 29, 2017 at 9:16 AM, Patrick Reames wrote:

Can I also get an invite?

On 2017-06-25 08:51 (-0500), Aleksandr wrote:

Hello,
Can someone please add me to the slack channel?

Best regards,
Aleksandr Gortujev.