ValueProviderOptions and templates

2020-06-16 Thread Marco Mistroni
Hi all,
I am creating Dataflow jobs using the Python API by creating templates which
I then run on GCP.
Suppose my Dataflow job accepts two input parameters which I need to supply
at invocation time.
Do I need to specify these parameters when I create my template?
Here's a sample: suppose I need two parameters,
--year and --key.
Which of the two is the correct syntax for creating a template for the job?

python -m my_main --runner=dataflow --project=xxx-projects \
  --template_location=gs://mm_dataflow_bucket/templates/mytemplate \
  --temp_location=gs://mm_dataflow_bucket/temp \
  --staging_location=gs://mm_dataflow_bucket/staging \
  --setup_file ./setup.py

OR
python -m my_main --runner=dataflow --project=xxx-projects \
  --template_location=gs://mm_dataflow_bucket/templates/mytemplate \
  --temp_location=gs://mm_dataflow_bucket/temp \
  --staging_location=gs://mm_dataflow_bucket/staging \
  --setup_file ./setup.py \
  --year --key

My hope with the latter is that the template 'sees' the options and replaces
them with the correct values when I actually invoke the template.
Regards,
Marco
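
For reference, the mechanism the subject line points at: with classic
templates, parameters that should be supplied at invocation time are declared
as ValueProvider options in the pipeline code rather than passed on the
template-creation command line. A minimal sketch using the Java SDK's
PipelineOptions (the Python analogue is
parser.add_value_provider_argument('--year'); the option names here simply
mirror the question):

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// Values are resolved when the template is invoked, not when it is created.
public interface MyTemplateOptions extends PipelineOptions {
  @Description("Year to process")
  ValueProvider<String> getYear();
  void setYear(ValueProvider<String> value);

  @Description("Lookup key")
  ValueProvider<String> getKey();
  void setKey(ValueProvider<String> value);
}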


Re: unable to read kerberized HDFS using dataflow

2020-06-16 Thread Luke Cwik
Posted comments on your SO question.


Preparing to sunset Python 2 offering in Apache Beam

2020-06-16 Thread Valentyn Tymofieiev
Hi Beam User community,

In line with the pledge to sunset the Python 2 offering in new releases in
2020 that Apache Beam has committed to [1,2], we are discussing [3] a more
concrete proposal on the dev@ mailing list to make 2.23.0 the final Beam
release supporting Python 2.

We have now had more than 10 releases with Python 3 support, and we are not
aware of any hard blockers in Beam that prevent users from migrating to
Python 3. If you are experiencing issues with your Python 3 migration, we'd
like to hear from you.

Thanks,
Valentyn

[1]
https://lists.apache.org/thread.html/634f7346b607e779622d0437ed0eca783f474dea8976adf41556845b%40%3Cdev.beam.apache.org%3E
[2] http://python3statement.org/
[3]
https://lists.apache.org/thread.html/rc76d352f4651e81fa20b91de090301761699c90753927a7240c775e6%40%3Cdev.beam.apache.org%3E


Webinar: Feature Powered by Apache Beam – Beyond Lambda (eBay)

2020-06-16 Thread Aizhamal Nurmamat kyzy
Hi all,

We are resuming our webinars as part of Beam Learning Month!

Please join us this Wednesday at *9:45am PDT / 4:45pm GMT / 12:45pm EDT*,
where Kobe Feng from eBay will deliver a talk about leveraging Apache Beam
to build large-scale feature pipelines at eBay.

Register: https://learn.xnextcon.com/event/eventdetails/W20060310


KafkaIO Exactly once vs At least Once

2020-06-16 Thread Eleanore Jin
Hi All,

I previously asked a few questions about enabling EOS (exactly-once
semantics); please see below.

Our Beam pipeline uses KafkaIO to read from a source topic and KafkaIO to
publish to a sink topic.

According to Max's answers to my previous questions, enabling EOS with
KafkaIO introduces latency: the KafkaIO.ExactlyOnceWriter processElement
method is called only after all messages within the checkpoint interval have
been checkpointed, so the latency depends on the checkpoint interval.

I just wonder: if I relax the guarantee to at-least-once, do I still need to
enable EOS on KafkaIO, or is it not required? If not, can you please provide
some instructions on how it should be done?

Thanks a lot!
Eleanore

> Thanks for the response! The reason to set up the state backend is to
> experiment with Kafka EOS with Beam running on Flink. Reading through the
> code and this PR, can you please help me clarify my understanding?
>
> 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve EOS; the
> ExactlyOnceWriter processElement method is annotated with
> @RequiresStableInput, so all messages will be cached by
> KeyedBufferingElementsHandler, and only after a checkpoint succeeds will
> those messages be processed by ExactlyOnceWriter?

That's correct.

>
> 2. Upon checkpoint, will those messages cached by
> KeyedBufferingElementsHandler also be checkpointed?

Yes, the buffered elements will be checkpointed.

> 3. It seems the way Beam provides Kafka EOS introduces delays in stream
> processing, and the delay is based on the checkpoint interval? How can we
> reduce the latency while still having the EOS guarantee?

Indeed, the checkpoint interval and the checkpoint duration limit the
latency. Given the current design and the guarantees, there is no other
way to influence the latency.

> 4. commitOffsetsInFinalize is also enabled. Does this mean that upon a
> successful checkpoint, the checkpointed offset will be committed back to
> Kafka, but if this operation does not finish successfully and the job then
> gets cancelled/stopped and is re-submitted (with the same consumer group
> for the source topics but a different jobID), duplicated processing may
> still exist because the consumed offset was not committed back to Kafka?

This option is for the Kafka consumer. AFAIK this is just a convenience
method to commit the latest checkpointed offset to Kafka. This offset is
not used when restoring from a checkpoint. However, if you don't restore
from a checkpoint, you can resume from that offset which might be
convenient or not, depending on your use case.
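
To make the two modes concrete, here is a minimal sketch (assuming the Java
SDK's KafkaIO; the broker, topic, and shard count are placeholders).
At-least-once is the default write path and needs no EOS setup; exactly-once
is opt-in via withEOS, which is where the checkpoint-bound buffering
discussed above comes from:

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

// 'records' is an existing PCollection<KV<String, String>>.

// At-least-once (default): records may be re-published after a retry or
// restart, but nothing extra needs to be enabled.
records.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker:9092")
    .withTopic("sink-topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));

// Exactly-once: @RequiresStableInput buffers records until the checkpoint
// completes, then publishes them transactionally (hence the added latency).
records.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker:9092")
    .withTopic("sink-topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withEOS(1, "eos-sink-group"));  // numShards, sink group id

Under this reading, relaxing to at-least-once would simply mean omitting
withEOS.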


Re: Beam supports Flink RichAsyncFunction

2020-06-16 Thread Eleanore Jin
Thanks Luke for the info. I will take a look.

Eleanore

On Mon, Jun 15, 2020 at 12:48 PM Luke Cwik wrote:

> The intent is that users shouldn't have to use async I/O, since the idea is
> that the runner should automatically increase the number of workers/threads
> so that you never need to special-case this. Unfortunately, Dataflow is the
> only runner that does this today, so you are forced to use something like
> GroupIntoBatches[1] to gather input elements that you convert into the
> requests you want to send, and manage your own threads / completion (a
> sketch follows at the end of this thread).
>
> 1:
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>
> On Sun, Jun 14, 2020 at 7:21 PM Eleanore Jin wrote:
>
>> Hi Community,
>>
>> I am trying to convert an existing Flink job into a Beam pipeline. In the
>> current Flink job, we have an async I/O operator (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html)
>> which extends RichAsyncFunction.
>>
>> I did not find any documentation online about Beam supporting this; if it
>> is documented somewhere, can you please point me to it?
>>
>> In case Beam does not support it, is there any suggested workaround
>> for it?
>>
>> Thanks a lot!
>> Eleanore
>>
>
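
A minimal sketch of the GroupIntoBatches approach Luke describes (assuming
the Java SDK; Request, Response, and callRemoteService are placeholders for
whatever your async client does, and the key and batch size are arbitrary):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// 'keyed' is an existing PCollection<KV<String, Request>>.
PCollection<KV<String, Iterable<Request>>> batches =
    keyed.apply(GroupIntoBatches.<String, Request>ofSize(100));

PCollection<Response> responses =
    batches.apply(ParDo.of(
        new DoFn<KV<String, Iterable<Request>>, Response>() {
          @ProcessElement
          public void process(
              @Element KV<String, Iterable<Request>> batch,
              OutputReceiver<Response> out) {
            // Fire off the whole batch asynchronously, then block until
            // every future resolves before emitting, so the bundle only
            // finishes once all responses are in.
            List<CompletableFuture<Response>> futures = new ArrayList<>();
            for (Request request : batch.getValue()) {
              futures.add(callRemoteService(request));  // placeholder client
            }
            for (CompletableFuture<Response> future : futures) {
              out.output(future.join());
            }
          }
        }));

The batching amortizes per-request overhead; the manual thread and completion
management Luke mentions lives inside the DoFn.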


unable to read kerberized HDFS using dataflow

2020-06-16 Thread Vince Gonzalez
Is there specific configuration required to ensure that workers get access
to UserGroupInformation when using TextIO? I am using Beam 2.22.0 on the
Dataflow runner.

My main method looks like the code below. My HdfsTextIOOptions extends
HadoopFileSystemOptions, and I set the HdfsConfiguration on the options
instance. I am using a keytab to authenticate. I'm not sure whether calling
UserGroupInformation.setConfiguration() is sufficient to ensure the UGI
makes it to all the workers. My pipeline fails with this exception:

Error message from worker: org.apache.hadoop.security.AccessControlException:
SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]


  public static void main(String[] args) throws IOException {
    System.setProperty("java.security.krb5.realm", "MY_REALM");
    System.setProperty("java.security.krb5.kdc", "my.kdc.hostname");

    HdfsTextIOOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(HdfsTextIOOptions.class);

    // Download the keytab from GCS to a local path.
    Storage storage = StorageOptions.getDefaultInstance().getService();
    URI uri = URI.create(options.getGcsKeytabPath());
    System.err.println(String.format(
        "URI: %s, filesystem: %s, bucket: %s, filename: %s",
        uri.toString(), uri.getScheme(), uri.getAuthority(), uri.getPath()));
    Blob keytabBlob = storage.get(BlobId.of(
        uri.getAuthority(),
        uri.getPath().startsWith("/") ? uri.getPath().substring(1)
            : uri.getPath()));
    Path localKeytabPath = Paths.get("/tmp", uri.getPath());
    System.err.println(localKeytabPath);

    keytabBlob.downloadTo(localKeytabPath);

    // Point Hadoop at the kerberized cluster and log in from the keytab.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020");
    conf.set("hadoop.security.authentication", "kerberos");

    UserGroupInformation.loginUserFromKeytab(
        options.getUserPrincipal(), localKeytabPath.toString());
    UserGroupInformation.setConfiguration(conf);

    options.setHdfsConfiguration(ImmutableList.of(conf));

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from(options.getInputFile()))
...

I also posted to stackoverflow:
https://stackoverflow.com/questions/62397379/google-cloud-dataflow-textio-and-kerberized-hdfs

Thanks for any leads!

--vince
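
One detail worth noting, as a hedged sketch rather than the fix from Luke's
SO comments: the UserGroupInformation login above runs only in the JVM that
launches the pipeline, while each Dataflow worker JVM starts unauthenticated,
which is consistent with the SIMPLE-vs-KERBEROS error. Beam's JvmInitializer
hook can repeat the login on every worker; the principal and keytab path
below are placeholders, and the keytab must already be present on (or fetched
by) the worker before this runs.

import com.google.auto.service.AutoService;
import java.io.IOException;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

// Registered via @AutoService so the worker harness discovers and runs it.
@AutoService(JvmInitializer.class)
public class KerberosWorkerLogin implements JvmInitializer {
  @Override
  public void beforeProcessing(PipelineOptions options) {
    try {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      // Placeholder principal and keytab path: stage or download the
      // keytab onto the worker before this point.
      UserGroupInformation.loginUserFromKeytab(
          "user@MY_REALM", "/tmp/user.keytab");
    } catch (IOException e) {
      throw new RuntimeException("Kerberos login failed on worker", e);
    }
  }
}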