Re: [PROPOSAL] Preparing for Beam 2.28.0 release

2021-02-03 Thread Chamikara Jayalath
Update: we are down to a single blocker and a fix is in review.

Thanks,
Cham

On Fri, Jan 29, 2021 at 5:51 PM Chamikara Jayalath 
wrote:

> Update:
>
> We have two remaining blockers: https://s.apache.org/beam-2.28.0-burn-down
> I hope to build the first RC when the blockers are resolved.
>
> Thanks,
> Cham
>
>
> On Wed, Jan 27, 2021 at 12:51 PM Chamikara Jayalath 
> wrote:
>
>> Release branch was cut:
>> https://github.com/apache/beam/tree/release-2.28.0
>>
>> I'll work with the owners of the current release-blocking JIRAs to
>> cherry-pick fixes before building an RC.
>>
>> Thanks,
>> Cham
>>
>> On Mon, Jan 25, 2021 at 5:20 PM Chamikara Jayalath 
>> wrote:
>>
>>> Hi All,
>>>
>>> I hope to cut the 2.28.0 release branch on 01/27/2021 as scheduled.
>>>
>>> Please see [1] for the current JIRA burn-down for the Beam 2.28.0 release.
>>>
>>> If you own any issues in that list, please try to resolve them as soon as
>>> possible (or remove them from the list if they are not critical for this
>>> release).
>>>
>>> If you find any new issues that require a critical fix in the Beam
>>> 2.28.0 release, please create a JIRA and add it to the burn-down list by
>>> setting its Fix Version to 2.28.0.
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1] https://s.apache.org/beam-2.28.0-burn-down
>>>
>>>
>>>
>>> On Thu, Jan 14, 2021 at 4:41 PM Ankur Goenka  wrote:
>>>
 Thanks Cham!

 On Thu, Jan 14, 2021 at 12:26 PM Yichi Zhang  wrote:

> Thank you, Cham!
>
> On Wed, Jan 13, 2021 at 12:58 PM Ahmet Altay  wrote:
>
>> Thank you Cham!
>>
>> On Wed, Jan 13, 2021 at 12:44 PM Rui Wang  wrote:
>>
>>> Thanks Cham for working on this!
>>>
>>>
>>> -Rui
>>>
>>> On Wed, Jan 13, 2021 at 11:32 AM Kyle Weaver 
>>> wrote:
>>>
 Thanks for stepping up, Cham!

 Remember to mark critical JIRA issues as release blockers, everybody!

 On Wed, Jan 13, 2021 at 11:25 AM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Hi All,
>
> The Beam 2.28.0 release is scheduled to be cut on January 27th
> according to the release calendar [1].
>
> I'd like to volunteer myself to be the release manager for this
> release. I plan on cutting the release branch on the scheduled date.
>
> Any comments or objections?
>
> Thanks,
> Cham
>
> [1]
> https://calendar.google.com/calendar/u/0/embed?src=0p73sl034k80oob7seouani...@group.calendar.google.com=America/Los_Angeles
>



Re: Environment options for external transforms

2021-02-03 Thread Chamikara Jayalath
On Wed, Feb 3, 2021 at 12:34 PM Kyle Weaver  wrote:

> Hi Beamers,
>
> Recently we’ve had some requests on user@ and Slack for instructions on
> how to use custom-built containers in cross-language pipelines (typically
> calling Java transforms from a predominantly Python pipeline). Currently,
> it seems like there is no way to change the container used by a
> cross-language transform except by modifying and rebuilding the expansion
> service. The SDK does not pass pipeline options to the expansion service
> (BEAM-9449 [1]). Fixing BEAM-9449 does not solve everything, however. Even
> if pipeline options are passed, the existing set of pipeline options still
> limits the amount of control we have over environments. Here are the
> existing pipeline options that I’m aware of:
>
> Python [2] and Go [3] have these:
>
>-
>
>environment_type (DOCKER, PROCESS, LOOPBACK)
>-
>
>environment_config (This one is confusingly overloaded. It’s a string
>that means different things depending on environment_type. For DOCKER, it
>is the Docker image URL. For PROCESS it is a JSON blob. For EXTERNAL, it is
>the external service address.)
>
>
> Whereas Java [4] has defaultEnvironmentType and defaultEnvironmentConfig,
> which are named differently but otherwise act the same as the above.
>
> I was unsatisfied with environment_config for a number of reasons. First,
> having a single overloaded option that can mean entirely different things
> depending on context is poor design. Second, in PROCESS mode, requiring the
> user to type in a JSON blob for environment_config is not especially
> human-friendly (though it has also been argued that JSON makes complex
> arguments like this easier to parse). Finally, we must overload this string
> further to introduce new environment-specific options, such as a mounted
> Docker volume (BEAM-5440 [5]).
>

Agree.


>
> To address these problems, I added a new option called
> “environment_options” (BEAM-10671 [6]). (This option has been implemented
> in the Python SDK, but not in the other SDKs yet.) Like the “experiments”
> option, environment_options takes a list of strings, for example
> “--environment_option=docker_container_image=my_beam_sdk:latest”. It could
> be argued we should have made “docker_container_image” etc. top-level
> options instead, but this “catch-all” design makes what I am about to
> propose a lot easier.
>
> The solution proposed in PR #11638 [7] sets a flag to include unrecognized
> pipeline options during serialization, since otherwise unrecognized options
> are dropped. In a Python pipeline, this will allow us to set
> environment_config and default_environment_config to separate values, for
> Python and Java containers, respectively. However, this still limits us to
> one container image for all Python and Go transforms, and one container
> image for all Java transforms. As more cross-language transforms are
> implemented, sooner or later someone will want to have different Java SDK
> containers for different external transforms.
>
> (I should also mention the sdk_harness_container_image_overrides pipeline
> option [8], which is currently only supported by the Dataflow runner. It
> lets us basically perform a find/replace on container image strings. This
> is not significantly more flexible than having a single option per SDK,
> since the default container images for all external transforms in each SDK
> are expected to be the same.)
>
> Environments logically belong with transforms, and that’s how it works in
> the Runner API [9]. The problem now is that from the user’s perspective,
> the environment is bound to the expansion service. After addressing
> BEAM-9449, the problem will be that one or two environments at most are
> bound to the pipeline. Ideally, though, users should have fully granular
> control over environments at the transform level.
>
> All this context for a very simple proposal: we should have all
> ExternalTransform subclasses take optional environment_type and
> environment_options fields in their constructors. As with their
> corresponding pipeline options, these options would default to DOCKER and
> none, respectively. Then we could overwrite the environment_type and
> environment_options in the pipeline options passed to the expansion service
> with these values. (Alternatively, we could pass environment_type and
> environment_options to the expansion service individually to avoid having
> to overwrite their original values, but their original values should be
> irrelevant to the expansion service anyway.)
>
> What do you think?
>

So, an external transform is uniquely identified by its URN. An external
transform identified by a URN may refer to an arbitrary composite which may
have sub-transforms that refer to different environments. I think with the
above proposal we'll lose this flexibility.
What we need is a way to override environments (or properties of
environments) that results in the final pipeline 

Environment options for external transforms

2021-02-03 Thread Kyle Weaver
Hi Beamers,

Recently we’ve had some requests on user@ and Slack for instructions on how
to use custom-built containers in cross-language pipelines (typically
calling Java transforms from a predominantly Python pipeline). Currently,
it seems like there is no way to change the container used by a
cross-language transform except by modifying and rebuilding the expansion
service. The SDK does not pass pipeline options to the expansion service
(BEAM-9449 [1]). Fixing BEAM-9449 does not solve everything, however. Even
if pipeline options are passed, the existing set of pipeline options still
limits the amount of control we have over environments. Here are the
existing pipeline options that I’m aware of:

Python [2] and Go [3] have these:

   - environment_type (DOCKER, PROCESS, LOOPBACK)

   - environment_config (This one is confusingly overloaded. It’s a string
     that means different things depending on environment_type. For DOCKER,
     it is the Docker image URL. For PROCESS it is a JSON blob. For EXTERNAL,
     it is the external service address.)


Whereas Java [4] has defaultEnvironmentType and defaultEnvironmentConfig,
which are named differently but otherwise act the same as the above.
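
To make the overloading concrete, here is an illustrative sketch of what
environment_config can hold under each environment_type (the example values
are assumptions for illustration; they are not taken from this thread):

--environment_type=DOCKER   --environment_config=apache/beam_python3.7_sdk:2.28.0
--environment_type=PROCESS  --environment_config='{"command": "/opt/apache/beam/boot"}'
--environment_type=EXTERNAL --environment_config=localhost:50000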

I was unsatisfied with environment_config for a number of reasons. First,
having a single overloaded option that can mean entirely different things
depending on context is poor design. Second, in PROCESS mode, requiring the
user to type in a JSON blob for environment_config is not especially
human-friendly (though it has also been argued that JSON makes complex
arguments like this easier to parse). Finally, we must overload this string
further to introduce new environment-specific options, such as a mounted
Docker volume (BEAM-5440 [5]).

To address these problems, I added a new option called
“environment_options” (BEAM-10671 [6]). (This option has been implemented
in the Python SDK, but not in the other SDKs yet.) Like the “experiments”
option, environment_options takes a list of strings, for example
“--environment_option=docker_container_image=my_beam_sdk:latest”. It could
be argued we should have made “docker_container_image” etc. top-level
options instead, but this “catch-all” design makes what I am about to
propose a lot easier.
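
For reference, a minimal sketch of setting the implemented Python option
programmatically (the flag name and example image are the ones quoted above;
building PipelineOptions from a list of flags is standard Beam Python):

from apache_beam.options.pipeline_options import PipelineOptions

# environment_options takes a list of strings, like experiments.
options = PipelineOptions([
    '--environment_type=DOCKER',
    '--environment_option=docker_container_image=my_beam_sdk:latest',
])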

The solution proposed in PR #11638 [7] sets a flag to include unrecognized
pipeline options during serialization, since otherwise unrecognized options
are dropped. In a Python pipeline, this will allow us to set
environment_config and default_environment_config to separate values, for
Python and Java containers, respectively. However, this still limits us to
one container image for all Python and Go transforms, and one container
image for all Java transforms. As more cross-language transforms are
implemented, sooner or later someone will want to have different Java SDK
containers for different external transforms.
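
To sketch that interim state (hypothetical values; the snake_case spelling
of the Java option follows this email's wording, and whether Java accepts it
in that form when forwarded is an assumption):

options = PipelineOptions([
    '--environment_config=my_python_sdk:latest',        # all Python/Go transforms
    '--default_environment_config=my_java_sdk:latest',  # all Java transforms
])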

(I should also mention the sdk_harness_container_image_overrides pipeline
option [8], which is currently only supported by the Dataflow runner. It
lets us basically perform a find/replace on container image strings. This
is not significantly more flexible than having a single option per SDK,
since the default container images for all external transforms in each SDK
are expected to be the same.)

Environments logically belong with transforms, and that’s how it works in
the Runner API [9]. The problem now is that from the user’s perspective,
the environment is bound to the expansion service. After addressing
BEAM-9449, the problem will be that one or two environments at most are
bound to the pipeline. Ideally, though, users should have fully granular
control over environments at the transform level.

All this context for a very simple proposal: we should have all
ExternalTransform subclasses take optional environment_type and
environment_options fields in their constructors. As with their
corresponding pipeline options, these options would default to DOCKER and
none, respectively. Then we could overwrite the environment_type and
environment_options in the pipeline options passed to the expansion service
with these values. (Alternatively, we could pass environment_type and
environment_options to the expansion service individually to avoid having
to overwrite their original values, but their original values should be
irrelevant to the expansion service anyway.)
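
Concretely, the constructor change might look like the following sketch
(hypothetical: the environment_* parameters do not exist in Beam's API, and
the names simply mirror the pipeline options described above):

import apache_beam as beam

class MyExternalTransform(beam.PTransform):
    # Hypothetical subclass showing the proposed constructor shape.
    def __init__(self, urn, payload, expansion_service,
                 environment_type='DOCKER', environment_options=None):
        self._urn = urn
        self._payload = payload
        self._expansion_service = expansion_service
        # These would override the corresponding values in the pipeline
        # options passed to the expansion service.
        self._environment_type = environment_type
        self._environment_options = environment_options or []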

What do you think?

[1] https://issues.apache.org/jira/browse/BEAM-9449

[2]
https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L1097-L1115

[3]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/go/pkg/beam/options/jobopts/options.go#L41-L53

[4]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L53-L71

[5] https://issues.apache.org/jira/browse/BEAM-5440

[6] https://issues.apache.org/jira/browse/BEAM-10671

[Proposal] Portable OrderedListState

2021-02-03 Thread Boyuan Zhang
Hi team,

I'm working on supporting OrderedListState over the Fn API, and I'm starting
the design proposal here: doc.

This doc focuses on Fn API proto changes and Java SDK harness support.
Please feel free to drop any ideas/concerns/suggestions there. If the
design looks good, I'll start working on the actual code changes.

Thanks for your help!


Re: Upgrading non-vendored Guava version

2021-02-03 Thread Tomo Suzuki
The PR has moved to https://github.com/apache/beam/pull/13804 after
incorporating Kyle's feedback, and Kyle has approved it.

I'm looking for any further feedback, and will merge if there are no problems.

Regards,
Tomo

On Tue, Jan 19, 2021 at 6:10 PM Ahmet Altay  wrote:

> +Kenneth Knowles  +Tyson Hamilton  +Kiley
> Sok 
>
> On Thu, Jan 14, 2021 at 4:23 PM Tomo Suzuki  wrote:
>
>> Hi Beam developers,
>>
>> I'm preparing a pull request (#13740) that upgrades the
>> non-vendored Guava version (currently 25.1-jre) to the latest version
>> (30.1-jre). I want Beam to be built, tested, and run with the newer version
>> of Guava so that its Guava dependency works with other libraries that
>> depend on newer Guava (e.g., gcsio).
>>
>> However, certain Hadoop/Cassandra-related modules in the Beam project
>> require Guava 25 or older when they run tests (details in BEAM-11626).
>> Therefore, this PR attempts to split Guava versions: 25 for the
>> Hadoop/Cassandra-related modules and 30 for the rest of the project.
>>
>> I analyzed the effect of the change in the PR below. So far the effect
>> appears minimal, thanks to the fact that Beam vendors Guava and only
>> three modules declare a non-test dependency on the non-vendored Guava.
>>
>> https://github.com/apache/beam/pull/13740
>>
>> As I am not a Hadoop/Cassandra user, I might have missed some important
>> points. I'd appreciate it if you could share your perspective on this.
>>
>> --
>> Regards,
>> Tomo
>>
>

-- 
Regards,
Tomo


Re: How to create an empty file using Apache Beam

2021-02-03 Thread Robert Bradshaw
You could write a DoFn that "consumes" the output of the write as a side
input and touches the file manually. E.g.

write_result = ... | beam.io.WriteToText(...)

_ = (p
     | beam.Create([None])
     | beam.Map(lambda unused_none, unused_side: create_file(),
                # A PCollection must be wrapped to be used as a side input.
                unused_side=beam.pvalue.AsIter(write_result)))

where create_file() actually creates the file in question. Though the side
input is unused, it will cause the Map to block until write_result has been
computed.
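
For completeness, create_file() could be as simple as the following sketch,
using Beam's FileSystems API (the .done path is taken from the question
below; a zero-byte file is an assumption about what the marker should
contain):

from apache_beam.io.filesystems import FileSystems

def create_file():
    # Write a zero-byte .done marker next to the CSV output.
    path = ('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/'
            'Valuefirst.done')
    f = FileSystems.create(path)
    f.write(b'')
    f.close()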


On Wed, Feb 3, 2021 at 9:15 AM radhika sharma  wrote:

> I have created a Dataflow template as below:
>
> from __future__ import absolute_import
> import apache_beam as beam
> import argparse
> import logging
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.gcp.internal.clients import bigquery
> from datetime import date
>
> today = date.today()
> current_date = today.strftime("%Y%m%d")
>
> def run(argv=None):
>     parser = argparse.ArgumentParser()
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     p = beam.Pipeline(options=PipelineOptions(pipeline_args))
>     (p
>      | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(
>            query="SELECT DISTINCT(IF(LENGTH(MOBILE)=10, "
>                  "CONCAT('91',MOBILE), REPLACE(MOBILE,'+91 ','91'))) "
>                  "FROM `whr-asia-datalake-nonprod.WHR_DATALAKE.C4C_CONSUMER_RAW` "
>                  "WHERE REGEXP_CONTAINS(REGEXP_REPLACE(Mobile, ' ', ''), "
>                  "r'^(?:(?:\+|0{0,2})91(\s*[\-]\s*)?|[0]?)?[6789]\d{9}$')",
>            use_standard_sql=True))
>      | 'read values' >> beam.Map(lambda x: x.values())
>      | 'CSV format' >> beam.Map(lambda row: '|'.join(
>            'WRPOOL|5667788|' + str(column)
>            + '|"Hi, This msg is from Whirlpool DL"'
>            for column in row))
>      | 'Write_to_GCS' >> beam.io.WriteToText(
>            'gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/'
>            'WHR_MOBILE_CNSNT_REQ' + str(current_date),
>            file_name_suffix='.csv',
>            header='SENDER_ID|SHORTCODE|MOBILE_NUM|CONSENT_MSG'))
>     p.run().wait_until_finish()
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run()
>
> I need to create an empty file after the CSV file is created, but I'm not
> sure which option to use. Can someone help? It's urgent.
>
> I have tried
> beam.Create('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/Valuefirst.done')
> to create the empty file, but it doesn't work.
>
>


How to create an empty file using Apache Beam

2021-02-03 Thread radhika sharma
I have created a Dataflow template as below:

from __future__ import absolute_import
import apache_beam as beam
import argparse
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from datetime import date

today = date.today()
current_date = today.strftime("%Y%m%d")

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(
           query="SELECT DISTINCT(IF(LENGTH(MOBILE)=10, "
                 "CONCAT('91',MOBILE), REPLACE(MOBILE,'+91 ','91'))) "
                 "FROM `whr-asia-datalake-nonprod.WHR_DATALAKE.C4C_CONSUMER_RAW` "
                 "WHERE REGEXP_CONTAINS(REGEXP_REPLACE(Mobile, ' ', ''), "
                 "r'^(?:(?:\+|0{0,2})91(\s*[\-]\s*)?|[0]?)?[6789]\d{9}$')",
           use_standard_sql=True))
     | 'read values' >> beam.Map(lambda x: x.values())
     | 'CSV format' >> beam.Map(lambda row: '|'.join(
           'WRPOOL|5667788|' + str(column)
           + '|"Hi, This msg is from Whirlpool DL"'
           for column in row))
     | 'Write_to_GCS' >> beam.io.WriteToText(
           'gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/'
           'WHR_MOBILE_CNSNT_REQ' + str(current_date),
           file_name_suffix='.csv',
           header='SENDER_ID|SHORTCODE|MOBILE_NUM|CONSENT_MSG'))
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I need to create an empty file after the CSV file is created, but I'm not
sure which option to use. Can someone help? It's urgent.

I have tried
beam.Create('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/Valuefirst.done')
to create the empty file, but it doesn't work.



Re: Migrating Gradle scripts to Kotlin

2021-02-03 Thread Ramazan Yapparov
Yes, that's me, thank you!


From: Kenneth Knowles 
Sent: Tuesday, February 2, 2021 9:23:45 PM
To: dev
Subject: Re: Migrating Gradle scripts to Kotlin

I took a guess that the Jira user 'ramazan-yapparov' was you and have added you 
to the project and assigned the ticket. Let me know if this is not you.

Kenn

On Thu, Jan 28, 2021 at 2:26 PM Brian Hulette <bhule...@google.com> wrote:
I think smaller PRs are generally preferable if the work is easily separable,
which it sounds like this is. I'd recommend you go ahead and put up a PR; you
might even split it into one PR per file if that makes sense to you.

Side note - do you have a Jira ID? We should get BEAM-11589 assigned to you if 
you are working on it :)

Brian

On Thu, Jan 28, 2021 at 2:52 AM Ramazan Yapparov <ramazan.yappa...@akvelon.com> wrote:

Hi Beam Dev!
I've started working on this issue:
https://issues.apache.org/jira/browse/BEAM-11589 - migrating the Gradle
scripts from Groovy to Kotlin.
So far, I've migrated the settings.gradle, buildSrc/build.gradle, and
release/build.gradle files to Kotlin.
I believe this work should be done incrementally, so I wanted to ask whether
this amount of change is enough for a PR or whether I should migrate more
modules.