Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-16 Thread Luke Cwik via dev
I upgraded the docker version on Jenkins workers and the tests passed.
(also installed Python 3.11 so we are ready for that)

On Tue, Feb 14, 2023 at 3:21 PM Kenneth Knowles  wrote:

> SGTM. I asked on the PR if this could impact users, but having read the
> docker release calendar I am not concerned. The last update to the old
> version was in 2019, and the introduction of compatible versions was 2020.
>
> On Tue, Feb 14, 2023 at 3:01 PM Byron Ellis via user 
> wrote:
>
>> FWIW I am Team Upgrade Docker :-)
>>
>> On Tue, Feb 14, 2023 at 2:53 PM Luke Cwik via user 
>> wrote:
>>
>>> I made some progress in testing the container and hit an issue: whether
>>> Ubuntu 22.04 "Jammy" works depends on the version of Docker installed. It
>>> turns out that our boot.go crashes with "runtime/cgo: pthread_create
>>> failed: Operation not permitted" because Ubuntu 22.04 uses new
>>> syscalls that Docker 18.09.4 doesn't have a seccomp policy for (and it uses
>>> a default of deny). We have a couple of choices here:
>>> 1) upgrade the version of docker on Jenkins and require users to
>>> similarly use a new enough version of Docker so that this isn't an issue
>>> for them
>>> 2) use Ubuntu 20.04 "Focal" as the docker container
>>>
>>> I was using Docker 20.10.21 which is why I didn't hit this issue when
>>> testing the change locally.
>>>
>>> We could also do these but they seem strictly worse than either of the
>>> two options discussed above:
>>> A) disable the seccomp policy on Jenkins
>>> B) use a custom seccomp policy on Jenkins
>>>
>>> My suggestion is to upgrade Docker versions on Jenkins and use Ubuntu
>>> 22.04 as it will have LTS releases till 2027 and then security patches till
>>> 2032 which gives everyone the longest runway till we need to swap OS
>>> versions again for users of Apache Beam. Any concerns or ideas?
>>>
>>>
>>>
>>> On Thu, Feb 9, 2023 at 10:20 AM Luke Cwik  wrote:
>>>
>>>> Our current Java 8 container is 262 MiB and layers on top of
>>>> openjdk:8-bullseye, which is 226 MiB compressed, while eclipse-temurin:8 is
>>>> 92 MiB compressed and eclipse-temurin:8-alpine is 65 MiB compressed.
>>>>
>>>> I would rather not get into issues with C library differences caused by
>>>> the alpine project so I would stick with the safer option and let users
>>>> choose alpine when building their custom container if they feel it provides
>>>> a large win for them. We can always swap to alpine in the future as well if
>>>> the C library differences become a non-issue.
>>>>
>>>> So swapping to eclipse-temurin will save us a bunch on the container
>>>> size which should help with container transfer and hopefully for startup
>>>> times as well.
>>>>
>>>> On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud 
>>>> wrote:
>>>>
>>>>> This sounds reasonable to me as well.
>>>>>
>>>>> I've made swaps like this in the past, the base image of each is
>>>>> probably a bigger factor than the JDK. The openjdk images were based on
>>>>> Debian 11. The default eclipse-temurin images are based on Ubuntu 22.04
>>>>> with an alpine option. Ubuntu is a Debian derivative but the versions and
>>>>> package names aren't exact matches and Ubuntu tends to update a little
>>>>> faster. For most users I don't think this will matter but users building
>>>>> custom containers may need to make minor changes. The alpine option will
>>>>> be much smaller (which could be a significant improvement) but would be a
>>>>> more significant change to the environment.
>>>>>
>>>>> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Seems reasonable to me.
>>>>>>
>>>>>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user <
>>>>>> u...@beam.apache.org> wrote:
>>>>>> >
>>>>>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses
>>>>>> have stopped being built and supported since July 2022. I have filed [2]
>>>>>> to track the resolution of this issue.
>>>>>> >

Re: [ANNOUNCE] New PMC Member: Jan Lukavský

2023-02-16 Thread Luke Cwik via dev
Congrats, well deserved.

On Thu, Feb 16, 2023 at 10:32 AM Anand Inguva via dev 
wrote:

> Congratulations!!
>
> On Thu, Feb 16, 2023 at 12:42 PM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
>> Congrats Jan!
>>
>> On Thu, Feb 16, 2023 at 8:35 AM John Casey via dev 
>> wrote:
>>
>>> Thanks Jan!
>>>
>>> On Thu, Feb 16, 2023 at 11:11 AM Danny McCormick via dev <
>>> dev@beam.apache.org> wrote:
>>>
 Congratulations!

 On Thu, Feb 16, 2023 at 11:09 AM Reza Rokni via dev <
 dev@beam.apache.org> wrote:

> Congratulations!
>
> On Thu, Feb 16, 2023 at 7:47 AM Robert Burke 
> wrote:
>
>> Congratulations!
>>
>> On Thu, Feb 16, 2023, 7:44 AM Danielle Syse via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Congrats, Jan! That's awesome news. Thank you for your continued
>>> contributions!
>>>
>>> On Thu, Feb 16, 2023 at 10:42 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 Hi all,

 Please join me and the rest of the Beam PMC in welcoming Jan
 Lukavský as our newest PMC member.

 Jan has been a part of the Beam community and a long-time contributor
 since 2018 in many significant ways, including code contributions in
 different areas, participating in technical discussions, advocating for
 users, giving a talk at Beam Summit and even writing one of the few Beam
 books!

 Congratulations Jan and thanks for being a part of Apache Beam!

 ---
 Alexey
>>>
>>>


Re: [VOTE] Release 2.45.0, Release Candidate #1

2023-02-16 Thread Luke Cwik via dev
All the PMC finalization tasks have been completed.

On Thu, Feb 16, 2023 at 8:56 AM Luke Cwik  wrote:

> I'll help out.
>
> On Thu, Feb 16, 2023 at 7:08 AM John Casey via dev 
> wrote:
>
>> Can a PMC member help me with PMC only release finalization?
>> https://beam.apache.org/contribute/release-guide/#pmc-only-finalization
>>
>> Thanks,
>> John
>>
>> On Wed, Feb 15, 2023 at 12:22 PM John Casey 
>> wrote:
>>
>>> With 7 approving votes, of which 5 are binding, 2.45.0 RC1 has been
>>> approved.
>>>
>>> Binding votes are:
>>>
>>> Luke Cwik
>>> Chamikara Jayalath
>>> Ahmet Altay
>>> Alexey Romanenko
>>> Robert Bradshaw
>>>
>>> There are no disapproving votes.
>>>
>>> Thanks for your validations everyone,
>>> John
>>>
>>> On Mon, Feb 13, 2023 at 5:19 PM Ritesh Ghorse via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> +1 (non-binding)
>>>> Validated:
>>>> 1. Go SDK Quickstart on Direct & Dataflow runner.
>>>> 2. Dataframe wrapper
>>>> 3. RunInference Wrapper for Sklearn
>>>>
>>>> On Mon, Feb 13, 2023 at 2:56 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> +1 (binding)
>>>>>
>>>>> Validated release artifacts and signatures and tested a couple of
>>>>> Python pipelines.
>>>>>
>>>>> On Mon, Feb 13, 2023 at 8:57 AM Alexey Romanenko
>>>>>  wrote:
>>>>> >
>>>>> > +1 (binding)
>>>>> >
>>>>> > Tested with  https://github.com/Talend/beam-samples/
>>>>> > (Java SDK v8/v11/v17, Spark 3.x runner).
>>>>> >
>>>>> > ---
>>>>> > Alexey
>>>>> >
>>>>> > On 13 Feb 2023, at 17:54, Ahmet Altay via dev 
>>>>> wrote:
>>>>> >
>>>>> > +1 (binding) - I validated python quick starts on direct runner and
>>>>> python streaming quickstart on dataflow.
>>>>> >
>>>>> > Thank you!
>>>>> >
>>>>> > On Mon, Feb 13, 2023 at 5:17 AM Bruno Volpato via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>> >>
>>>>> >> +1 (non-binding)
>>>>> >>
>>>>> >> Tested with
>>>>> https://github.com/GoogleCloudPlatform/DataflowTemplates (Java SDK
>>>>> 11, Dataflow runner).
>>>>> >>
>>>>> >>
>>>>> >> Thanks!
>>>>> >>
>>>>> >> On Mon, Feb 13, 2023 at 1:13 AM Chamikara Jayalath via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>> >>>
>>>>> >>> +1 (binding)
>>>>> >>>
>>>>> >>> Tried several Java and Python multi-language pipelines.
>>>>> >>>
>>>>> >>> Thanks,
>>>>> >>> Cham
>>>>> >>>
>>>>> >>> On Fri, Feb 10, 2023 at 1:52 PM Luke Cwik via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>> >>>>
>>>>> >>>> +1
>>>>> >>>>
>>>>> >>>> Validated release artifact signatures and verified the Java Flink
>>>>> and Spark quickstarts.
>>>>> >>>>
>>>>> >>>> On Fri, Feb 10, 2023 at 9:27 AM John Casey via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>> >>>>>
>>>>> >>>>> Addendum to above email.
>>>>> >>>>>
>>>>> >>>>> Java artifacts were built with Gradle 7.5.1 and OpenJDK 1.8.0_362
>>>>> >>>>>
>>>>> >>>>> On Fri, Feb 10, 2023 at 11:14 AM John Casey <
>>>>> theotherj...@google.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> Hi everyone,
>>>>> >>>>>> Please review and vote on the release candidate #1 for the
>>>>> version 2.45.0, as follows:
>>>>> >>>>>> [ ] +1, Approve the release
>>>>> >>>>>> [ ] -1, Do not approve the release (please provide specif

Re: [VOTE] Release 2.45.0, Release Candidate #1

2023-02-16 Thread Luke Cwik via dev
I'll help out.

On Thu, Feb 16, 2023 at 7:08 AM John Casey via dev 
wrote:

> Can a PMC member help me with PMC only release finalization?
> https://beam.apache.org/contribute/release-guide/#pmc-only-finalization
>
> Thanks,
> John
>
> On Wed, Feb 15, 2023 at 12:22 PM John Casey 
> wrote:
>
>> With 7 approving votes, of which 5 are binding, 2.45.0 RC1 has been
>> approved.
>>
>> Binding votes are:
>>
>> Luke Cwik
>> Chamikara Jayalath
>> Ahmet Altay
>> Alexey Romanenko
>> Robert Bradshaw
>>
>> There are no disapproving votes.
>>
>> Thanks for your validations everyone,
>> John
>>
>> On Mon, Feb 13, 2023 at 5:19 PM Ritesh Ghorse via dev <
>> dev@beam.apache.org> wrote:
>>
>>> +1 (non-binding)
>>> Validated:
>>> 1. Go SDK Quickstart on Direct & Dataflow runner.
>>> 2. Dataframe wrapper
>>> 3. RunInference Wrapper for Sklearn
>>>
>>> On Mon, Feb 13, 2023 at 2:56 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> +1 (binding)
>>>>
>>>> Validated release artifacts and signatures and tested a couple of
>>>> Python pipelines.
>>>>
>>>> On Mon, Feb 13, 2023 at 8:57 AM Alexey Romanenko
>>>>  wrote:
>>>> >
>>>> > +1 (binding)
>>>> >
>>>> > Tested with  https://github.com/Talend/beam-samples/
>>>> > (Java SDK v8/v11/v17, Spark 3.x runner).
>>>> >
>>>> > ---
>>>> > Alexey
>>>> >
>>>> > On 13 Feb 2023, at 17:54, Ahmet Altay via dev 
>>>> wrote:
>>>> >
>>>> > +1 (binding) - I validated python quick starts on direct runner and
>>>> python streaming quickstart on dataflow.
>>>> >
>>>> > Thank you!
>>>> >
>>>> > On Mon, Feb 13, 2023 at 5:17 AM Bruno Volpato via dev <
>>>> dev@beam.apache.org> wrote:
>>>> >>
>>>> >> +1 (non-binding)
>>>> >>
>>>> >> Tested with https://github.com/GoogleCloudPlatform/DataflowTemplates
>>>> (Java SDK 11, Dataflow runner).
>>>> >>
>>>> >>
>>>> >> Thanks!
>>>> >>
>>>> >> On Mon, Feb 13, 2023 at 1:13 AM Chamikara Jayalath via dev <
>>>> dev@beam.apache.org> wrote:
>>>> >>>
>>>> >>> +1 (binding)
>>>> >>>
>>>> >>> Tried several Java and Python multi-language pipelines.
>>>> >>>
>>>> >>> Thanks,
>>>> >>> Cham
>>>> >>>
>>>> >>> On Fri, Feb 10, 2023 at 1:52 PM Luke Cwik via dev <
>>>> dev@beam.apache.org> wrote:
>>>> >>>>
>>>> >>>> +1
>>>> >>>>
>>>> >>>> Validated release artifact signatures and verified the Java Flink
>>>> and Spark quickstarts.
>>>> >>>>
>>>> >>>> On Fri, Feb 10, 2023 at 9:27 AM John Casey via dev <
>>>> dev@beam.apache.org> wrote:
>>>> >>>>>
>>>> >>>>> Addendum to above email.
>>>> >>>>>
>>>> >>>>> Java artifacts were built with Gradle 7.5.1 and OpenJDK 1.8.0_362
>>>> >>>>>
>>>> >>>>> On Fri, Feb 10, 2023 at 11:14 AM John Casey <
>>>> theotherj...@google.com> wrote:
>>>> >>>>>>
>>>> >>>>>> Hi everyone,
>>>> >>>>>> Please review and vote on the release candidate #1 for the
>>>> version 2.45.0, as follows:
>>>> >>>>>> [ ] +1, Approve the release
>>>> >>>>>> [ ] -1, Do not approve the release (please provide specific
>>>> comments)
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> Reviewers are encouraged to test their own use cases with the
>>>> release candidate, and vote +1 if no issues are found.
>>>> >>>>>>
>>>> >>>>>> The complete staging area is available for your review, which
>>>> includes:
>>>> >>>>>> * GitHub Release notes [1],

Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-14 Thread Luke Cwik via dev
I made some progress in testing the container and hit an issue: whether
Ubuntu 22.04 "Jammy" works depends on the version of Docker installed. It
turns out that our boot.go crashes with "runtime/cgo: pthread_create
failed: Operation not permitted" because Ubuntu 22.04 uses new
syscalls that Docker 18.09.4 doesn't have a seccomp policy for (and it uses
a default of deny). We have a couple of choices here:
1) upgrade the version of docker on Jenkins and require users to similarly
use a new enough version of Docker so that this isn't an issue for them
2) use Ubuntu 20.04 "Focal" as the docker container

I was using Docker 20.10.21 which is why I didn't hit this issue when
testing the change locally.

We could also do these but they seem strictly worse than either of the two
options discussed above:
A) disable the seccomp policy on Jenkins
B) use a custom seccomp policy on Jenkins

My suggestion is to upgrade Docker versions on Jenkins and use Ubuntu 22.04
as it will have LTS releases till 2027 and then security patches till 2032
which gives everyone the longest runway till we need to swap OS versions
again for users of Apache Beam. Any concerns or ideas?
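
The Docker version dependence described above can be checked before running the container. This is a minimal sketch that assumes only the two data points from this thread (Docker 18.09.4 fails, Docker 20.10.21 works). The threshold constant and the helper names are hypothetical, and the true minimum version that ships a seccomp policy for the new syscalls may be lower than 20.10.21.

```python
import re

# Known-good version taken from this thread; the real minimum may be lower.
KNOWN_GOOD_DOCKER = (20, 10, 21)


def parse_docker_version(text):
    """Extract (major, minor, patch) from a string such as 'Docker version 20.10.21'."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"no version found in {text!r}")
    return tuple(int(part) for part in match.groups())


def is_known_good(version_text, minimum=KNOWN_GOOD_DOCKER):
    """Return True if the reported Docker version is at least the known-good one."""
    return parse_docker_version(version_text) >= minimum
```

The input could be the text printed by `docker --version` on a Jenkins worker or a user's machine.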



On Thu, Feb 9, 2023 at 10:20 AM Luke Cwik  wrote:

> Our current Java 8 container is 262 MiB and layers on top of
> openjdk:8-bullseye, which is 226 MiB compressed, while eclipse-temurin:8 is
> 92 MiB compressed and eclipse-temurin:8-alpine is 65 MiB compressed.
>
> I would rather not get into issues with C library differences caused by
> the alpine project so I would stick with the safer option and let users
> choose alpine when building their custom container if they feel it provides
> a large win for them. We can always swap to alpine in the future as well if
> the C library differences become a non-issue.
>
> So swapping to eclipse-temurin will save us a bunch on the container size
> which should help with container transfer and hopefully for startup times
> as well.
>
> On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud  wrote:
>
>> This sounds reasonable to me as well.
>>
>> I've made swaps like this in the past, the base image of each is probably
>> a bigger factor than the JDK. The openjdk images were based on Debian 11.
>> The default eclipse-temurin images are based on Ubuntu 22.04 with an alpine
>> option. Ubuntu is a Debian derivative but the versions and package names
>> aren't exact matches and Ubuntu tends to update a little faster. For most
>> users I don't think this will matter but users building custom containers
>> may need to make minor changes. The alpine option will be much smaller
>> (which could be a significant improvement) but would be a more significant
>> change to the environment.
>>
>> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Seems reasonable to me.
>>>
>>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user 
>>> wrote:
>>> >
>>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
>>> stopped being built and supported since July 2022. I have filed [2] to
>>> track the resolution of this issue.
>>> >
>>> > Based upon [1], almost everyone is swapping to the eclipse-temurin
>>> container[3] as their base based upon the linked issues from the
>>> deprecation notice[1]. The eclipse-temurin container is released under
>>> these licenses:
>>> > Apache License, Version 2.0
>>> > Eclipse Distribution License 1.0 (BSD)
>>> > Eclipse Public License 2.0
>>> > 一 (Secondary) GNU General Public License, version 2 with OpenJDK
>>> Assembly Exception
>>> > 一 (Secondary) GNU General Public License, version 2 with the GNU
>>> Classpath Exception
>>> >
>>> > I propose that we swap all our containers to the eclipse-temurin
>>> containers[3].
>>> >
>>> > Open to other ideas and also would be great to hear about your
>>> experience in any other projects that you have had to make a similar
>>> decision.
>>> >
>>> > 1: https://github.com/docker-library/openjdk/issues/505
>>> > 2: https://github.com/apache/beam/issues/25371
>>> > 3: https://hub.docker.com/_/eclipse-temurin
>>>
>>


Re: A user-deployable Beam Transform Service

2023-02-10 Thread Luke Cwik via dev
Seems like a useful thing to me and will make it easier for Beam users
overall.

On Fri, Feb 10, 2023 at 3:56 PM Robert Bradshaw via dev 
wrote:

> Thanks. I added some comments to the doc.
>
> On Mon, Feb 6, 2023 at 1:33 PM Chamikara Jayalath via dev
>  wrote:
> >
> > Hi All,
> >
> > Beam PTransforms are currently primarily identified as operations in a
> pipeline that perform specific tasks. PTransform implementations were
> traditionally linked to specific Beam SDKs.
> >
> > With the advent of portability framework, multi-language pipelines, and
> expansion services that can be used to build/expand and discover
> transforms, we have an opportunity to make this more general and
> re-introduce Beam PTransforms as computation units that can serve any
> use-case that needs to discover or use Beam transforms. For example, any
> Beam SDK that runs a pipeline using a portable Beam runner should be able
> to use a transform offered through an expansion service irrespective of the
> implementation SDK of the transform or the pipeline.
> >
> > I believe we can make such use-cases much easier to manage by
> introducing a user-deployable service that encapsulates existing Beam
> expansion services in the form of a Kubernetes cluster. The service will
> offer a single gRPC endpoint and will include Beam expansion services
> developed in different languages. Any Beam pipeline, irrespective of the
> pipeline SDK, should be able to use any transform offered by the service.
> >
> > This will also offer a way to make multi-language pipeline execution,
> which currently relies on locally downloaded large dependencies and locally
> started expansion service processes, more robust.
> >
> > I have written a proposal for implementing such a service and it's
> available at https://s.apache.org/beam-transform-service.
> >
> > Please take a look and let me know if you have any comments or questions.
> >
> > Thanks,
> > Cham
>


Re: [VOTE] Release 2.45.0, Release Candidate #1

2023-02-10 Thread Luke Cwik via dev
+1

Validated release artifact signatures and verified the Java Flink and Spark
quickstarts.

On Fri, Feb 10, 2023 at 9:27 AM John Casey via dev 
wrote:

> Addendum to above email.
>
> Java artifacts were built with Gradle 7.5.1 and OpenJDK 1.8.0_362
>
> On Fri, Feb 10, 2023 at 11:14 AM John Casey 
> wrote:
>
>> Hi everyone,
>> Please review and vote on the release candidate #1 for the version
>> 2.45.0, as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>>
>> Reviewers are encouraged to test their own use cases with the release
>> candidate, and vote +1 if no issues are found.
>>
>> The complete staging area is available for your review, which includes:
>> * GitHub Release notes [1],
>> * the official Apache source release to be deployed to dist.apache.org
>> [2], which is signed with the key with fingerprint 921F35F5EC5F5DDE [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag "v2.45.0-RC1" [5],
>> * website pull request listing the release [6], the blog post [6], and
>> publishing the API reference manual [7].
>> * Java artifacts were built with Gradle 7.5.1 and OpenJDK 1.8.0_362.
>> * Python artifacts are deployed along with the source release to the
>> dist.apache.org [2] and PyPI[8].
>> * Go artifacts and documentation are available at pkg.go.dev [9]
>> * Validation sheet with a tab for 2.45.0 release to help with validation
>> [10].
>> * Docker images published to Docker Hub [11].
>>
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>>
>> For guidelines on how to try the release in your projects, check out our
>> blog post at /blog/validate-beam-release/.
>>
>> Thanks,
>> John Casey
>>
>> [1] https://github.com/apache/beam/milestone/8
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.45.0/
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4]
>> https://repository.apache.org/content/repositories/orgapachebeam-1293/
>> [5] https://github.com/apache/beam/tree/v2.45.0-RC1
>> [6] https://github.com/apache/beam/pull/25407
>> [7] https://github.com/apache/beam-site/pull/640
>> [8] https://pypi.org/project/apache-beam/2.45.0rc1/
>> [9]
>> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.45.0-RC1/go/pkg/beam
>> [10]
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=2030665842
>> [11] https://hub.docker.com/search?q=apache%2Fbeam=image
>>
>


Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-09 Thread Luke Cwik via dev
Our current Java 8 container is 262 MiB and layers on top of
openjdk:8-bullseye, which is 226 MiB compressed, while eclipse-temurin:8 is
92 MiB compressed and eclipse-temurin:8-alpine is 65 MiB compressed.

I would rather not get into issues with C library differences caused by the
alpine project so I would stick with the safer option and let users choose
alpine when building their custom container if they feel it provides a
large win for them. We can always swap to alpine in the future as well if
the C library differences become a non-issue.

So swapping to eclipse-temurin will save us a bunch on the container size
which should help with container transfer and hopefully for startup times
as well.
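
To put numbers on the saving, here is a small sketch using only the compressed base image sizes quoted above. The dictionary and the function name are illustrative, and the figures cover the base image layer only, not the full 262 MiB Beam container.

```python
# Compressed base image sizes in MiB, as quoted in this thread.
BASE_IMAGE_MIB = {
    "openjdk:8-bullseye": 226,
    "eclipse-temurin:8": 92,
    "eclipse-temurin:8-alpine": 65,
}


def savings(old, new, sizes=BASE_IMAGE_MIB):
    """Return (MiB saved, percent saved) when swapping base image old -> new."""
    saved = sizes[old] - sizes[new]
    return saved, round(100 * saved / sizes[old], 1)


# Swapping to the default (Ubuntu-based) image saves 134 MiB, about 59.3%.
print(savings("openjdk:8-bullseye", "eclipse-temurin:8"))
# The alpine variant would save 161 MiB, about 71.2%.
print(savings("openjdk:8-bullseye", "eclipse-temurin:8-alpine"))
```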

On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud  wrote:

> This sounds reasonable to me as well.
>
> I've made swaps like this in the past, the base image of each is probably
> a bigger factor than the JDK. The openjdk images were based on Debian 11.
> The default eclipse-temurin images are based on Ubuntu 22.04 with an alpine
> option. Ubuntu is a Debian derivative but the versions and package names
> aren't exact matches and Ubuntu tends to update a little faster. For most
> users I don't think this will matter but users building custom containers
> may need to make minor changes. The alpine option will be much smaller
> (which could be a significant improvement) but would be a more significant
> change to the environment.
>
> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Seems reasonable to me.
>>
>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user 
>> wrote:
>> >
>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
>> stopped being built and supported since July 2022. I have filed [2] to
>> track the resolution of this issue.
>> >
>> > Based upon [1], almost everyone is swapping to the eclipse-temurin
>> container[3] as their base based upon the linked issues from the
>> deprecation notice[1]. The eclipse-temurin container is released under
>> these licenses:
>> > Apache License, Version 2.0
>> > Eclipse Distribution License 1.0 (BSD)
>> > Eclipse Public License 2.0
>> > 一 (Secondary) GNU General Public License, version 2 with OpenJDK
>> Assembly Exception
>> > 一 (Secondary) GNU General Public License, version 2 with the GNU
>> Classpath Exception
>> >
>> > I propose that we swap all our containers to the eclipse-temurin
>> containers[3].
>> >
>> > Open to other ideas and also would be great to hear about your
>> experience in any other projects that you have had to make a similar
>> decision.
>> >
>> > 1: https://github.com/docker-library/openjdk/issues/505
>> > 2: https://github.com/apache/beam/issues/25371
>> > 3: https://hub.docker.com/_/eclipse-temurin
>>
>


OpenJDK8 / OpenJDK11 container deprecation

2023-02-07 Thread Luke Cwik via dev
As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
stopped being built and supported since July 2022. I have filed [2] to
track the resolution of this issue.

Based upon [1], almost everyone is swapping to the eclipse-temurin
container[3] as their base based upon the linked issues from the
deprecation notice[1]. The eclipse-temurin container is released under
these licenses:
Apache License, Version 2.0
Eclipse Distribution License 1.0 (BSD)
Eclipse Public License 2.0
一 (Secondary) GNU General Public License, version 2 with OpenJDK Assembly
Exception
一 (Secondary) GNU General Public License, version 2 with the GNU Classpath
Exception

I propose that we swap all our containers to the eclipse-temurin
containers[3].

Open to other ideas and also would be great to hear about your experience
in any other projects that you have had to make a similar decision.

1: https://github.com/docker-library/openjdk/issues/505
2: https://github.com/apache/beam/issues/25371
3: https://hub.docker.com/_/eclipse-temurin


Re: Portable v.s. non-portable PTransform names

2023-01-31 Thread Luke Cwik via dev
The PCollection value comes from the key on the pipeline proto[1]. That key
is populated during pipeline construction time[2] and is based upon the
unique name of the PTransform + the name of the output being used (aka tag
with .output being a default).

It looks like the counter PTRANSFORM is coming from the metric step name[3].

I would take a look at the pipeline proto[4] that is generated during
pipeline construction and the process bundle descriptors[5] during pipeline
execution to see where something is being changed if at all.

They should be able to have the same style in generated names but tracking
down to where they are being changed is a good first step.

1:
https://github.com/apache/beam/blob/957301519bb76a9647d026885fced1a775a7c9ff/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L68
2:
https://github.com/apache/beam/blob/957301519bb76a9647d026885fced1a775a7c9ff/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java#L33
3:
https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java#L247
4:
https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L91
5:
https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L189
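
When tracking down where the name changes, it may help to know what transformation to look for. The two names reported in this thread are consistent with every character that is not a letter or digit being replaced by a dash. The sketch below only demonstrates that observation; the function name is hypothetical and this is not taken from the Beam or Samza runner code.

```python
import re


def dashed_name(transform_name):
    """Replace every character that is not a letter or digit with a dash."""
    return re.sub(r"[^A-Za-z0-9]", "-", transform_name)


# Reproduces the portable-mode name from the non-portable one.
print(dashed_name("Kati-Step-2/ParMultiDo(Anonymous)"))
```

If the hypothesis holds, searching the runner for a similar replace-with-dash step is one way to find where the names diverge.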

On Wed, Jan 11, 2023 at 3:29 PM Katie Liu  wrote:

> Attaching the monitoring_infos received, if helpful.
>
> I observed that the PCOLLECTION name format is the same in non-portable mode, 
> but the PTRANSFORM name has dashes instead.
>
> ```
>
> monitoring_infos {
>   urn: "beam:metric:element_count:v1"
>   type: "beam:metrics:sum_int64:v1"
>   payload: "\000"
>   labels {
>     key: "PCOLLECTION"
>     value: "Kati-Step-2/ParMultiDo(Anonymous).output"
>   }
> }
> monitoring_infos {
>   urn: "beam:metric:user:sum_int64:v1"
>   type: "beam:metrics:sum_int64:v1"
>   payload: "\n"
>   labels {
>     key: "NAME"
>     value: "count101"
>   }
>   labels {
>     key: "NAMESPACE"
>     value: "org.apache.beam.runners.samza.portable.SamzaPortableTest"
>   }
>   labels {
>     key: "PTRANSFORM"
>     value: "Kati-Step-2-ParMultiDo-Anonymous-"
>   }
> }
>
> ```
>
>
> On Wed, Jan 11, 2023 at 2:38 PM Katie Liu  wrote:
>
>> Hi beam-dev,
>>
>> I have a question regarding the PTransform name formatting.
>> For the same user-defined function, the name differs: in Samza portable
>> mode it is "Kati-Step-2-ParMultiDo-Anonymous-", while in normal mode it
>> is "Kati-Step-2/ParMultiDo(Anonymous)".
>>
>> Does this problem only exist in Samza? And are there pointers to where
>> the PTransform name is generated?
>>
>> Thanks,
>> Katie
>>
>


Re: Thoughts on extensions/datasketches vs adding to the existing sketching library?

2023-01-18 Thread Luke Cwik via dev
I would suggest adding it to the existing package(s) (either
sdks/java/extensions or sdks/java/zetasketch or both depending on if you're
replacing existing sketches or adding new ones) since we shouldn't expose a
sketching library's API surface. We should make the API take all the
relevant parameters since this allows us to move between variants and
choose the best sketching library.
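
As a rough illustration of that shape (in Python for brevity, although the extension itself is Java), the caller passes parameters to a library-neutral entry point and never sees the backing sketch type. All names here are hypothetical, and the only backend is an exact counter standing in for a real sketch.

```python
from abc import ABC, abstractmethod


class DistinctCounter(ABC):
    """Library-neutral interface; callers never see the backing sketch type."""

    @abstractmethod
    def add(self, value):
        ...

    @abstractmethod
    def estimate(self):
        ...


class ExactSetCounter(DistinctCounter):
    """Stand-in backend that counts exactly; a real one would wrap a sketch."""

    def __init__(self, precision):
        self.precision = precision  # accepted for API parity, unused here
        self._seen = set()

    def add(self, value):
        self._seen.add(value)

    def estimate(self):
        return len(self._seen)


# Swapping or adding a sketching library only changes this registry.
_BACKENDS = {"exact": ExactSetCounter}


def approximate_distinct(values, precision=12, backend="exact"):
    """Count distinct values using the named backend."""
    counter = _BACKENDS[backend](precision)
    for value in values:
        counter.add(value)
    return counter.estimate()
```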

On Wed, Jan 18, 2023 at 11:24 AM Reuven Lax via dev 
wrote:

> I believe that when zetasketch was added, it was also noticeably more
> efficient than other sketch implementations. However this was a number of
> years ago, and I don't know whether it still has an advantage or not.
>
> On Wed, Jan 18, 2023 at 10:41 AM Byron Ellis via dev 
> wrote:
>
>> Hi everyone,
>>
>> I was looking at adding at least a couple of the sketches from the Apache
>> Datasketches library to the Beam Java SDK and I was wondering if folks had
>> a preference for adding to the existing "sketching" extension vs splitting
>> it out into its own extension?
>>
>> The reason I ask is that there's some overlap (which already exists in
>> zetasketch) between the sketches available in Datasketches vs Beam today,
>> particularly HyperLogLog which would have 3 implementations if we were to
>> add all of them.
>>
>> I don't really have a strong opinion, though personally I'd probably lean
>> towards a single sketching extension (zetasketch being something of a
>> special case as it exists for format compatibility as far as I can tell).
>> But I could see how that could be confusing if you had the Apache
>> Datasketch implementation and the existing implementation derived from the
>> clearspring implementations.
>>
>> Any thoughts?
>>
>> Best,
>> B
>>
>


Re: BigTable reader for Python?

2023-01-06 Thread Luke Cwik via dev
The proto (java) -> bytes -> proto (python) sounds good.

Have you tried moving your DoFn out of your main module into a new
module as per [1]? Another option is to do the import inside the function;
can you do the import once in the setup()[2] function? Have you considered
using the cloud profiler[3] to see what is actually slow?

1:
https://stackoverflow.com/questions/69436706/nameerror-name-beam-is-not-defined-in-lambda
2:
https://github.com/apache/beam/blob/f9d5de34ae1dad251f5580073c0245a206224a69/sdks/python/apache_beam/transforms/core.py#L670
3: https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#python
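
A minimal sketch of the import-once-in-setup() idea. To keep it runnable without Beam installed, the class below is a plain class standing in for a beam.DoFn subclass, and the stdlib json module stands in for the Bigtable proto module; both substitutions are assumptions made for illustration.

```python
import importlib


class ParsePB:  # in a real pipeline this would subclass beam.DoFn
    """Imports its dependency once per instance in setup(), not per element."""

    def __init__(self, module_name="json"):
        # "json" is a stand-in; the case in this thread would name the proto module.
        self._module_name = module_name
        self._module = None

    def setup(self):
        # Beam calls setup() once per DoFn instance on the worker.
        self._module = importlib.import_module(self._module_name)

    def process(self, payload):
        # Uses the module cached by setup(); no import statement runs per element.
        yield self._module.loads(payload)
```

Because the module is resolved on the worker inside setup(), the DoFn does not rely on a name captured from the main module at pickling time.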


On Fri, Jan 6, 2023 at 11:19 AM Lina Mårtensson  wrote:

> I am *so close* it seems. ;)
>
> I followed Luke's advice and am reading the proto
> com.google.bigtable.v2.Row, then use a transform to convert that to bytes
> in order to be able to send it across to Python. (I assume that's what I
> should be doing with the proto?)
> Once on the Python side, when running on Dataflow, I'm running into the
> dreaded NameError.
> save_main_session is True.
>
> Either
> from google.cloud.bigtable_v2.types import Row
> ...
> class ParsePB(beam.DoFn):
>     def process(self, pb_bytes):
>         row = Row()
>         row.ParseFromString(pb_bytes)
>
> or
>
> from google.cloud.bigtable_v2.proto import data_pb2 as data_v2_pb2
> ...
> class ParsePB(beam.DoFn):
>     def process(self, pb_bytes):
>         row = Row()
>         row.ParseFromString(pb_bytes)
>
> works in the DirectRunner (if I skip the Java connection and fake input
> data), but not on Dataflow.
> It works if I put the import in the process() function, although then
> running the code is super slow. (I'm not sure why, but running an import on
> every entry definitely sounds like it could cause that!)
>
> (I still have issues with the DirectRunner, as per my previous email.)
>
> Is there a good way to get around this?
>
> Thanks!
> -Lina
>
> On Thu, Jan 5, 2023 at 4:49 PM Lina Mårtensson  wrote:
>
>> Great, thanks! That was a huge improvement.
>>
>>
>> On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik  wrote:
>>
>>> By default Beam Java only uploads artifacts that have changed but it
>>> looks like this is not the case for Beam Python and you need to explicitly
>>> opt in with the --enable_artifact_caching flag[1].
>>>
>>> It looks like this feature was added 1 year ago[2], should we make this
>>> on by default?
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
>>> 2: https://github.com/apache/beam/pull/16229
>>>
>>>
>>>
>>> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson 
>>> wrote:
>>>
>>>> Thanks! I have now successfully written a beautiful string of protobuf
>>>> bytes into a file via Python. 
>>>>
>>>> Two issues though:
>>>> 1. Robert said the Python direct runner would just work with this - but
>>>> it's not working. After about half an hour of these messages repeated over
>>>> and over again I interrupted the job:
>>>>
>>>> E0105 07:25:48.170601677   58210 fork_posix.cc:76]   Other
>>>> threads are currently calling into gRPC, skipping fork() handlers
>>>>
>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05
>>>> 06:57:10 Failed to obtain provisioning information: failed to dial server
>>>> at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
>>>> 2. I (unsurprisingly) get back to the issue I had when I tested out the
>>>> Spanner x-lang transform on Dataflow - the overhead for starting a job is
>>>> unbearably slow, the time mainly spent in transferring the expansion
>>>> service jar (115 MB) + my jar (105 MB) with my new code and its
>>>> dependencies:
>>>>
>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS
>>>> upload to
>>>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>>>>
>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
>>>> upload to
>>>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar

Re: BigTable reader for Python?

2023-01-05 Thread Luke Cwik via dev
By default Beam Java only uploads artifacts that have changed but it looks
like this is not the case for Beam Python and you need to explicitly opt in
with the --enable_artifact_caching flag[1].

It looks like this feature was added 1 year ago[2], should we make this on
by default?

1:
https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
2: https://github.com/apache/beam/pull/16229
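
To illustrate what "only uploads artifacts that have changed" means, here is a rough sketch of content-hash based staging. It is not Beam's actual staging code: the function names staged_name and stage_if_changed, the in-memory set of staged names, and the upload callback are all assumptions made for the example. The idea is that the staged file name embeds a hash of the file contents, so an unchanged jar maps to the same name and the upload can be skipped.

```python
import base64
import hashlib
import os
import tempfile


def staged_name(path):
    """Returns '<stem>-<urlsafe sha256><ext>' for the file at path."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Hash in 1 MiB chunks so large jars are not read into memory at once.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    token = base64.urlsafe_b64encode(digest.digest()).decode("ascii").rstrip("=")
    stem, ext = os.path.splitext(os.path.basename(path))
    return f"{stem}-{token}{ext}"


def stage_if_changed(path, already_staged, upload):
    """Uploads path only if no artifact with the same content hash is staged."""
    name = staged_name(path)
    if name in already_staged:
        return name, False  # cache hit: skip the upload
    upload(path, name)
    already_staged.add(name)
    return name, True


uploads = []
staged = set()
with tempfile.TemporaryDirectory() as tmp:
    jar = os.path.join(tmp, "my_pipeline.jar")
    with open(jar, "wb") as f:
        f.write(b"pretend jar contents")
    first = stage_if_changed(jar, staged, lambda src, name: uploads.append(name))
    second = stage_if_changed(jar, staged, lambda src, name: uploads.append(name))

print(first[1], second[1], len(uploads))  # True False 1
```

With a scheme like this, a 100+ MB jar that has not changed between runs costs one hash computation instead of a multi-minute upload.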



On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson  wrote:

> Thanks! I have now successfully written a beautiful string of protobuf
> bytes into a file via Python. 
>
> Two issues though:
> 1. Robert said the Python direct runner would just work with this - but
> it's not working. After about half an hour of these messages repeated over
> and over again I interrupted the job:
>
> E0105 07:25:48.170601677   58210 fork_posix.cc:76]   Other
> threads are currently calling into gRPC, skipping fork() handlers
>
> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05
> 06:57:10 Failed to obtain provisioning information: failed to dial server
> at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
> 2. I (unsurprisingly) get back to the issue I had when I tested out the
> Spanner x-lang transform on Dataflow - the overhead for starting a job is
> unbearably slow, the time mainly spent in transferring the expansion
> service jar (115 MB) + my jar (105 MB) with my new code and its
> dependencies:
>
> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload
> to
> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>
> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload
> to
> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
> in 321 seconds.
>
> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload
> to
> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
>
> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload
> to
> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar
> in 295 seconds.
> I have a total of 13 minutes until any workers have started on Dataflow,
> then another 4.5 minutes once the job actually does anything (which
> eventually is to read a whopping 3 cells from Bigtable ;).
>
> How could this be improved?
> For one, it seems to me like the upload of
> sdks:java:io:google-cloud-platform:expansion-service:shadowJar from my
> computer shouldn't be necessary - shouldn't Dataflow have that
> already/could it be fetched by Dataflow rather than having to upload it
> over slow internet?
> And what about my own jar - it's not bound to change very often, so would
> it be possible to upload somewhere and then fetch it from there?
>
> Thanks!
> -Lina
>
> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik  wrote:
>
>> I would suggest using BigtableIO which also returns a
>> protobuf com.google.bigtable.v2.Row. This should allow you to replicate
>> what SpannerIO is doing.
>>
>> Alternatively you could provide a way to convert the HBase result into a
>> Beam row by specifying a converter and a schema for it and then you could
>> use the already well known Beam Schema type:
>>
>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
>>
>> Otherwise you'll have to register the HBase result coder with a well
>> known name so that the runner API coder URN is something that you know and
>> then on the Python side you would need a coder for that URN as well, to
>> allow you to understand the bytes being sent across from the Java portion
>> of the pipeline.
>>
>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson 
>> wrote:
>>
>>> And next issue... I'm getting KeyError: 'beam:coders:javasdk:0.1' which
>>> I learned
>>> <https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips>
>>> is because the transform is trying to return something that there isn't a 
>>> standard
>>> Beam coder for
>>> <https://github.com/apache/beam/blob/05428866cdbf1ea8e4c1789dd40327673f

Re: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?

2023-01-03 Thread Luke Cwik via dev
It looks like there is an existing issue[1]. I updated our correspondence
there and we should continue our communication there.

1: https://github.com/apache/beam/issues/24801

On Tue, Jan 3, 2023 at 1:22 PM Reuven Lax  wrote:

> Ah, that is fair. However right now that doesn't happen either.
>
> On Tue, Jan 3, 2023 at 12:59 PM Luke Cwik  wrote:
>
>> I think in general ReadableState.read() should not be @Nullable but we
>> should allow for the overrides like ValueState to specify that T can
>> be @Nullable while others like ListState we should have List<@Nullable T>.
>>
>> On Tue, Jan 3, 2023 at 12:37 PM Reuven Lax via dev 
>> wrote:
>>
>>> It should be @Nullable - I'm not sure why that was removed.
>>>
>>> On Tue, Jan 3, 2023 at 12:18 PM Ahmet Altay via dev 
>>> wrote:
>>>
>>>> Forwarding, because this message got lost in the list moderation.
>>>>
>>>> -- Forwarded message --
>>>> From: Jeeno Lentin 
>>>> To: dev@beam.apache.org
>>>> Cc:
>>>> Bcc:
>>>> Date: Fri, 23 Dec 2022 00:36:55 -0500
>>>> Subject: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?
>>>> Hi,
>>>>
>>>> We use the Beam Java SDK and are trying to upgrade version of Beam from
>>>> version 2.31.0 to 2.43.0
>>>>
>>>> While upgrading, we noticed that @Nullable annotation has been removed
>>>> from org.apache.beam.sdk.state.ReadableState.read()
>>>>
>>>> I traced it back to this PR: https://github.com/apache/beam/pull/16721
>>>>
>>>> We have the following concerns
>>>> - If ReadableState.read() is really not nullable, shouldn’t there be a
>>>> way to specify a default value when creating a state? Such a feature
>>>> doesn’t seem to exist.
>>>> - And what would it return initially, when nothing has been written to
>>>> the state yet?
>>>>
>>>> Thank you,
>>>> Jeeno
>>>>
>>>


Re: BigTable reader for Python?

2023-01-03 Thread Luke Cwik via dev
lugins = [":auto_service_processor"],
>>     srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
>>     deps = [
>>         "@maven//:com_google_auto_service_auto_service",
>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>         "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
>>         "@maven//:org_apache_beam_beam_sdks_java_core",
>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>         "@maven//:org_apache_hbase_hbase_shaded_client",
>>     ],
>> )
>>
>>
>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik  wrote:
>>
>>> AutoService relies on Java's compiler annotation processor.
>>> https://github.com/google/auto/tree/main/service#getting-started shows
>>> that you need to configure Java's compiler to use the annotation processors
>>> within AutoService.
>>>
>>> I saw this public gist that seemed to enable using the AutoService
>>> annotation processor with Bazel
>>> https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
>>>
>>>
>>>
>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> That's good news about the direct runner, thanks!
>>>>
>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev
>>>>>  wrote:
>>>>> >
>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson 
>>>>> wrote:
>>>>> >>
>>>>> >> Thanks for the detailed answers!
>>>>> >>
>>>>> >> I totally get the points about development & maintenance cost, and,
>>>>> >> from a user perspective, about getting the performance right.
>>>>> >>
>>>>> >> I decided to try out the Spanner connector to get a sense of how
>>>>> well
>>>>> >> the x-language approach works in our world, since that's an existing
>>>>> >> x-language connector.
>>>>> >> Overall, it works and with minimal intervention as you say - it is
>>>>> >> very slow, though.
>>>>> >> I'm a little confused about "portable runners" - if I understand
>>>>> this
>>>>> >> correctly, this means we couldn't run with the DirectRunner anymore
>>>>> if
>>>>> >> using an x-language connector? (At least it didn't work when I tried
>>>>> >> it.)
>>>>> >
>>>>> >
>>>>> > You'll have to use the portable DirectRunner -
>>>>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>>>> >
>>>>> > Job service for this can be started using following command:
>>>>> > python apache_beam/runners/portability/local_job_service_main.py -p
>>>>> 
>>>>>
>>>>> Note that the Python direct runner is already a portable runner, so
>>>>> you shouldn't have to do anything special (like start up a separate
>>>>> job service and pass extra options) to run locally. Just use the
>>>>> cross-language transforms as you would any normal Python transform.
>>>>>
>>>>> The goal is to make this as smooth and transparent as possible; please
>>>>> keep coming back to us if you find rough edges.
>>>>>
>>>>


Re: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?

2023-01-03 Thread Luke Cwik via dev
I think in general ReadableState.read() should not be @Nullable, but we
should allow overrides like ValueState to specify that T can be @Nullable,
while for others like ListState we should have List<@Nullable T>.

On Tue, Jan 3, 2023 at 12:37 PM Reuven Lax via dev 
wrote:

> It should be @Nullable - I'm not sure why that was removed.
>
> On Tue, Jan 3, 2023 at 12:18 PM Ahmet Altay via dev 
> wrote:
>
>> Forwarding, because this message got lost in the list moderation.
>>
>> -- Forwarded message --
>> From: Jeeno Lentin 
>> To: dev@beam.apache.org
>> Cc:
>> Bcc:
>> Date: Fri, 23 Dec 2022 00:36:55 -0500
>> Subject: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?
>> Hi,
>>
>> We use the Beam Java SDK and are trying to upgrade version of Beam from
>> version 2.31.0 to 2.43.0
>>
>> While upgrading, we noticed that @Nullable annotation has been removed
>> from org.apache.beam.sdk.state.ReadableState.read()
>>
>> I traced it back to this PR: https://github.com/apache/beam/pull/16721
>>
>> We have the following concerns
>> - If ReadableState.read() is really not nullable, shouldn’t there be a
>> way to specify a default value when creating a state? Such a feature
>> doesn’t seem to exist.
>> - And what would it return initially, when nothing has been written to the
>> state yet?
>>
>> Thank you,
>> Jeeno
>>
>


Re: BigTable reader for Python?

2022-12-29 Thread Luke Cwik via dev
AutoService relies on Java's compiler annotation processor.
https://github.com/google/auto/tree/main/service#getting-started shows that
you need to configure Java's compiler to use the annotation processors
within AutoService.

I saw this public gist that seemed to enable using the AutoService
annotation processor with Bazel
https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00



On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev 
wrote:

> That's good news about the direct runner, thanks!
>
> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw 
> wrote:
>
>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev
>>  wrote:
>> >
>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson 
>> wrote:
>> >>
>> >> Thanks for the detailed answers!
>> >>
>> >> I totally get the points about development & maintenance cost, and,
>> >> from a user perspective, about getting the performance right.
>> >>
>> >> I decided to try out the Spanner connector to get a sense of how well
>> >> the x-language approach works in our world, since that's an existing
>> >> x-language connector.
>> >> Overall, it works and with minimal intervention as you say - it is
>> >> very slow, though.
>> >> I'm a little confused about "portable runners" - if I understand this
>> >> correctly, this means we couldn't run with the DirectRunner anymore if
>> >> using an x-language connector? (At least it didn't work when I tried
>> >> it.)
>> >
>> >
>> > You'll have to use the portable DirectRunner -
>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>> >
>> > Job service for this can be started using following command:
>> > python apache_beam/runners/portability/local_job_service_main.py -p
>> 
>>
>> Note that the Python direct runner is already a portable runner, so
>> you shouldn't have to do anything special (like start up a separate
>> job service and pass extra options) to run locally. Just use the
>> cross-language transforms as you would any normal Python transform.
>>
>> The goal is to make this as smooth and transparent as possible; please
>> keep coming back to us if you find rough edges.
>>
>


Re: BigTable reader for Python?

2022-12-29 Thread Luke Cwik via dev
I would have expected
a META-INF/services/org.apache.beam.sdk.expansion.ExternalTransformRegistrar
file in the jar containing the fully qualified class name
of BigtableRegistrar in it. See
https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.43.0/beam-sdks-java-io-kafka-2.43.0.jar
for an example of how Java's ServiceLoader expects the jar to be laid out.

It looks like the bazel build is not generating the META-INF/ file that the
`@AutoService` annotation is responsible for or the way that the bazel
build is taking the output files from the build process and generating the
jar is forgetting to take that file as well.
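
A quick way to check this is to look inside the jar for the provider-configuration file. The sketch below uses only Python's zipfile module; the helper name registered_providers is made up for the example, and the demo builds a throwaway jar rather than reading the real libjava_hbase.jar.

```python
import os
import tempfile
import zipfile

SERVICE = "org.apache.beam.sdk.expansion.ExternalTransformRegistrar"


def registered_providers(jar_path, service=SERVICE):
    """Returns the class names listed under META-INF/services/<service>."""
    entry = f"META-INF/services/{service}"
    with zipfile.ZipFile(jar_path) as jar:
        if entry not in jar.namelist():
            return []  # no registration file: ServiceLoader finds nothing
        text = jar.read(entry).decode("utf-8")
    providers = []
    for line in text.splitlines():
        # '#' starts a comment in provider-configuration files.
        line = line.split("#", 1)[0].strip()
        if line:
            providers.append(line)
    return providers


# Demo with a throwaway jar that contains the expected registration file.
with tempfile.TemporaryDirectory() as tmp:
    jar_path = os.path.join(tmp, "demo.jar")
    with zipfile.ZipFile(jar_path, "w") as jar:
        jar.writestr(
            f"META-INF/services/{SERVICE}",
            "energy.camus.beam.BigtableRegistrar\n",
        )
    print(registered_providers(jar_path))
```

An empty list for the Bazel-built jar would confirm that the registration file never made it into the archive.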

On Wed, Dec 28, 2022 at 11:30 PM Lina Mårtensson via dev <
dev@beam.apache.org> wrote:

> I kept working with an ExternalTransformRegistrar solution (although if
> there's an easier way, I'm all ears), and I have Java code that builds, and
> a Python connector that tries to use it.
>
> My current issue is that the expansion service that's started up doesn't
> find my transform using the URN provided:
> RuntimeError: java.lang.UnsupportedOperationException: Unknown urn:
> beam:external:CAMUS:bigtable_read:v1
>
> And I can see that my transform wasn't registered:
>
> INFO:apache_beam.utils.subprocess_server:b'INFO: Registering external
> transforms: [beam:transform:org.apache.beam:pubsub_read:v1,
> beam:transform:org.apache.beam:pubsub_write:v1,
> beam:transform:org.apache.beam:pubsublite_write:v1,
> beam:transform:org.apache.beam:pubsublite_read:v1,
> beam:transform:org.apache.beam:spanner_insert:v1,
> beam:transform:org.apache.beam:spanner_update:v1,
> beam:transform:org.apache.beam:spanner_replace:v1,
> beam:transform:org.apache.beam:spanner_insert_or_update:v1,
> beam:transform:org.apache.beam:spanner_delete:v1,
> beam:transform:org.apache.beam:spanner_read:v1,
> beam:transform:org.apache.beam:schemaio_bigquery_read:v1,
> beam:transform:org.apache.beam:schemaio_bigquery_write:v1,
> beam:transform:org.apache.beam:schemaio_datastoreV1_read:v1,
> beam:transform:org.apache.beam:schemaio_datastoreV1_write:v1,
> beam:transform:org.apache.beam:schemaio_pubsub_read:v1,
> beam:transform:org.apache.beam:schemaio_pubsub_write:v1,
> beam:transform:org.apache.beam:schemaio_jdbc_read:v1,
> beam:transform:org.apache.beam:schemaio_jdbc_write:v1,
> beam:transform:org.apache.beam:schemaio_avro_read:v1,
> beam:transform:org.apache.beam:schemaio_avro_write:v1,
> beam:external:java:generate_sequence:v1]'
>
> I'm creating the expansion service in code like this:
>
> expansion_service = BeamJarExpansionService(
>     'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
>     extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'],
>     classpath=[
>         "/home/builder/xlang/bando/bazel-bin/bigtable/libjava_hbase.jar"])
>
> where libjava_hbase.jar was built by Bazel and contains my code:
>
> $ jar tf libjava_hbase.jar
> META-INF/
> META-INF/MANIFEST.MF
> energy/
> energy/camus/
> energy/camus/beam/
> energy/camus/beam/BigtableRegistrar$BigtableReadBuilder$Configuration.class
> energy/camus/beam/BigtableRegistrar$BigtableReadBuilder.class
> energy/camus/beam/BigtableRegistrar$CrossLanguageConfiguration.class
> energy/camus/beam/BigtableRegistrar.class
>
> The relevant part of my code that does the registration looks like this:
>
> @AutoService(ExternalTransformRegistrar.class)
> public class BigtableRegistrar implements ExternalTransformRegistrar {
>
>   static final String READ_URN = "beam:external:CAMUS:bigtable_read:v1";
>
>   @Override
>   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
>     return ImmutableMap.of(READ_URN, new BigtableReadBuilder());
>   }
>
> What am I missing that prevents my transform from being registered?
>
> Thanks,
> -Lina
>
> On Tue, Dec 27, 2022 at 5:11 PM Lina Mårtensson  wrote:
>
>> I finally was able to get back to this and try to make an x-language
>> transform for Bigtable to be used in Python, but I could use some help.
>>
>> I started out with the Bigtable library, and it seemed like I should be
>> able to go with option 1 here, i.e. not write any Java code.
>>
>> As a non-Java user, it still wasn't obvious how to get this working, but
>> I eventually got it:
>>
>> java_transform = JavaExternalTransform(
>>     'org.apache.beam.sdk.io.gcp.bigtable.BigtableIO',
>>     BeamJarExpansionService(
>>         'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
>>         extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'])
>> ).read().withProjectId(projectId="myProjectId")
>>
>> data = p | 'Read from 

Re: @RequiresStableInput and Pipeline fusion

2022-12-13 Thread Luke Cwik via dev
This is definitely not working for portable pipelines since the
GreedyPipelineFuser doesn't create a fusion boundary, which, as you pointed
out, produces a single stage that has a non-deterministic function followed
by one that requires stable input. It seems as though we should have runners
check the requirements on the Pipeline[1] to ensure that they can
faithfully process such a pipeline and reject anything they don't support
early on.

Making the GreedyPipelineFuser insert that fusion break is likely the way
to go. Runners should be able to look at the ParDoPayload
requires_stable_input field for the ExecutableStage to see if any special
handling is necessary on their end before they pass data to that stage.

[1]:
https://github.com/apache/beam/blob/77af3237521d94f0399ab405ebac09bbbeded38c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L111
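
As a toy model of the proposed fusion break (not the GreedyPipelineFuser itself, which works on the full pipeline graph), the sketch below fuses a linear chain of transforms and starts a new stage before any transform that requires stable input. The Transform dataclass, the fuse_linear function and the "Create" step are hypothetical; the two DoFn names come from the test discussed below.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Transform:
    name: str
    requires_stable_input: bool = False


def fuse_linear(transforms: List[Transform]) -> List[List[str]]:
    """Greedily fuses a linear chain into stages, breaking fusion before
    any transform that requires stable input."""
    stages: List[List[str]] = []
    for t in transforms:
        if not stages or t.requires_stable_input:
            # Start a new stage so the runner can checkpoint its input.
            stages.append([t.name])
        else:
            stages[-1].append(t.name)
    return stages


chain = [
    Transform("Create"),
    Transform("PairWithRandomKeyFn"),
    Transform("MakeSideEffectAndThenFailFn", requires_stable_input=True),
]
print(fuse_linear(chain))
# [['Create', 'PairWithRandomKeyFn'], ['MakeSideEffectAndThenFailFn']]
```

The stage boundary is where a runner would materialize or checkpoint the data, so that a retry of the second stage sees the same keys.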


On Tue, Dec 13, 2022 at 1:44 AM Jan Lukavský  wrote:

> Hi,
>
> I have a question about @RequiresStableInput functionality. I'm trying to
> make it work for the portable Flink runner [1], [2]. We have an integration
> test (which should probably be turned into a ValidatesRunner test, but that
> is a different story) [3]. The test creates a random key for each input
> element, processes it once, fails the pipeline and then reprocesses it. This
> works well provided there is a checkpoint (a shuffle in the case of
> Dataflow) exactly between assigning the random key (via PairWithRandomKeyFn)
> and processing it (via MakeSideEffectAndThenFailFn).
>
> The problem is that GreedyPipelineFuser fuses the transforms
> PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single
> ExecutableStage. This is then executed with the @RequiresStableInput
> requirement, but this obviously assigns a different key to the reprocessed
> element(s). This looks like we need to fix that in the PipelineFuser, is
> this right? Does this mean the @RequiresStableInput functionality is
> actually broken for all runners that use the default fusion?
>
> Another possibility is that we need to fix the test by adding an explicit
> reshuffle (verified, this works), but I think that the test is actually
> correct; users would probably not expect transforms to be fused when
> crossing the @RequiresStableInput boundary.
>
> Thoughts?
>
>  Jan
>
>
> [1] https://github.com/apache/beam/issues/20812
> [2] https://github.com/apache/beam/pull/22889
> [3]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
>


Re: Gradle Task Configuration Avoidance

2022-12-09 Thread Luke Cwik via dev
I'm talking about the build scans that take you to gradle.com

For example, in one of our Jenkins jobs
<https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Java/9794/>
there is this build scan <https://gradle.com/s/m46dhiiwcieiu>. You can also
get these for builds that you do locally.

On Fri, Dec 9, 2022 at 3:05 PM Damon Douglas 
wrote:

> Thank you, Luke and Daniel,
>
> I wasn't aware that these tools were possible.  Luke, the build report to
> which you refer is a local HTML file produced after executing a task?  I
> see it when I fail tests or various checks.
>
> Best,
>
> Damon
>
> On Thu, Dec 8, 2022 at 8:59 AM Daniel Collins 
> wrote:
>
>> We could probably add a lint that rejects the spelling `task("` pretty
>> easily that would catch most of these.
>>
>> On Thu, Dec 8, 2022 at 11:34 AM Luke Cwik via dev 
>> wrote:
>>
>>> I have found the Gradle build reports very useful to enumerate
>>> deprecations and an easier thing to look at over the command line output.
>>>
>>> On Thu, Dec 8, 2022 at 8:26 AM Damon Douglas via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Thank you, Kerry, for your kind and encouraging words!
>>>>
>>>> Kenn, I wondered as well whether there exist proactive options.  I know
>>>> that gradle will warn of soon-to-be deprecated syntax in the build.gradle
>>>> files when executing gradle tasks on the command-line.  Perhaps we can
>>>> start there.  Not to sound cliche, but with any process improvement,
>>>> awareness is the first step.
>>>>
>>>> On Mon, Dec 5, 2022 at 3:54 PM Kenneth Knowles  wrote:
>>>>
>>>>> Nice!
>>>>>
>>>>> I believe at some point in the past we made a pass to try to convert
>>>>> our stuff to this model. I wonder if we can prevent it proactively
>>>>> somehow, like disabling the legacy way of creating tasks or something.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Dec 5, 2022 at 6:25 AM Kerry Donny-Clark via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Thanks Damon! I really appreciate how clear your emails are here.
>>>>>> Instead of my usual feeling of "I don't quite understand, and don't have
>>>>>> time to get context" I can read all the context in the mail.
>>>>>> This error message had confused me, so I really appreciate the
>>>>>> cleanup and explanation.
>>>>>>
>>>>>> On Fri, Dec 2, 2022, 7:28 PM Damon Douglas via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> *If you are new to Beam and coming from non-Java language
>>>>>>> conventions, it is likely you are new to gradle.  At the end of this
>>>>>>> email is a list of definitions and references to help understand this
>>>>>>> email.*
>>>>>>>
>>>>>>> *Short Version (For those who know gradle)*:
>>>>>>> A pull request [1] may fix the continual error message "Error:
>>>>>>> Backend initialization required, please run "terraform init"".  The PR
>>>>>>> applies Task Configuration Avoidance [2] by changing a few tasks from
>>>>>>> task(String) to tasks.register(String).
>>>>>>>
>>>>>>> *Long Version (For those who are not as familiar with gradle)*:
>>>>>>>
>>>>>>> I write this not as an expert but as someone still learning.  Gradle
>>>>>>> [3] is the software we use in the Beam repository to automate many
>>>>>>> needed tasks associated with building and testing code.  It is
>>>>>>> typically used in Java projects but can be extended for other
>>>>>>> purposes.  We store code related to our Beam Playground [4] that also
>>>>>>> uses gradle though it is not mainly a Java project.  The unit of work
>>>>>>> for Gradle is what is called a task.  To run a task you open a
>>>>>>> terminal and type "./gradlew nameOfMyTask".  There are two main ways
>>>>>>> to create a custom task in our

Re: Gradle Task Configuration Avoidance

2022-12-08 Thread Luke Cwik via dev
I have found the Gradle build reports very useful for enumerating
deprecations, and easier to read than the command-line output.

On Thu, Dec 8, 2022 at 8:26 AM Damon Douglas via dev 
wrote:

> Thank you, Kerry, for your kind and encouraging words!
>
> Kenn, I wondered as well whether there exist proactive options.  I know
> that gradle will warn of soon-to-be deprecated syntax in the build.gradle
> files when executing gradle tasks on the command-line.  Perhaps we can
> start there.  Not to sound cliche, but with any process improvement,
> awareness is the first step.
>
> On Mon, Dec 5, 2022 at 3:54 PM Kenneth Knowles  wrote:
>
>> Nice!
>>
>> I believe at some point in the past we made a pass to try to convert our
>> stuff to this model. I wonder if we can prevent it proactively somehow,
>> like disabling the legacy way of creating tasks or something.
>>
>> Kenn
>>
>> On Mon, Dec 5, 2022 at 6:25 AM Kerry Donny-Clark via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Thanks Damon! I really appreciate how clear your emails are here.
>>> Instead of my usual feeling of "I don't quite understand, and don't have
>>> time to get context" I can read all the context in the mail.
>>> This error message had confused me, so I really appreciate the cleanup
>>> and explanation.
>>>
>>> On Fri, Dec 2, 2022, 7:28 PM Damon Douglas via dev 
>>> wrote:
>>>
>>>> Hello Everyone,
>>>>
>>>> *If you are new to Beam and coming from non-Java language conventions,
>>>> it is likely you are new to gradle.  At the end of this email is a list of
>>>> definitions and references to help understand this email.*
>>>>
>>>> *Short Version (For those who know gradle)*:
>>>> A pull request [1] may fix the continual error message "Error: Backend
>>>> initialization required, please run "terraform init"".  The PR applies Task
>>>> Configuration Avoidance [2] by changing a few tasks from
>>>> task(String) to tasks.register(String).
>>>>
>>>> *Long Version (For those who are not as familiar with gradle)*:
>>>>
>>>> I write this not as an expert but as someone still learning.  Gradle
>>>> [3] is the software we use in the Beam repository to automate many needed
>>>> tasks associated with building and testing code.  It is typically used in
>>>> Java projects but can be extended for other purposes.  We store code
>>>> related to our Beam Playground [4] that also uses gradle though it is not
>>>> mainly a Java project.  The unit of work for Gradle is what is called a
>>>> task.  To run a task you open a terminal and type "./gradlew
>>>> nameOfMyTask".  There are two main ways to create a custom task in our
>>>> build.gradle files.  One is writing task("doSomething") and the other is
>>>> tasks.register("doSomethingElse").  According to [2], the recommendation is
>>>> to use tasks.register("doSomething").  This avoids executing other work
>>>> (configuration, but don't worry about it for now) until one runs the
>>>> doSomething task or another task we are running depends on it.
>>>>
>>>> So why were we seeing this "Error: Backend initialization required"
>>>> message all the time?  The reason is that tasks were configured as
>>>> task("doSomething").  All I had to do was change this to
>>>> tasks.register("doSomething") and it removed the message.
>>>>
>>>> *Definitions/References*
>>>>
>>>> 1. https://github.com/apache/beam/pull/24509
>>>> 2. https://docs.gradle.org/current/userguide/task_configuration_avoidance.html
>>>> 3. https://docs.gradle.org/current/userguide/what_is_gradle.html
>>>> 4. https://play.beam.apache.org/
>>>>
>>>> *Suggested Learning Path To Understand This Email*
>>>> 1. https://docs.gradle.org/current/samples/sample_building_java_libraries.html
>>>> 2. https://docs.gradle.org/current/userguide/build_lifecycle.html
>>>> 3. https://docs.gradle.org/current/userguide/tutorial_using_tasks.html
>>>> 4. https://docs.gradle.org/current/userguide/task_configuration_avoidance.html
>>>>
>>>> Best,
>>>>
>>>> Damon
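
The difference between eager task("...") and lazy tasks.register("...") described in the long version above can be modeled in a few lines of Python. This is a toy, not Gradle's API: TaskContainer, create, register and run are made-up names, and "terraformInit" is a hypothetical task standing in for whichever task printed the backend error while being configured.

```python
class TaskContainer:
    """Toy stand-in for a task container (not Gradle's real API)."""

    def __init__(self):
        self._configured = {}
        self._pending = {}

    def create(self, name, configure):
        # Eager, like task("name") { ... }: configuration runs immediately.
        self._configured[name] = configure()

    def register(self, name, configure):
        # Lazy, like tasks.register("name") { ... }: only remember how to
        # configure the task; nothing runs yet.
        self._pending[name] = configure

    def run(self, name):
        # Configure on demand, then hand back the configured task.
        if name not in self._configured:
            self._configured[name] = self._pending.pop(name)()
        return self._configured[name]


log = []


def configure(name):
    def _configure():
        log.append(f"configured {name}")
        return name
    return _configure


# Eager: both tasks are configured even though only "build" is run.
eager = TaskContainer()
eager.create("terraformInit", configure("terraformInit"))
eager.create("build", configure("build"))
eager.run("build")
print(log)  # ['configured terraformInit', 'configured build']

# Lazy: only the task that is actually run gets configured.
log.clear()
lazy = TaskContainer()
lazy.register("terraformInit", configure("terraformInit"))
lazy.register("build", configure("build"))
lazy.run("build")
print(log)  # ['configured build']
```

In the eager case the unrelated task's configuration work happens on every invocation, which is the pattern behind the recurring error message.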




Re: [DISCUSSION][JAVA] Current state of Java 17 support

2022-12-01 Thread Luke Cwik via dev
We do support JDK8, JDK11 and JDK17. Our story around newer features within
JDKs 9+ like modules is mostly non-existent though.

We rarely run into JDK-specific issues; the latest were the TLS 1.0 and
TLS 1.1 deprecation in newer patch versions of the JDK and the Docker CPU
share issues with different JDK versions. Even though it would be nice to
cover more, we currently have too many flaky tests and an already busy
Jenkins cluster. I believe we would get a lot more value out of deflaking
our existing tests and re-enabling disabled tests.

I've got to give credit to the JDK folks for how well they have maintained
compatibility over the years.

On Thu, Dec 1, 2022 at 9:05 AM Sachin Agarwal via dev 
wrote:

> This is a good heads up, thank you Cristian.
>
> On Thu, Dec 1, 2022 at 8:13 AM Cristian Constantinescu 
> wrote:
>
>> Hi,
>>
>> I came across some Kafka info and would like to share for those
>> unaware. Kafka is planning to drop support for Java 8 in Kafka 4 (Java
>> 8 is deprecated in Kafka 3), see KIP-750 [1].
>>
>> I'm not sure when Kafka 4 is scheduled to be released (probably a few
>> years down the road), but when it happens, KafkaIO may not be able to
>> support it if we maintain Java 8 compatibility unless it remains on
>> Kafka 3.
>>
>> Anyways, if not already done, I think it's a good idea to start
>> putting up serious warning flags around Beam used with Java 8, even
>> for Google cloud customers ;)
>>
>> Cheers,
>> Cristian
>>
>> [1] https://issues.apache.org/jira/browse/KAFKA-12894
>>
>> On Wed, Nov 30, 2022 at 12:59 PM Kenneth Knowles  wrote:
>> >
>> > An important thing is to ensure that we do not accidentally depend on
>> something that would break Java 8 support.
>> >
>> > Currently our Java 11 and 17 tests build the code with Java 8 (just
>> like our released artifacts) and then compile and run the test code with
>> the newer JDK. This roughly matches the user scenario, I think. So it is a
>> little more complex than just having separate test runs for different JDK
>> versions. But it would be good to make this more symmetrical between JDK
>> versions to develop the mindset that JDK is always explicit.
>> >
>> > Kenn
>> >
>> > On Wed, Nov 30, 2022 at 9:48 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>> >>
>> >>
>> >> On 30 Nov 2022, at 03:56, Tomo Suzuki via dev 
>> wrote:
>> >>
>> >> > Do we still need to support Java 8 SDK?
>> >>
>> >> Yes, for Google Cloud customers who still use Java 8, I want Apache
>> Beam to support Java 8. Do you observe any special burden maintaining Java
>> 8?
>> >>
>> >>
>> >> I can only think of the additional resources costs if we will test all
>> supported JDKs, as Austin mentioned above. Imho, we should do that for all
>> JDK that are officially supported.
>> >> Another less-costly way is to run the Java tests for all JDKs only
>> during the release preparation stage.
>> >>
>> >> I agree that it would make sense to continue to support Java 8 as long
>> as a significant number of users are using it.
>> >>
>> >> —
>> >> Alexey
>> >>
>> >>
>> >>
>> >> Regards,
>> >> Tomo
>> >>
>> >> On Tue, Nov 29, 2022 at 21:48 Austin Bennett 
>> wrote:
>> >>>
>> >>> -1 for ongoing Java8 support [ or, said another way, +1 for dropping
>> support of Java8 ]
>> >>>
>> >>> +1 for having tests that run for ANY JDK that we say we support.  Is
>> there any reason the resources to support are too costly [ or outweigh the
>> benefits of additional confidence in ensuring we support what we say we do
>> ]?  I am not certain on whether this would only be critical for releases,
>> or should be done as part of regular CI.
>> >>>
>> >>> On Tue, Nov 29, 2022 at 8:51 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>> >>>>
>> >>>> Hello,
>> >>>>
>> >>>> I’m sorry if it’s already discussed somewhere but I find myself a
>> little bit lost in the subject.
>> >>>> So, I’d like to clarify this - what is a current official state of
>> Java 17 support at Beam?
>> >>>>
>> >>>> I recall that a great job was done to make Beam compatible with Java
>> 17 [1] and Beam already provides “beam_java17_sdk” Docker image [2] but,
>> iiuc, Java 8 is still the default JVM to run all Java tests on Jenkins
>> ("Java PreCommit" in the first order) and there are only limited number of
>> tests that are running with JDK 11 and 17 on Jenkins by dedicated jobs.
>> >>>>
>> >>>> So, my question would sound like if Beam officially supports Java 17
>> (and 11), do we need to run all Beam Java SDK related tests (VR and IT test
>> including) against all supported Java SDKs?
>> >>>>
>> >>>> Do we still need to support Java 8 SDK?
>> >>>>
>> >>>> In the same time, as we are heading to move everything from Jenkins
>> to GitHub actions, what would be the default JDK there or we will run all
>> Java-related actions against all supported JDKs?
>> >>>>
>> >>>> --
>> >>>> Alexey
>> >>>>
>> >>>> [1] https://issues.apache.org/jira/browse/BEAM-12240
>> >>>> [2] https://hub.docker.com/r/apache/beam_java17_sdk
>> >>>>

Re: [Proposal] Beam MultimapState API

2022-10-31 Thread Luke Cwik via dev
Thanks, I took a look and left some comments.

On Mon, Oct 31, 2022 at 12:47 PM Ahmet Altay  wrote:

> Thank you for the message Buqian. Adding @Reuven Lax and @Lukasz Cwik
> explicitly (who are mentioned on the doc).
>
> On Mon, Oct 31, 2022 at 12:17 PM 郑卜千  wrote:
>
>> Gentle ping. Thanks!
>>
>> On Thu, Oct 27, 2022 at 2:55 PM 郑卜千  wrote:
>>
>>> Hi all,
>>>
>>> I've been working on adding MultimapState support to the Dataflow
>>> Runner, and the state interface is currently missing from the Beam State
>>> API.
>>>
>>> I have a one-pager proposing its API in
>>> https://docs.google.com/document/d/1zm16QCxWEWNy4qW1KKoA3DmQBOLrTSPimXiqQkTxokc/edit#.
>>> Please share suggestions/comments!
>>>
>>> Thanks!
>>> Buqian Zheng
>>>
>>>


Re: [VOTE] Release 2.42.0, release candidate #1

2022-10-13 Thread Luke Cwik via dev
Thanks, I missed that when I was reviewing the issue.

On Tue, Oct 11, 2022 at 5:01 PM Robert Burke  wrote:

> That merge commit doesn't appear in the 2.42.0 release branch, so I've
> moved that issue to the 2.43.0 release milestone.
>
> On Tue, Oct 11, 2022, 4:07 PM Luke Cwik via dev 
> wrote:
>
>> I would like to point out that I found another regression due to the
>> bigdataoss library upgrade from 2.2.6 to 2.2.8 (
>> https://github.com/apache/beam/pull/23300), filed
>> https://github.com/apache/beam/issues/23588.
>>
>> On Mon, Oct 10, 2022 at 1:17 PM Robert Burke  wrote:
>>
>>> Due to a process error on my part, some test failures from the Release
>>> validation PR ended up getting overlooked. (Thank you Valentyn for
>>> noticing!)
>>>
>>> Therefore, despite sufficient +1s, I'm going to -1 this RC to ensure
>>> these issues are validated.
>>>
>>> In particular:
>>> All the main Python PostCommits were failing. Investigation is underway,
>>> and https://github.com/apache/beam/issues/23560 is being tracked.
>>> I need someone to look at the Flink Examples postcommit failures:
>>> https://github.com/apache/beam/issues/23561
>>> I also need someone to look at
>>> https://github.com/apache/beam/issues/23562 which appears to be a
>>> single test failure in the java core when run on Streaming Dataflow.
>>>
>>> Since these issues need addressing anyways, I accepted the
>>> SpannerIO cherrypick: https://github.com/apache/beam/pull/23530, and
>>> that will be included in the RC2.
>>>
>>> The current scope of the cherrypicks (real or potential) shouldn't
>>> invalidate any Java or Go verification that has been done, and the Python
>>> side could be as simple as quota issues on the live services and our GCP
>>> testing project.
>>>
>>> I ended up having severe scheduling conflicts during this release
>>> (mostly a conference talk to give last week), preventing having the
>>> bandwidth to focus on the release issues. That combined with getting sick
>>> at the cut stage, and last minute unavoidable week of work travel,
>>> exacerbated these delays.
>>>
>>> Thank you for your patience and understanding.
>>> Robert Burke
>>> 2.42.0 Release Manager
>>>
>>>
>>>
>>> On Tue, 4 Oct 2022 at 09:18, Robert Burke  wrote:
>>>
>>>> And the missing version substitutions are
>>>> Gradle 7.5.1
>>>> JDK Version: AdoptOpen JDK 1.8.0_292
>>>>
>>>> On Tue, Oct 4, 2022, 9:01 AM Robert Burke  wrote:
>>>>
>>>>> Agreed that the Python results appeared to be missing. The comment
>>>>> history indicates they were invoked however (and they appear in the Mass
>>>>> Comment script).
>>>>>
>>>>> Re-runs are failing for either unclear reasons, timeouts or other
>>>>> apparent infrastructure flakes.
>>>>>
>>>>> While we have 3 binding +1 votes for RC1, please hold while this is
>>>>> looked into, it may require an RC2 to resolve.
>>>>>
>>>>> On Mon, Oct 3, 2022, 8:15 PM Ahmet Altay via dev 
>>>>> wrote:
>>>>>
>>>>>> +1 (binding) - I validated python quick starts on direct runner.
>>>>>>
>>>>>> Thank you for working on the release!
>>>>>>
>>>>>> Ahmet
>>>>>>
>>>>>> On Mon, Oct 3, 2022 at 9:06 AM Valentyn Tymofieiev via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> I validated that Dataflow and Beam Python containers have
>>>>>>> dependencies that match Beam requirements.
>>>>>>>
>>>>>>> I came across https://github.com/apache/beam/pull/23200 - there are
>>>>>>> failed tests and I don't see test results for Python PostCommit
>>>>>>> suites. Do you know what's the status of both?
>>>>>>>
>>>>>>> Minor nits: missing substitution in  * Java artifacts were built
>>>>>>> with Gradle GRADLE_VERSION and OpenJDK/Oracle JDK JDK_VERSION.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 3, 2022 at 7:21 AM Ritesh Ghorse via dev <
>>>>>>>

Re: [VOTE] Release 2.42.0, release candidate #1

2022-10-11 Thread Luke Cwik via dev
I would like to point out that I found another regression due to the
bigdataoss library upgrade from 2.2.6 to 2.2.8 (
https://github.com/apache/beam/pull/23300), filed
https://github.com/apache/beam/issues/23588.

On Mon, Oct 10, 2022 at 1:17 PM Robert Burke  wrote:

> Due to a process error on my part, some test failures from the Release
> validation PR ended up getting overlooked. (Thank you Valentyn for
> noticing!)
>
> Therefore, despite sufficient +1s, I'm going to -1 this RC to ensure these
> issues are validated.
>
> In particular:
> All the main Python PostCommits were failing. Investigation is underway,
> and https://github.com/apache/beam/issues/23560 is being tracked.
> I need someone to look at the Flink Examples postcommit failures:
> https://github.com/apache/beam/issues/23561
> I also need someone to look at https://github.com/apache/beam/issues/23562
> which appears to be a single test failure in the java core when run on
> Streaming Dataflow.
>
> Since these issues need addressing anyways, I accepted the
> SpannerIO cherrypick: https://github.com/apache/beam/pull/23530, and that
> will be included in the RC2.
>
> The current scope of the cherrypicks (real or potential) shouldn't
> invalidate any Java or Go verification that has been done, and the Python
> side could be as simple as quota issues on the live services and our GCP
> testing project.
>
> I ended up having severe scheduling conflicts during this release (mostly
> a conference talk to give last week), preventing having the bandwidth to
> focus on the release issues. That combined with getting sick at the cut
> stage, and last minute unavoidable week of work travel, exacerbated these
> delays.
>
> Thank you for your patience and understanding.
> Robert Burke
> 2.42.0 Release Manager
>
>
>
> On Tue, 4 Oct 2022 at 09:18, Robert Burke  wrote:
>
>> And the missing version substitutions are
>> Gradle 7.5.1
>> JDK Version: AdoptOpen JDK 1.8.0_292
>>
>> On Tue, Oct 4, 2022, 9:01 AM Robert Burke  wrote:
>>
>>> Agreed that the Python results appeared to be missing. The comment
>>> history indicates they were invoked however (and they appear in the Mass
>>> Comment script).
>>>
>>> Re-runs are failing for either unclear reasons, timeouts or other
>>> apparent infrastructure flakes.
>>>
>>> While we have 3 binding +1 votes for RC1, please hold while this is
>>> looked into, it may require an RC2 to resolve.
>>>
>>> On Mon, Oct 3, 2022, 8:15 PM Ahmet Altay via dev 
>>> wrote:
>>>
>>>> +1 (binding) - I validated python quick starts on direct runner.
>>>>
>>>> Thank you for working on the release!
>>>>
>>>> Ahmet
>>>>
>>>> On Mon, Oct 3, 2022 at 9:06 AM Valentyn Tymofieiev via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> I validated that Dataflow and Beam Python containers have dependencies
>>>>> that match Beam requirements.
>>>>>
>>>>> I came across https://github.com/apache/beam/pull/23200 - there are
>>>>> failed tests and I don't see test results for Python PostCommit suites. Do
>>>>> you know what's the status of both?
>>>>>
>>>>> Minor nits: missing substitution in  * Java artifacts were built with
>>>>> Gradle GRADLE_VERSION and OpenJDK/Oracle JDK JDK_VERSION.
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 3, 2022 at 7:21 AM Ritesh Ghorse via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> +1 (non-binding)
>>>>>> Validated Go SDK Quickstart on Direct and Dataflow runner
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 3, 2022 at 9:38 AM Alexey Romanenko <
>>>>>> aromanenko@gmail.com> wrote:
>>>>>>
>>>>>>> +1 (binding)
>>>>>>>
>>>>>>> Tested with https://github.com/Talend/beam-samples/
>>>>>>> (Java SDK v8 & v11, Spark 3 runner).
>>>>>>>
>>>>>>> ---
>>>>>>> Alexey
>>>>>>>
>>>>>>> On 3 Oct 2022, at 14:32, Chamikara Jayalath via dev <
>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>
>>>>>>> +1 (binding)
>>>>>>>
>>>>>>> Verified checksums and signatures of artifacts.
>>>>>>> Validated some multi-language pipelines.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Thu, Sep 29, 2022 at 6:12 PM Robert Burke via dev <
>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>> Please review and vote on the release candidate #1 for the version
>>>>>>>> 2.42.0, as follows:
>>>>>>>> [ ] +1, Approve the release
>>>>>>>> [ ] -1, Do not approve the release (please provide specific
>>>>>>>> comments)
>>>>>>>>
>>>>>>>> Reviewers are encouraged to test their own use cases with the
>>>>>>>> release candidate, and vote +1 if no issues are found.
>>>>>>>>
>>>>>>>> The complete staging area is available for your review, which
>>>>>>>> includes:
>>>>>>>> * GitHub Release notes [1],
>>>>>>>> * the official Apache source release to be deployed to
>>>>>>>> dist.apache.org [2], which is signed with the key with fingerprint
>>>>>>>> A52F5C83BAE26160120EC25F3D56ACFBFB2975E1 [3],
>>>>>>>> * all artifacts to be deployed to the Maven Central Repository [4],
>>>>>>>> * source code tag "v2.42.0-RC1" [5],
>>>>>>>> * website pull request listing

Re: [JmsIO] => Pull Request to fix message acknowledgement issue

2022-09-08 Thread Luke Cwik via dev
Could we have more than one active checkpoint per reader instance?
Yes. Readers are saved and reused across multiple bundles. They aren't
always closed at bundle boundaries.

Are we sure that all checkpoints are finalized when the reader is closed?
No. Readers are closed after a certain period of inactivity. It is likely
that all checkpoints will have expired or been finalized by the time the
reader is closed, but it is not guaranteed. For example, in multi-language
pipelines the downstream processing in another language can delay
committing the output to the runner, which can lead to the reader being
closed due to inactivity before the checkpoint is finalized.

We could choose to hand off ownership of the session to the JmsCheckpoint
and create a new session for the reader. This way, finalizing the checkpoint
would be responsible for closing the session.
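
The hand-off described above can be sketched in a few lines. This is only an illustration under stated assumptions, not JmsIO code: `FakeSession`, `CheckpointMark`, and `Reader` are hypothetical stand-ins for the JMS `Session`, the JmsCheckpoint, and the unbounded reader, and acknowledgement is reduced to appending to a list.

```python
class FakeSession:
    """Hypothetical stand-in for a JMS Session."""

    def __init__(self, name):
        self.name = name
        self.closed = False
        self.acked = []

    def acknowledge(self, messages):
        if self.closed:
            raise RuntimeError(f"session {self.name} is already closed")
        self.acked.extend(messages)

    def close(self):
        self.closed = True


class CheckpointMark:
    """Owns the session that received its messages (hypothetical)."""

    def __init__(self, session, messages):
        self.session = session
        self.messages = list(messages)

    def finalize(self):
        # Acknowledge first, then close: the mark is the only owner.
        self.session.acknowledge(self.messages)
        self.session.close()


class Reader:
    """Hypothetical reader that gives each checkpoint its own session."""

    def __init__(self):
        self._count = 0
        self._session = self._new_session()
        self._pending = []

    def _new_session(self):
        self._count += 1
        return FakeSession(f"s{self._count}")

    def receive(self, message):
        self._pending.append(message)

    def checkpoint(self):
        # Hand the current session and its messages to the mark,
        # then start a fresh session for the next set of messages.
        mark = CheckpointMark(self._session, self._pending)
        self._session = self._new_session()
        self._pending = []
        return mark

    def close(self):
        # Only the reader's current session is closed; sessions owned
        # by outstanding checkpoint marks are left alone.
        self._session.close()


reader = Reader()
reader.receive("m1")
reader.receive("m2")
first = reader.checkpoint()
reader.receive("m3")
second = reader.checkpoint()
reader.close()      # reader closed due to inactivity
first.finalize()    # finalization arrives late and still succeeds
```

Because each mark closes only the session it owns, closing the reader early cannot invalidate a later acknowledgement. The cost is one session per checkpoint, which only makes sense if session creation is cheap.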




On Thu, Sep 8, 2022 at 8:01 AM BALLADA Vincent 
wrote:

> Hello Luke,
>
>
>
> Thanks for your remarks.
>
>
>
> *Connection reuse*
>
> Concerning the use of a single connection for the entire process per
> connection factory, that would mean that we would have one JMS connection
> per worker, and there may be a downside to doing so:
>
> If the broker is hosted on a multi-node cluster infrastructure, and if
> we want to consume messages from all cluster nodes, we have to make sure
> that we have enough connections to be load balanced across all the nodes.
>
> If for some reason (autoscaling, low backlog size) we have only one
> worker, we may not consume from all the cluster nodes.
>
> As the number of connections is limited by the number of splits/readers,
> and as connections are not opened/closed very often (when workers are
> killed or created, or when readers are closed/started), I would suggest
> keeping the connection management as it is currently.
>
>
>
> *Session and consumer lifecycle*
>
>
>
> 1. Session unique per checkpoint
>
> Could we have more than one active checkpoint per reader instance?
>
>
>
> Should we close the session/consumer and create new session/consumer at
> the end of finalizeCheckpoint? The goal here is to ensure that the message
> acknowledgement occurs before the session is closed.
>
> If advance and finalizeCheckpoint can be called concurrently, we need to
> make sure that the session is active in “advance” in order to receive
> messages.
>
> Are we sure that all checkpoints are finalized when the reader is closed?
>
>
>
> 2. Session scoped to the reader start/close
>
> It seems to be more or less the case currently.
>
>
>
> Regards
>
>
>
> Vincent BALLADA
>
>
>
>
>
> *From: *Luke Cwik via dev
> *Date: *Thursday, 1 September 2022 at 18:48
> *To: *dev
> *Subject: *Re: [JmsIO] => Pull Request to fix message acknowledgement issue
>
> I have a better understanding of the problem after reviewing the doc and
> we need to decide on what lifecycle scope we want the `Connection`,
> `Session`, and `MessageConsumer` to have.
>
> It looks like for the `Connection` we should try to have at most one
> instance for the entire process per connection factory.
> https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html says that
> the connection should be re-used. Having fewer connections would likely be
> beneficial unless you think there would be a performance limitation of
> using a single connection per process for all the messages?
>
> For the `Session` and `MessageConsumer`, I'm not sure what lifecycle scope
> they should have. Some ideas:
> 1. we could make it so that each `Session` is unique per checkpoint, e.g.
> we hand off the ownership of the `Session` to the JmsCheckpointMark
> every time we checkpoint and create a new `Session` for the next set of
> messages we receive. This would mean that we would also close the
> `MessageConsumer` at every checkpoint and create a new one.
> 2. we could make it so that the `Session` is scoped to the reader
> start/close and possibly multiple checkpoint marks and effectively close
> the `Session` once the reader is closed and all checkpoint marks are
> finalized/expired. We would close the `MessageConsumer` whenever the reader
> is closed.
> 3. we could make it so that the `Session` is scoped to the `Connection`
> and would only close it when the `Connection` closes.
>
> 1 seems pretty simple since the `Session` is always owned by a single
> distinct owner. This seems like it would make the most sense if
> `Session` creation and management was cheap. Another positive is that once
> the `Session` closes any messages that weren't acknowledged are returned
> back to the queue and we will not have to wait for the reader to be closed
> or all th

Re: Upcoming potentially breaking change to CoGroupByKey

2022-09-06 Thread Luke Cwik via dev
We should send this out to us...@beam.apache.org so that they are aware of
this change once commenting in the doc has settled.

On Tue, Sep 6, 2022 at 1:59 PM Robert Burke  wrote:

> Thank you for already planning to *NOT* have this merged until after this
> week's 2.42.0 cut. This Release Manager is pleased that the doc says it's
> intended for 2.43.0.
>
>
> On Tue, Sep 6, 2022, 1:44 PM Ryan Thompson via dev 
> wrote:
>
>> CoGroupByKey returns a dictionary of {KeyType, List[ValueType]} but, like
>> GroupByKey, it should return an Iterable of values.
>>
>> Change:
>> https://github.com/apache/beam/pull/22984
>>
>> Please look at this doc if you need more details. Feel free to comment.
>>
>>
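
To illustrate what the change discussed above means for pipeline code, here is a small sketch in plain Python, without importing Beam. The element shape `(key, {tag: values})` and the function name `count_and_total` are assumptions made for illustration. The point is that code which only iterates over the grouped values keeps working whether they arrive as lists or as generic iterables, while code that calls `len(values)` or indexes `values[0]` does not.

```python
from typing import Dict, Iterable, Tuple


def count_and_total(element: Tuple[str, Dict[str, Iterable[int]]]):
    """Summarize one CoGroupByKey-style result without assuming lists."""
    key, grouped = element
    summary = {}
    for tag, values in grouped.items():
        # Avoid len(values) and values[0]: both require a list.
        # A single pass works for lists and for lazy iterables alike.
        count = 0
        total = 0
        for v in values:
            count += 1
            total += v
        summary[tag] = (count, total)
    return key, summary


# The same logical element, once with lists and once with one-shot iterables.
as_lists = ("k", {"a": [1, 2], "b": [10]})
as_iterables = ("k", {"a": iter([1, 2]), "b": (x for x in [10])})
```

If list semantics are genuinely needed downstream, wrapping the values in `list(values)` at the point of use is the explicit way to get them back, at the cost of materializing the group in memory.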


Re: [JmsIO] => Pull Request to fix message acknowledgement issue

2022-09-01 Thread Luke Cwik via dev
I have a better understanding of the problem after reviewing the doc and we
need to decide on what lifecycle scope we want the `Connection`, `Session`,
and `MessageConsumer` to have.

It looks like for the `Connection` we should try to have at most one
instance for the entire process per connection factory.
https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html says that
the connection should be re-used. Having fewer connections would likely be
beneficial unless you think there would be a performance limitation of
using a single connection per process for all the messages?
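
As a rough sketch of "at most one instance for the entire process per connection factory", the idea is a process-wide registry keyed by factory. Everything here is hypothetical: `shared_connection` is an illustrative name, the factory is modelled as a plain callable rather than a JMS `ConnectionFactory`, and factories are compared by identity, which a real implementation would need to revisit.

```python
import threading

# Process-wide registry: one connection per factory (hypothetical sketch).
_connections = {}
_lock = threading.Lock()


def shared_connection(factory):
    """Return the single connection created from this factory in this process."""
    with _lock:
        conn = _connections.get(factory)
        if conn is None:
            # First caller for this factory creates the connection;
            # later callers reuse it.
            conn = factory()
            _connections[factory] = conn
        return conn
```

The lock matters because several readers on the same worker may ask for the connection at the same time. Closing the shared connection safely would need reference counting or a process shutdown hook, which this sketch leaves out.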

For the `Session` and `MessageConsumer`, I'm not sure what lifecycle scope
they should have. Some ideas:
1. we could make it so that each `Session` is unique per checkpoint, e.g.
we hand off the ownership of the `Session` to the JmsCheckpointMark
every time we checkpoint and create a new `Session` for the next set of
messages we receive. This would mean that we would also close the
`MessageConsumer` at every checkpoint and create a new one.
2. we could make it so that the `Session` is scoped to the reader
start/close and possibly multiple checkpoint marks and effectively close
the `Session` once the reader is closed and all checkpoint marks are
finalized/expired. We would close the `MessageConsumer` whenever the reader
is closed.
3. we could make it so that the `Session` is scoped to the `Connection` and
would only close it when the `Connection` closes.

1 seems pretty simple since the `Session` is always owned by a single
distinct owner. This seems like it would make the most sense if
`Session` creation and management was cheap. Another positive is that once
the `Session` closes any messages that weren't acknowledged are returned
back to the queue and we will not have to wait for the reader to be closed
or all the checkpoint marks to be finalized.

What do you think?

On Mon, Aug 29, 2022 at 10:06 PM Jean-Baptiste Onofré 
wrote:

> Hi Vincent,
>
> thanks, I will take a look (as original JmsIO author ;)).
>
> Regards
> JB
>
> On Mon, Aug 29, 2022 at 6:43 PM BALLADA Vincent
>  wrote:
> >
> > Hi all,
> >
> >
> >
> > Here is a PR related to the following issue (Runner acknowledges
> messages on closed session):
> >
> > https://github.com/apache/beam/issues/20814
> >
> >
> >
> > And here is a documentation explaining the fix:
> >
> >
> https://docs.google.com/document/d/19HiNPoJeIlzCFyWGdlw7WEFutceL2AmN_5Z0Vmi1s-I/edit?usp=sharing
> >
> >
> >
> > And finally the PR:
> >
> > https://github.com/apache/beam/pull/22932
> >
> >
> >
> > Regards
> >
> >
> >
> > Vincent BALLADA
> >
> >
> >
> >
> >
> >
>


[RESULT] [VOTE] Vendored Dependencies Release

2022-08-08 Thread Luke Cwik via dev
I'm happy to announce that we have unanimously approved this release.

There are 3 approving votes, 3 of which are binding:
* Luke Cwik
* Pablo Estrada
* Chamikara Jayalath

There are no disapproving votes.

Thanks everyone!

On Mon, Aug 8, 2022 at 9:47 AM Pablo Estrada  wrote:

> +1
> Thanks!
> -P.
>
> On Mon, Aug 8, 2022 at 9:24 AM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
>> +1
>>
>> Thanks,
>> Cham
>>
>> On Fri, Aug 5, 2022 at 1:49 PM Luke Cwik via dev 
>> wrote:
>>
>>> +1
>>>
>>> I verified the signatures of the artifacts, that the jar doesn't contain
>>> classes outside of the org/apache/beam/vendor/grpc/v1p48p1 package and I
>>> tested the artifact against our precommits using
>>> https://github.com/apache/beam/pull/22595
>>>
>>> On Fri, Aug 5, 2022 at 1:42 PM Luke Cwik  wrote:
>>>
>>>> Please review the release of the following artifacts that we vendor:
>>>>  * beam-vendor-grpc-1_48_1
>>>>
>>>> Hi everyone,
>>>> Please review and vote on the release candidate #1 for the version 0.1,
>>>> as follows:
>>>> [ ] +1, Approve the release
>>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>>
>>>>
>>>> The complete staging area is available for your review, which includes:
>>>> * the official Apache source release to be deployed to dist.apache.org
>>>> [1], which is signed with the key with fingerprint
>>>> EAD5DE293F4A03DD2E77565589E68A56E371CCA2 [2],
>>>> * all artifacts to be deployed to the Maven Central Repository [3],
>>>> * commit hash "db8db0b6ed0fe1e4891f207f0f7f811798e54db1" [4],
>>>>
>>>> The vote will be open for at least 72 hours. It is adopted by majority
>>>> approval, with at least 3 PMC affirmative votes.
>>>>
>>>> Thanks,
>>>> Release Manager
>>>>
>>>> [1] https://dist.apache.org/repos/dist/dev/beam/vendor/
>>>> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
>>>> [3]
>>>> https://repository.apache.org/content/repositories/orgapachebeam-1277/
>>>> [4]
>>>> https://github.com/apache/beam/commit/db8db0b6ed0fe1e4891f207f0f7f811798e54db1
>>>>
>>>


Re: Beam Website Feedback

2022-08-08 Thread Luke Cwik via dev
Thanks.

On Mon, Aug 8, 2022 at 8:12 AM Peter Simon  wrote:

> Awesome web UI
>
> Peter Simon
>
> *Data Scientist*
>
>
>
> e   peter.si...@fanatical.com
>
> w  fanatical.com
>
> Focus Multimedia Limited.
>
> The Studios, Lea Hall Enterprise Park,
>
> Wheelhouse Road, Brereton, Rugeley,
>
> Staffordshire, WS15 1LH, United Kingdom.
>
> 
>
>


Re: Beam gRPC dependency tracing

2022-08-08 Thread Luke Cwik via dev
I think you missed Kenn's earlier reply:
https://lists.apache.org/thread/v0nr6mv0rqhd76ox1bwt6qwo4q3g7w58

The vendored gRPC is built by transforming the released gRPC jar. Here is
where in the Beam git history you can find the source for the
transformation:
https://github.com/apache/beam/tree/40293eb52ca914acbbbae51e4b24fa280f2b44f0/vendor/grpc-1_26_0

Kenn
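
Since the vendored jar is a transformed copy of the released gRPC jar, one way to trace a vendored class back to upstream is to strip the relocation prefix from its name. The sketch below assumes the prefix for this artifact is `org.apache.beam.vendor.grpc.v1p26p0`, by analogy with the `v1p48p1` package mentioned elsewhere in this archive; the function name is hypothetical, and the transformation source linked above is the authority on the actual prefix.

```python
def upstream_class_name(vendored_name, prefix):
    """Strip the relocation prefix from a vendored class name."""
    marker = prefix + "."
    if not vendored_name.startswith(marker):
        raise ValueError(f"{vendored_name} is not under {prefix}")
    return vendored_name[len(marker):]


# Assumed prefix for beam-vendor-grpc-1_26_0 (see the lead-in).
PREFIX = "org.apache.beam.vendor.grpc.v1p26p0"
name = upstream_class_name(PREFIX + ".io.grpc.ManagedChannel", PREFIX)
```

Here `name` is `io.grpc.ManagedChannel`, which can then be looked up in the upstream gRPC Java release of the matching version.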


On Sun, Aug 7, 2022 at 1:16 PM JDW J  wrote:

> Anyone?
>
> On Wed, 27 Jul, 2022, 13:21 JDW J,  wrote:
>
>> Team,
>>
>> Consider me a newbie to Beam and Java world in general. I am trying to
>> trace Beam vendor dependency to gRPC-upstream.
>>
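>>   <modelVersion>4.0.0</modelVersion>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-vendor-grpc-1_26_0</artifactId>
>>   <version>0.3</version>
>>   <name>Apache Beam :: Vendored Dependencies :: gRPC :: 1.26.0</name>
>>   <url>http://beam.apache.org</url>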
>>
>>
>> How can I tell what is the exact upstream repo for "Apache Beam ::
>> Vendored Dependencies :: gRPC :: 1.26.0" ?
>>
>> -joji
>>
>


Re: [VOTE] Vendored Dependencies Release

2022-08-05 Thread Luke Cwik via dev
+1

I verified the signatures of the artifacts, that the jar doesn't contain
classes outside of the org/apache/beam/vendor/grpc/v1p48p1 package and I
tested the artifact against our precommits using
https://github.com/apache/beam/pull/22595
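
The package check mentioned above can be approximated with a few lines of Python, since a jar is a zip archive. This is a sketch of one way to do it, not necessarily how the check was performed for this vote; the function name is hypothetical.

```python
import zipfile


def classes_outside_prefix(jar_path, prefix):
    """Return the .class entries in the jar that are not under prefix."""
    if not prefix.endswith("/"):
        prefix += "/"
    with zipfile.ZipFile(jar_path) as jar:
        return sorted(
            name
            for name in jar.namelist()
            if name.endswith(".class") and not name.startswith(prefix)
        )
```

Calling it with the path to the downloaded jar and `"org/apache/beam/vendor/grpc/v1p48p1"` should return an empty list if every class was relocated. Non-class entries such as `META-INF` files are deliberately ignored.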

On Fri, Aug 5, 2022 at 1:42 PM Luke Cwik  wrote:

> Please review the release of the following artifacts that we vendor:
>  * beam-vendor-grpc-1_48_1
>
> Hi everyone,
> Please review and vote on the release candidate #1 for the version 0.1, as
> follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * the official Apache source release to be deployed to dist.apache.org
> [1], which is signed with the key with fingerprint
> EAD5DE293F4A03DD2E77565589E68A56E371CCA2 [2],
> * all artifacts to be deployed to the Maven Central Repository [3],
> * commit hash "db8db0b6ed0fe1e4891f207f0f7f811798e54db1" [4],
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Release Manager
>
> [1] https://dist.apache.org/repos/dist/dev/beam/vendor/
> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
> [3] https://repository.apache.org/content/repositories/orgapachebeam-1277/
> [4]
> https://github.com/apache/beam/commit/db8db0b6ed0fe1e4891f207f0f7f811798e54db1
>


[VOTE] Vendored Dependencies Release

2022-08-05 Thread Luke Cwik via dev
Please review the release of the following artifacts that we vendor:
 * beam-vendor-grpc-1_48_1

Hi everyone,
Please review and vote on the release candidate #1 for the version 0.1, as
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* the official Apache source release to be deployed to dist.apache.org [1],
which is signed with the key with fingerprint
EAD5DE293F4A03DD2E77565589E68A56E371CCA2 [2],
* all artifacts to be deployed to the Maven Central Repository [3],
* commit hash "db8db0b6ed0fe1e4891f207f0f7f811798e54db1" [4],

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.
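
The adoption rule stated above can be written down as a small check. This sketch reflects one reading of "majority approval, with at least 3 PMC affirmative votes", namely that only binding (PMC) votes are counted; that reading, the function name, and the vote representation are assumptions.

```python
def vote_passes(votes, min_binding_plus_ones=3):
    """votes: iterable of (value, is_binding), where value is +1 or -1."""
    votes = list(votes)
    binding_for = sum(1 for value, binding in votes if binding and value > 0)
    binding_against = sum(1 for value, binding in votes if binding and value < 0)
    # Needs enough binding +1s and more binding +1s than binding -1s.
    return binding_for >= min_binding_plus_ones and binding_for > binding_against
```

Under this reading, three binding +1 votes and no -1 votes pass, while two binding +1 votes plus any number of non-binding +1 votes do not.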

Thanks,
Release Manager

[1] https://dist.apache.org/repos/dist/dev/beam/vendor/
[2] https://dist.apache.org/repos/dist/release/beam/KEYS
[3] https://repository.apache.org/content/repositories/orgapachebeam-1277/
[4]
https://github.com/apache/beam/commit/db8db0b6ed0fe1e4891f207f0f7f811798e54db1


Vendored gRPC update

2022-08-04 Thread Luke Cwik via dev
I was looking to update gRPC that we use to the latest (1.48.1) version to
move off of a vulnerable version of Netty that a user pointed out in
BEAM-14118. This would supersede the work done in
https://github.com/apache/beam/pull/17206 as that PR has stalled.

If there aren't any concerns I'll test an updated artifact and start a
voting thread once the tests complete.


[ANNOUNCE] New committer: Steven Niemitz

2022-07-19 Thread Luke Cwik via dev
Hi all,

Please join me and the rest of the Beam PMC in welcoming a new committer:
Steven Niemitz (sniemitz@)

Steven started contributing to Beam in 2017 fixing bugs and improving
logging and usability. Steven's most recent focus has been on performance
optimizations within the Java SDK.

Considering the time span and number of contributions, the Beam PMC trusts
Steven with the responsibilities of a Beam committer. [1]

Thank you Steven! We are looking forward to seeing more of your contributions!

Luke, on behalf of the Apache Beam PMC

[1] https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


Re: Fun with WebAssembly transforms

2022-07-14 Thread Luke Cwik via dev
Thanks Cham, I wasn't up to speed on where XLang was with respect to those
transforms.

On Wed, Jul 13, 2022 at 9:32 PM Chamikara Jayalath 
wrote:

> +1 and this is exactly what I suggested as well. Python Dataframe,
> RunInference, Python Map are available via x-lang for Java already [1] and
> all three need/use simple UDFs to customize operation. There is some logic
> that needs to be added to use Python transforms from Go SDK. As you
> suggested there are many Java x-lang transforms that can use simple UDF
> support as well. Either language combination should work to implement a
> first proof of concept for WASI support while also addressing an existing
> limitation.
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/tree/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms
>
> On Wed, Jul 13, 2022 at 8:26 PM Kenneth Knowles  wrote:
>
>> I agree with Luke. Targeting little helper UDFs that go along with IOs
>> are actually a major feature gap for xlang - like timestamp extractors that
>> have to parse particular data formats. This could be a very useful place to
>> try out the design options. I think we can simplify the problem by
>> insisting that they are pure functions that do not access state or side
>> inputs.
>>
>> On Wed, Jul 13, 2022 at 7:52 PM Luke Cwik via dev 
>> wrote:
>>
>>> I think an easier target would be to support things like
>>> DynamicDestinations for Java IO connectors that are exposed as XLang for
>>> Go/Python.
>>>
>>> This is because Go/Python have good transpiling support to WebAssembly
>>> and we have already exposed several Java IO XLang connectors, so it's
>>> about plumbing one more thing through for these IO connectors.
>>>
>>> What interface should we expect for UDFs / UDAFs? Should they be
>>> purpose oriented, or should we do something like we did for portability,
>>> where we have a graph of transforms that we feed arbitrary data in/out
>>> from? The latter would have the benefit of allowing the runner to embed the
>>> language execution directly within the runner and would pay the Wasm
>>> communication tax instead of the gRPC communication tax. If we do the
>>> former we still have the same issues where we have to be able to have a
>>> type system to pass information between the host system and the transpiled
>>> WebAssembly code that wraps the user's UDF/UDAF, and what if the UDF wants
>>> access to side inputs or user state ...
>>>
>>> On Wed, Jul 13, 2022 at 4:09 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Jul 13, 2022 at 9:31 AM Luke Cwik  wrote:
>>>>
>>>>> First we'll want to choose whether we want to target Wasm, WASI or
>>>>> Wagi.
>>>>>
>>>>
>>>> These terms are defined here
>>>> <https://www.fermyon.com/blog/wasm-wasi-wagi?gclid=CjwKCAjw2rmWBhB4EiwAiJ0mtVhiTuMZmy4bJSlk4nJj1deNX3KueomLgkG8JMyGeiHJ3FJRPpVn7BoCs58QAvD_BwE>
>>>> if anybody is confused as I am :)
>>>>
>>>>
>>>>> WASI adds a lot of simple things like access to a clock, random number
>>>>> generator, ... that would expand the scope of what transpiled code can do.
>>>>> It is debatable whether we'll want the power to run the transpiled code as
>>>>> a microservice. Using UDFs for XLang and UDFs and UDAFs for SQL as our
>>>>> expected use cases seem to make WASI the best choice. The issue is in the
>>>>> details as there is a hodgepodge of what language runtimes support and
>>>>> what are the limits of transpiling from a language to WebAssembly.
>>>>>
>>>>
>>>> Agree that WASI seems like a good target since it gives access to
>>>> additional system resources/tooling.
>>>>
>>>>
>>>>>
>>>>> Assuming WASI then it breaks down to these two aspects:
>>>>> 1) Does the host language have a runtime?
>>>>> Java: https://github.com/wasmerio/wasmer-java
>>>>> Python: https://github.com/wasmerio/wasmer-python
>>>>> Go: https://github.com/wasmerio/wasmer-go
>>>>>
>>>>> 2) How good is compilation from source language to WebAssembly
>>>>> <https://github.com/appcypher/awesome-wasm-langs>?
>>>>> Java (very limited):
>>>>> Is

Re: Fun with WebAssembly transforms

2022-07-13 Thread Luke Cwik via dev
I think an easier target would be to support things like
DynamicDestinations for Java IO connectors that are exposed as XLang for
Go/Python.

This is because Go/Python have good transpiling support to WebAssembly and
we have already exposed several Java IO XLang connectors, so it's about
plumbing one more thing through for these IO connectors.

What interface should we expect for UDFs / UDAFs? Should they be purpose
oriented, or should we do something like we did for portability, where we
have a graph of transforms that we feed arbitrary data in and out of? The
latter would have the benefit of allowing the runner to embed the language
execution directly within the runner, paying the Wasm communication tax
instead of the gRPC communication tax. If we do the former we still have
the same issues: we need a type system to pass information between the
host system and the transpiled WebAssembly code that wraps the user's
UDF/UDAF, and what if the UDF wants access to side inputs or user state ...
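
As one way to picture the purpose-oriented option, here is a minimal sketch
of a pure timestamp-extractor UDF with a bytes-in/bytes-out signature, which
is roughly the narrowest "type system" a host and a guest could agree on.
Everything here is an assumption for illustration: extract_timestamp_millis,
host_call, the JSON payload and its event_time field are made-up names, not
Beam or WASI APIs, and nothing is actually compiled to Wasm.

```python
import json
import struct


def extract_timestamp_millis(record: bytes) -> bytes:
    """Pure UDF: no state, no side inputs, only bytes in and bytes out."""
    payload = json.loads(record.decode("utf-8"))
    # Hypothetical field name; a real IO would define its own data format.
    millis = int(payload["event_time"])
    # Encode as a big-endian signed 64-bit integer so the host needs no
    # knowledge of the guest language's object model.
    return struct.pack(">q", millis)


def host_call(udf, record: bytes) -> int:
    """Stand-in for the host side: pass bytes across, decode the result."""
    (millis,) = struct.unpack(">q", udf(record))
    return millis
```

Keeping the boundary at raw bytes sidesteps the type-system question for
this one narrow use case, at the cost of an encode/decode step per call.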

On Wed, Jul 13, 2022 at 4:09 PM Chamikara Jayalath 
wrote:

>
>
> On Wed, Jul 13, 2022 at 9:31 AM Luke Cwik  wrote:
>
>> First we'll want to choose whether we want to target Wasm, WASI or Wagi.
>>
>
> These terms are defined here
> <https://www.fermyon.com/blog/wasm-wasi-wagi?gclid=CjwKCAjw2rmWBhB4EiwAiJ0mtVhiTuMZmy4bJSlk4nJj1deNX3KueomLgkG8JMyGeiHJ3FJRPpVn7BoCs58QAvD_BwE>
> if anybody is confused as I am :)
>
>
>> WASI adds a lot of simple things like access to a clock, random number
>> generator, ... that would expand the scope of what transpiled code can do.
>> It is debatable whether we'll want the power to run the transpiled code as
>> a microservice. Using UDFs for XLang and UDFs and UDAFs for SQL as our
>> expected use cases seem to make WASI the best choice. The issue is in the
>> details as there is a hodgepodge of what language runtimes support and what
>> are the limits of transpiling from a language to WebAssembly.
>>
>
> Agree that WASI seems like a good target since it gives access to
> additional system resources/tooling.
>
>
>>
>> Assuming WASI then it breaks down to these two aspects:
>> 1) Does the host language have a runtime?
>> Java: https://github.com/wasmerio/wasmer-java
>> Python: https://github.com/wasmerio/wasmer-python
>> Go: https://github.com/wasmerio/wasmer-go
>>
>> 2) How good is compilation from source language to WebAssembly
>> <https://github.com/appcypher/awesome-wasm-langs>?
>> Java (very limited):
>> Issues with garbage collection and the need to transpile/replace much of
>> the VM's capabilities plus the large standard library that everyone uses
>> causes a lot of challenges.
>> JWebAssembly can do simple things like basic classes, strings, method
>> calls. Should be able to compile trivial lambdas to Wasm. There are other
>> choices but to my knowledge all are very limited.
>>
>
> That's unfortunate. But hopefully Java support will be implemented soon ?
>
>
>>
>> Python <https://pythondev.readthedocs.io/wasm.html> (quite good):
>> Feature                  | CPython Emscripten browser | CPython Emscripten node | Pyodide
>> subprocess (fork, exec)  | no                         | no                      | no
>> threads                  | no                         | YES                     | WIP
>> file system              | no (only MEMFS)            | YES (Node raw FS)       | YES (IDB, Node, …)
>> shared extension modules | WIP                        | WIP                     | YES
>> PyPI packages            | no                         | no                      | YES
>> sockets                  | ?                          | ?                       | ?
>> urllib, asyncio          | no                         | no                      | WebAPI fetch / WebSocket
>> signals                  | no                         | WIP                     | YES
>>
>> Go (excellent): Native support in go compiler
>>
>
> Great. Could executing Go UDFs in Python x-lang transforms (for example,
> Dataframe, RunInference, Python Map) be a good first target ?
>
> Thanks,
> Cham
>
>
>>
>> On Tue, Jul 12, 2022 at 5:51 PM Chamikara Jayalath via dev <
>> dev@beam.apache.org> wrote:
>>
>>>
>>>
>>> On Wed, Jun 29, 2022 at 9:31 AM Luke Cwik  wrote:
>>>
>>>> I have had interest in integrating Wasm within Beam as well as I have
>>>> had a lot of interest in improving language portability.
>>>>
>>>> Wasm has a lot of benefits over using docker containers to provide a
>>>> place for code to execute. From experience working on Beam's
>>>> portability layer and internal Flume knowledge:
>>>> * encoding and decoding data is expensive; anything which ensures that
>>>> in-memory representations of data can be transferred from the host to
>>>> the guest and back without transcoding/re-interpreting will be a big win.
>>>> * reducing the amount of times we need to pass data between guest and
>>>> host and back i

Re: Fun with WebAssembly transforms

2022-07-13 Thread Luke Cwik via dev
First we'll want to choose whether we want to target Wasm, WASI or Wagi.
WASI adds a lot of simple things like access to a clock, random number
generator, ... that would expand the scope of what transpiled code can do.
It is debatable whether we'll want the power to run the transpiled code as
a microservice. Using UDFs for XLang and UDFs and UDAFs for SQL as our
expected use cases seems to make WASI the best choice. The issue is in the
details as there is a hodgepodge of what language runtimes support and what
are the limits of transpiling from a language to WebAssembly.

Assuming WASI then it breaks down to these two aspects:
1) Does the host language have a runtime?
Java: https://github.com/wasmerio/wasmer-java
Python: https://github.com/wasmerio/wasmer-python
Go: https://github.com/wasmerio/wasmer-go

2) How good is compilation from source language to WebAssembly
<https://github.com/appcypher/awesome-wasm-langs>?
Java (very limited):
Issues with garbage collection and the need to transpile/replace much of
the VM's capabilities plus the large standard library that everyone uses
causes a lot of challenges.
JWebAssembly can do simple things like basic classes, strings, method
calls. Should be able to compile trivial lambdas to Wasm. There are other
choices but to my knowledge all are very limited.

Python <https://pythondev.readthedocs.io/wasm.html> (quite good):
Feature                  | CPython Emscripten browser | CPython Emscripten node | Pyodide
subprocess (fork, exec)  | no                         | no                      | no
threads                  | no                         | YES                     | WIP
file system              | no (only MEMFS)            | YES (Node raw FS)       | YES (IDB, Node, …)
shared extension modules | WIP                        | WIP                     | YES
PyPI packages            | no                         | no                      | YES
sockets                  | ?                          | ?                       | ?
urllib, asyncio          | no                         | no                      | WebAPI fetch / WebSocket
signals                  | no                         | WIP                     | YES

Go (excellent): Native support in go compiler

On Tue, Jul 12, 2022 at 5:51 PM Chamikara Jayalath via dev <
dev@beam.apache.org> wrote:

>
>
> On Wed, Jun 29, 2022 at 9:31 AM Luke Cwik  wrote:
>
>> I have had interest in integrating Wasm within Beam as well as I have had
>> a lot of interest in improving language portability.
>>
>> Wasm has a lot of benefits over using docker containers to provide a
>> place for code to execute. From experience working on Beam's portability
>> layer and internal Flume knowledge:
>> * encoding and decoding data is expensive; anything which ensures that
>> in-memory representations of data can be transferred from the host to the
>> guest and back without transcoding/re-interpreting will be a big win.
>> * reducing the number of times we need to pass data between guest and
>> host and back is important
>>   * fusing transforms reduces the number of data passing points
>>   * batching (row or columnar) data reduces the number of times we need
>> to pass data at each data passing point
>> * there are enough complicated use cases (state & timers, large
>> iterables, side inputs) where handling the trivial map/flatmap usecase will
>> provide little value since it will prevent fusion
>>
>> I have been meaning to work on a prototype where we replace the current
>> gRPC + docker path with one in which we use Wasm to execute a fused graph
>> re-using large parts of the existing code base written to support
>> portability.
>>
>
> This sounds very interesting. Probably using Wasm to implement proper UDF
> support for x-lang (for example, executing Python timestamp/watermark
> functions provided through the Kafka Python x-lang wrapper on the Java
> Kafka transform) will be a good first target ? My main question for this at
> this point is whether Wasm has adequate support for existing SDKs that use
> x-lang to implement this in a useful way.
>
> Thanks,
> Cham
>
>
>>
>>
>> On Fri, Jun 17, 2022 at 2:19 PM Brian Hulette 
>> wrote:
>>
>>> Re: Arrow - it's long been my dream to use Arrow for interchange in Beam
>>> [1]. I'm trying to move us in that direction with
>>> https://s.apache.org/batched-dofns (arrow is discussed briefly in the
>>> Future Work section). This gives the Python SDK a concept of batches of
>>> logical elements. My goal is Beam schemas + batches of logical elements ->
>>> Arrow RecordBatches.
>>>
>>> The Batched DoFn infrastructure is stable as of the 2.40.0 release cut
>>> and I'm currently working on adding what I'm calling a "BatchConverter" [2]
>>> for Beam Rows -> Arrow RecordBatch. Once that's done it could be
>>> interesting to experiment with a "WasmDoFn" that uses Arrow for interchange.
>>>
>>> Brian
>>>
>>> [1]
>>> https://docs.google.com/presentation/d/1D9vigwYTCuAuz_CO8nex3GK3h873acmQJE5Ui8TFsDY/edit#slide=id.g608e662464_0_160
>>> [2]
>>> h

Re: [RFC] Gather JMH performance metrics in Beam community-metrics

2022-07-12 Thread Luke Cwik via dev
This sounds great.

Since every language has a benchmarking tool, we can start with JMH and
expand from there.

A key point is that we will want to dedicate a Jenkins machine exclusively
to this when the microbenchmarks are running, otherwise we will have other
competing Jenkins jobs using up CPU that will make the benchmarks really
noisy. Based upon the number of microbenchmarks that exist and are in the
works it looks like we will have about 4 hours of microbenchmarks.

On Tue, Jul 12, 2022 at 8:52 AM Moritz Mack  wrote:

> Sorry Robert, I should have mentioned … that’s the Java Microbenchmark
> Harness.
>
> So this is all about the Java SDK. Currently there’s benchmarks for the
> fn-harness and some benchmarks for core are in progress.
>
>
>
> Cheers
>
>
>
> On 12.07.22, 17:50, "Robert Burke"  wrote:
>
>
>
> I'm all for additional performance benchmarks!
>
>
>
> But what does JMH stand for?
>
>
>
> On Tue, Jul 12, 2022, 7:54 AM Moritz Mack  wrote:
>
> Hi all,
>
>
>
> This is a very short proposal to start running JMH benchmarks periodically
> and store benchmark results so we can start monitoring performance trends
> on the community metrics dashboards over time. Comments most welcome!
>
>
>
> https://s.apache.org/nvi9g
> 
>
>
>
> Best regards,
>
> Moritz
>
>
>
> *As a recipient of an email from Talend, your contact personal data will
> be on our systems. Please see our privacy notice.
> *
>
>
>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Luke Cwik via dev
I was suggesting GCP support mainly because I don't think you want to share
the 2.36 and 2.40 version of your job file publicly as someone familiar
with the layout and format may spot a meaningful difference.

Also, if it turns out that there is no meaningful difference between the
two, then the internal mechanics of how the graph is modified by Dataflow
are not surfaced back to you in enough depth to debug further.
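
For a first local comparison of the two job files (without sharing them
publicly), something like the following rough sketch could help. The field
names are assumptions: "output_info" is the field mentioned earlier in this
thread, while the "steps", "properties", "user_name" and "name" keys are my
guess at the job file layout and should be checked against a real dump. The
function names are hypothetical helpers, not part of any Beam or Dataflow
tooling.

```python
import json


def index_steps(job: dict) -> dict:
    """Map each step's name to its output_info (which carries the coders)."""
    steps = {}
    for step in job.get("steps", []):
        props = step.get("properties", {})
        # Assumed layout: prefer the user-visible name, fall back to the id.
        name = props.get("user_name") or step.get("name", "<unnamed>")
        steps[name] = props.get("output_info", [])
    return steps


def diff_jobs(old_job: dict, new_job: dict) -> dict:
    """Report steps that were removed, added, or whose outputs differ."""
    old, new = index_steps(old_job), index_steps(new_job)
    return {
        "removed": sorted(set(old) - set(new)),
        "added": sorted(set(new) - set(old)),
        "changed_outputs": sorted(
            name for name in set(old) & set(new) if old[name] != new[name]
        ),
    }


def diff_job_files(old_path: str, new_path: str) -> dict:
    """Load two files written via --dataflowJobFile and compare them."""
    with open(old_path) as f_old, open(new_path) as f_new:
        return diff_jobs(json.load(f_old), json.load(f_new))
```

This only sees what is in the submitted job files, so steps that Dataflow
adds later will not show up in the result.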



On Fri, Jul 8, 2022 at 6:12 AM Evan Galpin  wrote:

> Thanks for your response Luke :-)
>
> Updating in 2.36.0 works as expected, but as you alluded to I'm attempting
> to update to the latest SDK; in this case there are no code changes in the
> user code, only the SDK version.  Is GCP support the only tool when it
> comes to deciphering the steps added by Dataflow?  I would love to be able
> to inspect the complete graph with those extra steps like
> "Unzipped-2/FlattenReplace" that aren't in the job file.
>
> Thanks,
> Evan
>
> On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
> wrote:
>
>> Does doing a pipeline update in 2.36 work or do you want to do an update
>> to get the latest version?
>>
>> Feel free to share the job files with GCP support. It could be something
>> internal but the coders for ephemeral steps that Dataflow adds are based
>> upon existing coders within the graph.
>>
>> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>>
>>> +dev@
>>>
>>> Reviving this thread as it has hit me again on Dataflow.  I am trying to
>>> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
>>> received an error that the step "Flatten.pCollections" was missing from the
>>> new job graph.  I knew from the code that that wasn't true, so I dumped the
>>> job file via "--dataflowJobFile" for both the running pipeline and for the
>>> new version I'm attempting to update to.  Both job files showed identical
>>> data for the Flatten.pCollections step, which raises the question of why
>>> that would have been reported as missing.
>>>
>>> Out of curiosity I then tried mapping the step to the same name, which
>>> changed the error to:  "The Coder or type for step
>>> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
>>> job files show identical coders for the Flatten step (though
>>> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
>>> internal Dataflow thing?), so I'm confident that the coder hasn't actually
>>> changed.
>>>
>>> I'm not sure how to proceed in updating the running pipeline, and I'd
>>> really prefer not to drain.  Any ideas?
>>>
>>> Thanks,
>>> Evan
>>>
>>>
>>> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
>>> wrote:
>>>
>>>> Thanks for the ideas Luke. I checked out the json graphs as per your
>>>> recommendation (thanks for that, was previously unaware), and the
>>>> "output_info" was identical for both the running pipeline and the pipeline
>>>> I was hoping to update it with.  I ended up opting to just drain and submit
>>>> the updated pipeline as a new job.  Thanks for the tips!
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>>>
>>>>> I would suggest dumping the JSON representation (with the
>>>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>>>> graph representation is a bipartite graph where there are transform nodes
>>>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>>>> The PCollection nodes typically end with the suffix ".out". This could
>>>>> help find steps that have been added/removed/renamed.
>>>>>
>>>>> The PipelineDotRenderer[1] might be of use as well.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>>>>
>>>>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm looking for any help regarding updating streaming jobs which are
>>>>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>>>>> situations where Fusion is involved, and trying to decipher which old
>>>>>> steps should be mapped to which new steps.
>>>>>>
>>>>>> I have a case where I updated the steps which come after the step in
>>>>>> question, but when I attempt to update there is an error that "
>>>>>> no longer produces data to the steps ". I believe that
>>>>>>  is only changed as a result of fusion, and in reality it does in
>>>>>> fact produce data to  (confirmed when deployed as a new
>>>>>> job for testing purposes).
>>>>>>
>>>>>> Is there a guide for how to deal with updates and fusion?
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-06 Thread Luke Cwik via dev
Does doing a pipeline update in 2.36 work or do you want to do an update to
get the latest version?

Feel free to share the job files with GCP support. It could be something
internal but the coders for ephemeral steps that Dataflow adds are based
upon existing coders within the graph.

On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:

> +dev@
>
> Reviving this thread as it has hit me again on Dataflow.  I am trying to
> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
> received an error that the step "Flatten.pCollections" was missing from the
> new job graph.  I knew from the code that that wasn't true, so I dumped the
> job file via "--dataflowJobFile" for both the running pipeline and for the
> new version I'm attempting to update to.  Both job files showed identical
> data for the Flatten.pCollections step, which raises the question of why
> that would have been reported as missing.
>
> Out of curiosity I then tried mapping the step to the same name, which
> changed the error to:  "The Coder or type for step
> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
> job files show identical coders for the Flatten step (though
> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
> internal Dataflow thing?), so I'm confident that the coder hasn't actually
> changed.
>
> I'm not sure how to proceed in updating the running pipeline, and I'd
> really prefer not to drain.  Any ideas?
>
> Thanks,
> Evan
>
>
> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin  wrote:
>
>> Thanks for the ideas Luke. I checked out the json graphs as per your
>> recommendation (thanks for that, was previously unaware), and the
>> "output_info" was identical for both the running pipeline and the pipeline
>> I was hoping to update it with.  I ended up opting to just drain and submit
>> the updated pipeline as a new job.  Thanks for the tips!
>>
>> Thanks,
>> Evan
>>
>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>
>>> I would suggest dumping the JSON representation (with the
>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>> graph representation is a bipartite graph where there are transform nodes
>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>> The PCollection nodes typically end with the suffix ".out". This could help
>>> find steps that have been added/removed/renamed.
>>>
>>> The PipelineDotRenderer[1] might be of use as well.
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>>
>>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm looking for any help regarding updating streaming jobs which are
>>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>>> situations where Fusion is involved, and trying to decipher which old steps
>>>> should be mapped to which new steps.
>>>>
>>>> I have a case where I updated the steps which come after the step in
>>>> question, but when I attempt to update there is an error that "
>>>> no longer produces data to the steps ". I believe that
>>>>  is only changed as a result of fusion, and in reality it does in
>>>> fact produce data to  (confirmed when deployed as a new
>>>> job for testing purposes).
>>>>
>>>> Is there a guide for how to deal with updates and fusion?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>


Re: Force SDF to run on every task the JAR is loaded on?

2021-10-07 Thread Luke Cwik
On Thu, Oct 7, 2021 at 10:07 AM Daniel Collins  wrote:

> Hi Luke,
>
> The whole point here is actually to write to Pubsub on the other side of
> the PTransform, so what you suggested was exactly my intent. Although you
> could conceivably publish directly to Pub/Sub within the SDF, this is not
> super extensible.
>
> > once the client retries you'll publish a duplicate
>
> This is acceptable, since the sink is probably Pubsub anyway. The client
> could conceivably attach a deduplication ID to the source requests before
> sending them.
>
> > Does the order in which the requests you get from a client or across
> clients matter?
>
> No, not at all, since the sink would be Pubsub (which, given beam doesn't
> support ordering internally or ordering keys, I wouldn't be concerned with).
>
> I think the main question is still outstanding though, is there a way to
> ensure that on all tasks the pipeline JAR is loaded on, it actually will
> run to avoid stranding user messages?
>
>
Not that I'm aware of.


> -Dan
>
> On Thu, Oct 7, 2021 at 12:53 PM Luke Cwik  wrote:
>
>> I would suggest that you instead write the requests received within the
>> splittable DoFn directly to a queue based sink and in another part of the
>> pipeline read from that queue. For example if you were using Pubsub for the
>> queue, your pipeline would look like:
>> Create(LB config + pubsub topic A) -> ParDo(SDF get request from client
>> and write to pubsub and then ack client)
>> Pubsub(Read from A) -> ?Deduplicate? -> ... downstream processing ...
>> Since the SDF will write to pubsub before it acknowledges the message you
>> may write data that is not acked and once the client retries you'll publish
>> a duplicate. If downstream processing is not resilient to duplicates then
>> you'll want to have some unique piece of information to deduplicate on.
>>
>> Does the order in which the requests you get from a client or across
>> clients matter?
>> If yes, then you need to be aware that the parallel processing will
>> impact the order in which you see things and you might need to have data
>> sorted/ordered within the pipeline.
>>
>>
>>
>> On Wed, Oct 6, 2021 at 3:56 PM Daniel Collins 
>> wrote:
>>
>>> Hi all,
>>>
>>> Bear with me, this is a bit of a weird one. I've been toying around with
>>> an idea to do http ingestion using a beam (specifically dataflow) pipeline.
>>> The concept would be that you spin up an HTTP server on each running task
>>> with a well known port as a static member of some class in the JAR (or upon
>>> initialization of a SDF the first time), then accept requests, but don't
>>> acknowledge them back to the client until the bundle finalizer
>>> <https://javadoc.io/static/org.apache.beam/beam-sdks-java-core/2.29.0/org/apache/beam/sdk/transforms/DoFn.BundleFinalizer.html>
>>> so you know they're persisted or have moved down the pipeline. You could then
>>> use a load balancer pointed at the instance group created by dataflow as
>>> the target for incoming requests, and create a PCollection from incoming
>>> user requests.
>>>
>>> The only part of this I don't think would work is preventing user
>>> requests from being stranded on a server that will never run the SDF that
>>> will complete them due to load balancing constraints. So my question is: is
>>> there a way to force an SDF to be run on every task where the JAR is loaded?
>>>
>>> Thanks!
>>>
>>> -Dan
>>>
>>
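
For reference, the publish-then-ack flow quoted above, and why it can
produce duplicates, can be sketched with an in-memory stand-in. This is not
Pubsub or Beam code; Queue, handle_request and deduplicate are hypothetical
names, and the request id plays the role of the "unique piece of
information to deduplicate on".

```python
class Queue:
    """In-memory stand-in for the queue-based sink (e.g. a Pubsub topic)."""

    def __init__(self):
        self.messages = []

    def publish(self, request_id, body):
        self.messages.append((request_id, body))


def handle_request(queue, request_id, body, ack_succeeds=True):
    """Publish first, then ack. Returns True if the client saw the ack."""
    queue.publish(request_id, body)
    # If the ack is lost, the data is already in the queue and the client
    # will retry, which publishes the same request a second time.
    return ack_succeeds


def deduplicate(messages):
    """Keep the first message per request id, preserving arrival order."""
    seen = set()
    unique = []
    for request_id, body in messages:
        if request_id not in seen:
            seen.add(request_id)
            unique.append((request_id, body))
    return unique
```

A client that calls handle_request again after a lost ack leaves two copies
in the queue; running deduplicate over the queue contents before downstream
processing collapses them back to one.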


Re: Doubts on KafkaIO/SourceIO

2021-09-03 Thread Luke Cwik
https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass
is out of date and at the top says
IMPORTANT: Use Splittable DoFn to develop your new I/O. For more details,
read the new I/O connector overview.

On Fri, Sep 3, 2021 at 9:55 AM Alexey Romanenko 
wrote:

> Hi Marco,
>
> I tried to answer your questions and I also CC’ed Boyuan Zhang as initial
> author of SDF-based Read implementation for KafkaIO.
>
> Also, I’d recommend taking a look at the related PR discussion [1], which
> may give more details on some internal decisions.
>
> Please, see my answers inline.
>
> On 1 Sep 2021, at 18:13, Marco Robles  wrote:
>
>
> I am taking KafkaIO as an example for the PulsarIO connector, during the
> development of the new IO, I got some questions on KafkaIO implementation.
> I was wondering if anyone has some experience with KafkaIO SDF
> implementation that might help me.
>
> - What was taken into consideration to implement the KafkaSourceDescriptor
> which is used as input for the SDF in Kafka?
>
>
> IIRC, this class represents a Kafka topic partition that is later used
> in ReadFromKafkaDoFn to actually read data. So, we can have a
> PCollection to read them in parallel.
>

> - In the ReadFromKafkaDoFn class, you have to implement a getSize in order
> to estimate how much work it will take. What approach do you take in order
> to get an estimate with an unbounded source like Kafka?
>
>
> It is quite tricky to do with unbounded sources, so we try to estimate the
> size from the number of records for the current offset in the topic
> partition and the average record size, based on collected statistics (if
> any).
>

> - For the SDF implementation, I suppose it will need a Source Interface
> implementation and a Reader subclass?
> The documentation is kind of confusing in that part when you are working
> with SDF. Should it be treated as Unbounded for the source/reading part?
>
>
> Well, it’s actually the opposite - there are two types of Read
> implementation in Beam:
> - based on the Source interface that you mentioned before (deprecated);
> - based on Splittable DoFn [2], which is the approach one should use
> (especially for unbounded sources) for new IO connectors.
>
>
>
> [1] https://github.com/apache/beam/pull/11749
> [2] https://beam.apache.org/documentation/io/developing-io-overview/
>
>
> —
> Alexey
>
>
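
The size estimate Alexey describes above (records times average record size)
can be sketched roughly as below. This is a simplification under my own
assumptions, not KafkaIO's actual code: SizeEstimator is a hypothetical
class, it uses a plain running average, and it reads "number of records" as
the gap between the current offset and the latest known offset.

```python
class SizeEstimator:
    """Estimates remaining bytes from offsets and observed record sizes."""

    def __init__(self):
        self.total_bytes = 0
        self.record_count = 0

    def observe(self, record_size_bytes):
        """Record the size of one record that was actually read."""
        self.total_bytes += record_size_bytes
        self.record_count += 1

    def average_record_size(self):
        """Average size so far, or 0.0 if no statistics were collected yet."""
        if self.record_count == 0:
            return 0.0
        return self.total_bytes / self.record_count

    def estimate_remaining_bytes(self, current_offset, latest_offset):
        """Remaining records (never negative) times the average size."""
        remaining_records = max(0, latest_offset - current_offset)
        return remaining_records * self.average_record_size()
```

With no statistics the estimate is zero, which matches the "(if any)"
caveat: the number only becomes meaningful once some records have been read.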


Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

2021-06-23 Thread Luke Cwik
SDF isn't required as users already try to do things like this using
UnboundedSource and Pubsub.

On Wed, Jun 23, 2021 at 11:39 AM Reuven Lax  wrote:

> This was explored in the past, though the design started getting very
> complex (watermarks of unbounded dimension, where each iteration has its
> own watermark dimension). At the time, the exploration petered out.
>
> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> I'd like to discuss a very rough idea. I didn't walk through all the
>> corner cases and the whole idea has a lot of rough edges, so please bear
>> with me. I was thinking about non-IO applications of splittable DoFn,
>> and the main idea - and why it is called splittable - is that it can
>> handle unbounded outputs per element. Then I was thinking about what can
>> generate unbounded outputs per element _without reading from external
>> source_ (as that would be IO application) - and then I realized that the
>> data can - at least theoretically - come from a downstream transform. It
>> would have to be passed over an RPC (gRPC probably) connection, it would
>> probably require some sort of service discovery - as the feedback loop
>> would have to be correctly targeted based on key - and so on (those are
>> the rough edges).
>>
>> But supposing this can be solved - what iterations actually mean is that
>> we have a side channel that comes from downstream processing - and we
>> need a watermark estimator for this channel, that is able to hold the
>> watermark back until the very last element (at a certain watermark)
>> finishes the iteration. The idea is then we could - in theory - create
>> an Iteration PTransform, that would take another PTransform (probably
>> something like PTransform>, PCollection> IterationResult>>, where the IterationResult would contain
>> the original KV and a stopping condition (true, false) and by
>> creating the feedback loop from the output of this PCollection we could
>> actually implement this without any need of support on the side of
>> runners.
>>
>> Does that seem like something that might be worth exploring?
>>
>>   Jan
>>
>>
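
To make the watermark part of Jan's idea a bit more concrete, here is a toy
sketch of an estimator that holds the watermark back while elements are
still iterating. It is only an illustration under my own assumptions:
IterationWatermarkHold and its methods are hypothetical names, not a Beam
WatermarkEstimator, and it ignores keys, service discovery and everything
else listed as rough edges.

```python
class IterationWatermarkHold:
    """Holds the watermark at the oldest element still in the feedback loop."""

    def __init__(self):
        self.in_flight = {}  # element id -> event timestamp

    def start(self, element_id, timestamp):
        """An element entered the iteration and may still come back."""
        self.in_flight[element_id] = timestamp

    def finish(self, element_id):
        """The stopping condition was met; the element leaves the loop."""
        self.in_flight.pop(element_id, None)

    def watermark(self, input_watermark):
        """Never advance past the oldest element that is still iterating."""
        if not self.in_flight:
            return input_watermark
        return min(input_watermark, min(self.in_flight.values()))
```

The watermark only moves forward once the last element at a given timestamp
has finished iterating, which is the hold described in the quoted message.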


Re: [Proposal] Support State Batching and Prefetching over FnApi

2021-06-14 Thread Luke Cwik
Enhancements to the SDK which allow for greater declaration of intent would
be useful but the overall issue is that the SDK can send multiple read
requests through the use of readLater() and then block on read() without
the runner being aware that the SDK is blocked.

The runner could implement these strategies today:
* have X pending state lookups, gather all new incoming state requests into
one batch and as soon as one of the pending state lookups has finished then
issue the new batch. This likely will increase load on the state lookup
system since if there are two readLater()'s per element that are back to
back then those won't get batched together
* gather incoming state requests until there are X requests or Y time has
passed. This is strictly slower since we will have to wait till Y time has
passed pretty regularly.
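
As a rough sketch of the second strategy (flush after X requests or Y
time), assuming a single-threaded caller that polls periodically:
RequestBatcher, max_requests, max_wait_seconds and the injectable clock are
hypothetical names for illustration, not Fn API or runner code.

```python
import time


class RequestBatcher:
    """Buffers state requests; flushes at X requests or after Y seconds."""

    def __init__(self, max_requests, max_wait_seconds, clock=time.monotonic):
        self.max_requests = max_requests
        self.max_wait_seconds = max_wait_seconds
        self.clock = clock  # injectable so the timing can be tested
        self.pending = []
        self.first_arrival = None

    def add(self, request):
        """Add a request; return a batch to send if a limit was hit."""
        if not self.pending:
            self.first_arrival = self.clock()
        self.pending.append(request)
        return self.poll()

    def poll(self):
        """Return the pending batch if either limit is reached, else None."""
        if not self.pending:
            return None
        waited = self.clock() - self.first_arrival
        if len(self.pending) >= self.max_requests or waited >= self.max_wait_seconds:
            batch, self.pending = self.pending, []
            return batch
        return None
```

The sketch also shows the downside noted above: a lone request sits in
pending until the full wait has elapsed, even if the SDK is already blocked
on read().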

On Mon, Jun 14, 2021 at 3:05 PM Kenneth Knowles  wrote:

> I didn't see user API or SDK changes that I would expect in this proposal.
> Maybe I missed it? The main big win for state batching in the runners core
> trigger & window implementation is batching requests across a whole bundle.
> Certainly across elements. This probably requires something like either:
>
>  - a user API change (to have something like this:
> https://github.com/apache/beam/blob/f47a9424723e20abf807098dd6e9eef6e74c16cc/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java#L52
> )
>  - the SDK to use metadata/annotations (like this:
> https://github.com/apache/beam/blob/f47a9424723e20abf807098dd6e9eef6e74c16cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L420)
> instead
>
> That will allow the hints to be gathered across multiple elements. And it
> would seem the Fn API streaming protocol might mean the implementation is
> different than it is in ReduceFnRunner.
>
> Kenn
>
> On Mon, Jun 14, 2021 at 2:46 PM Luke Cwik  wrote:
>
>> The third approach prevents you from batching across state keys which
>> would be the most common type of batching.
>>
>> On Thu, May 6, 2021 at 3:13 PM Rui Wang  wrote:
>>
>>> At this moment, the third approach in the doc is preferred. To recap,
>>> the third approach is the one that only changes FnApi by adding a repeated
>>> field in the state request to support batching over FnApi.
>>>
>>> This approach has the following benefits:
>>> 1. Avoid double requests problem introduced by prefetching (prefetching
>>> needs two requests, one for prefetch and one for blocking fetch).
>>> 2. This approach does not conflict with prefetching so no backward
>>> compatibility issue even when we want to add prefetching in FnApi. So this
>>> approach can be a good starting point.
>>>
>>> The caveat though is this approach does not support smart prefetching
>>> (which needs runners support). However we can add that in the future if
>>> necessary and that won't conflict with existing design.
>>>
>>> Please let us know if you have any objection before the implementation.
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Mar 22, 2021 at 12:27 PM Rui Wang  wrote:
>>>
>>>> Hi Community,
>>>>
>>>> Andrew Crites and I drafted a document to discuss how to support state
>>>> prefetching and batching over FnApi, which seems a missing functionality in
>>>> FnApi. This will help us support Java state readLater() Api over FnApi.
>>>>
>>>> Please see:
>>>> https://docs.google.com/document/d/1Z3a5YOZyYsN8MeS6hRhCXX31m9bKCXSOtjKSl7wX40c/edit?usp=sharing=0-eiNl525kmb3Av2bqgCsZUA
>>>>
>>>>
>>>> -Rui
>>>>
>>>


Re: [Proposal] Support State Batching and Prefetching over FnApi

2021-06-14 Thread Luke Cwik
The third approach prevents you from batching across state keys which would
be the most common type of batching.

On Thu, May 6, 2021 at 3:13 PM Rui Wang  wrote:

> At this moment, the third approach in the doc is preferred. To recap, the
> third approach is the one that only changes FnApi by adding a repeated
> field in the state request to support batching over FnApi.
>
> This approach has the following benefits:
> 1. Avoid double requests problem introduced by prefetching (prefetching
> needs two requests, one for prefetch and one for blocking fetch).
> 2. This approach does not conflict with prefetching so no backward
> compatibility issue even when we want to add prefetching in FnApi. So this
> approach can be a good starting point.
>
> The caveat though is this approach does not support smart prefetching
> (which needs runners support). However we can add that in the future if
> necessary and that won't conflict with existing design.
>
> Please let us know if you have any objection before the implementation.
>
>
> -Rui
>
> On Mon, Mar 22, 2021 at 12:27 PM Rui Wang  wrote:
>
>> Hi Community,
>>
>> Andrew Crites and I drafted a document to discuss how to support state
>> prefetching and batching over FnApi, which seems a missing functionality in
>> FnApi. This will help us support Java state readLater() Api over FnApi.
>>
>> Please see:
>> https://docs.google.com/document/d/1Z3a5YOZyYsN8MeS6hRhCXX31m9bKCXSOtjKSl7wX40c/edit?usp=sharing=0-eiNl525kmb3Av2bqgCsZUA
>>
>>
>> -Rui
>>
>


Re: Removing deprecated oauth2client dependency for Python SDK

2021-06-10 Thread Luke Cwik
I did something very similar during the Dataflow Java 1.x to Beam Java 2.x
migration. The work boiled down to:
* swapping to a different library to get the application default
credentials (including fixing upstream bugs at Google and improving some
documentation)
* swapping existing API calls to use the new credentials object (was easy
since there was a trivial wrapper object that allowed you to convert new
credentials object into the old type that some API client libraries only
supported)
* a bunch of documentation and trivial plumbing issues

On Fri, May 14, 2021 at 5:33 PM Ahmet Altay  wrote:

> +Valentyn Tymofieiev  might have an idea.
>
> On Mon, May 3, 2021 at 4:12 PM Chuck Yang 
> wrote:
>
>> Hi Beam devs,
>>
>> I saw there has been some previous discussion [1][2] around removing
>> the deprecated oauth2client dependency and using the supported
>> google-auth dependency instead. A portion of this work seems to
>> involve migrating off of google-apitools since this call [3] is not
>> supported by credentials objects emitted by google-auth.
>>
>> Does anyone have any experience/insights on how much work migrating
>> off of oauth2client would involve? I might be able to help out but
>> wanted to see a) if anyone is already looking at this and b) if there
>> are any hidden obstacles beyond needing to move from google-apitools
>> to the google-cloud-* libraries. Any pointers are appreciated!
>>
>> We're interested in this migration because of the need to use custom
>> token URIs for issuing service account tokens--it's supported by
>> google-auth but not oauth2client.
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-7352
>> [2] https://github.com/google/apitools/issues/225#issuecomment-434884589
>> [3]
>> https://github.com/google/apitools/blob/v0.5.31/apitools/base/py/base_api.py#L266
>>
>> Thanks!
>> Chuck
>>
>>
>


Re: beam new feature

2021-06-08 Thread Luke Cwik
Thanks, I left a few comments in the doc.

On Tue, Jun 8, 2021 at 12:26 PM Daria Malkova 
wrote:

> Hi community!
>
> I've noticed that Beam JDBC has no way to use partitioning
> for reading a very large table with millions of rows in parallel (for
> example when migrating legacy database data to BigQuery).
> I have some ideas which are described in more detail here:
>
> https://docs.google.com/document/d/1wBzVhQEhTK23ALzTSZ_CVouEOXTm3w2-LjmO3ieUvFc/edit?usp=sharing
> I would like to start working on the related task I've created:
> https://issues.apache.org/jira/browse/BEAM-12456
> If anybody has any concerns or proposals please feel free to leave
> comments in the Google doc.
>
> Thank you,
> Daria
>
>


Re: [DISCUSS] Sensible dependency upgrades

2020-10-23 Thread Luke Cwik
An additional thing I forgot to mention was that if we only had portable
runners our BOM story would be simplified since we wouldn't have the runner
on the classpath and users would have a consistent experience across
runners with regards to dependency convergence.

On Fri, Oct 23, 2020 at 6:15 AM Piotr Szuberski 
wrote:

> Thank you for pointing it out. The awareness problem applies to me here - I
> have learned a good lesson about discussing things on the dev list.
>
> About SolrIO - I'll create a thread on @users to discuss which versions
> should be supported and make relevant changes after getting a conclusion.
>
> On 2020/10/22 14:24:45, Ismaël Mejía  wrote:
> > I have seen ongoing work on upgrading dependencies. This is a great task
> > needed for the health of the project and its IO connectors; however, I am a
> > bit worried about the impact of these upgrades on existing users. We should
> > be aware that we support old versions of the clients for valid reasons. If
> > we update a version of a client we should ensure that it still interacts
> > correctly with existing users and runtime systems. Basically we need two
> > conditions:
> >
> > 1. We cannot update dependencies without considering the current use of them.
> > 2. We must avoid upgrading to a non-stable or non-LTS dependency version.
> >
> > For (1), in a recent thread Piotr brought up some issues about updating
> > Hadoop dependencies to version 3. This surprised me because the whole Big
> > Data ecosystem is just catching up with Hadoop 3 (Flink does not even release
> > artifacts for this yet, and Spark just started on version 3 some months ago),
> > which means that most of our users still need us to guarantee compatibility
> > with Hadoop 2.x dependencies.
> >
> > The Hadoop dependencies are mostly 'provided', so a way to achieve this is by
> > creating new test configurations that guarantee backwards (or forwards)
> > compatibility by providing the respective versions. This is similar to what
> > we currently do in KafkaIO, which uses version 1.0.0 by default but tests
> > compatibility with 2.1.0 by providing the right dependencies too.
> >
> > The same thread also discusses upgrading to version 3.3.x, the latest, but
> > per (2) we should not consider upgrades to non-stable versions; the stable
> > version of Hadoop is currently 3.2.1. https://hadoop.apache.org/docs/stable/
> >
> > I also saw a recent upgrade of SolrIO to version 8, which may affect some
> > users of previous versions, with no discussion about it on the mailing lists
> > and no backwards compatibility guarantees.
> > https://github.com/apache/beam/pull/13027
> >
> > In the Solr case I think this update probably makes more sense since Solr 5.x
> > is deprecated and fewer people would probably be impacted, but it still would
> > have been good to discuss this on user@.
> >
> > I don't know how we can find a good equilibrium between deciding on those
> > upgrades from maintainers vs users without adding much overhead. Should we
> > have a VOTE maybe for the most sensible dependencies? Or just assume this is
> > a criterion for the maintainers? I am afraid we may end up with incompatible
> > changes due to the lack of awareness or for not much in return, but at the
> > same time I wonder if it makes sense to add the extra work of discussion
> > for minor dependencies where this matters less.
> >
> > Should we maybe document the sensible dependency upgrades (the recent
> > thread on Avro upgrade comes to my mind too)? Or should we have the same
> > criteria for all? Other ideas?
> >
>


Re: [DISCUSS] Sensible dependency upgrades

2020-10-22 Thread Luke Cwik
Traditionally I have been pushing for as many deps as possible to use the
same version across all the Beam modules (the purpose of the list of deps
in BeamModulePlugin.groovy) to simplify dependency convergence.

One solution is to test and publish BOMs for the various common platform
configurations.
e.g.
Beam + Spark + Hadoop2 + ...
Beam + Dataflow + GCP + ...

Then users can choose which BOM they want, and anything outside the set of
BOMs that we supply is up to them to figure out how to support, which is
what they are forced to do now.

Using multiple BOMs allows us to work around restrictions that a certain
platform choice has (e.g. if Spark is incompatible with Guava 29, then we
publish a BOM with deps compatible with a version it is compatible with).

Doing this requires us to:
* test the BOMs for the configuration they are expected to work with
(more Jenkins runs)
* maintain multiple version lists
* write code which is compatible across multiple versions of a
dependency and not just a single point version (similar to the work done in
Kafka).

On Thu, Oct 22, 2020 at 7:25 AM Ismaël Mejía  wrote:

> I have seen ongoing work on upgrading dependencies. This is a great task
> needed for the health of the project and its IO connectors; however, I am a
> bit worried about the impact of these upgrades on existing users. We should
> be aware that we support old versions of the clients for valid reasons. If
> we update a version of a client we should ensure that it still interacts
> correctly with existing users and runtime systems. Basically we need two
> conditions:
>
> 1. We cannot update dependencies without considering the current use of them.
> 2. We must avoid upgrading to a non-stable or non-LTS dependency version.
>
> For (1), in a recent thread Piotr brought up some issues about updating
> Hadoop dependencies to version 3. This surprised me because the whole Big
> Data ecosystem is just catching up with Hadoop 3 (Flink does not even release
> artifacts for this yet, and Spark just started on version 3 some months ago),
> which means that most of our users still need us to guarantee compatibility
> with Hadoop 2.x dependencies.
>
> The Hadoop dependencies are mostly 'provided', so a way to achieve this is by
> creating new test configurations that guarantee backwards (or forwards)
> compatibility by providing the respective versions. This is similar to what
> we currently do in KafkaIO, which uses version 1.0.0 by default but tests
> compatibility with 2.1.0 by providing the right dependencies too.
>
> The same thread also discusses upgrading to version 3.3.x, the latest, but
> per (2) we should not consider upgrades to non-stable versions; the stable
> version of Hadoop is currently 3.2.1. https://hadoop.apache.org/docs/stable/
>
> I also saw a recent upgrade of SolrIO to version 8, which may affect some
> users of previous versions, with no discussion about it on the mailing lists
> and no backwards compatibility guarantees.
> https://github.com/apache/beam/pull/13027
>
> In the Solr case I think this update probably makes more sense since Solr 5.x
> is deprecated and fewer people would probably be impacted, but it still would
> have been good to discuss this on user@.
>
> I don't know how we can find a good equilibrium between deciding on those
> upgrades from maintainers vs users without adding much overhead. Should we
> have a VOTE maybe for the most sensible dependencies? Or just assume this is
> a criterion for the maintainers? I am afraid we may end up with incompatible
> changes due to the lack of awareness or for not much in return, but at the
> same time I wonder if it makes sense to add the extra work of discussion
> for minor dependencies where this matters less.
>
> Should we maybe document the sensible dependency upgrades (the recent
> thread on Avro upgrade comes to my mind too)? Or should we have the same
> criteria for all? Other ideas?
>


Re: Contributor permissions for Beam Jira - tomasz.szerszen

2020-10-21 Thread Luke Cwik
Welcome to the community. I have added you as a contributor.

Please take a look at our contribution guide[1] if you haven't already done
so.

1: https://beam.apache.org/contribute/

On Wed, Oct 21, 2020 at 9:32 AM Tomasz Szerszeń 
wrote:

> Hello,
>
> I'm looking forward to contributing to the Beam project. Could I get permissions
> to add/assign tickets on the Beam Jira?
>
> My Jira account is: tomasz.szerszen
>
> --
>
> Pozdrawiam/Best regards,
>
> Tomasz Szerszeń
> Polidea  | Software Engineer
>
> M: +48 7 <+48793787241>21 527 620
> E: tomasz.szers...@polidea.com 
>
> Unique Tech
> Check out our projects! 
>


Re: Apache Beam case studies

2020-10-21 Thread Luke Cwik
+Mariann Nagy has been doing things like this for years
now and may be interested.

On Wed, Oct 21, 2020 at 12:50 AM Karolina Rosół 
wrote:

> Hi folks,
>
> With some people from Polidea we came up with an idea to carry out
> interviews with Apache Beam users to spread the news about the Beam model
> and engage more people to use it.
>
> Ideally, we'd set up an online meeting with interested people and then do
> an interview. We'd like to ask questions such as 'how did you find out
> about Apache Beam' / 'how do you use Apache Beam in your company/product?'
> etc. We'd love to post the whole interview on Polidea and Apache Beam
> website.
>
> If any of you is interested, please let me know in this thread :-)
>
> Wish you all a happy week and stay safe!
>
> Karolina Rosół
> Polidea  | Head of Cloud & OSS
>
> M: +48 606 630 236
> E: karolina.ro...@polidea.com
>


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-10-19 Thread Luke Cwik
+Rose Nguyen suggested that instead of just a blog,
we should add the majority of the current blog's content to the core
programming guide and either drop the blog or have a much smaller blog
that links to the docs.

I think this is a great idea, what do others think?

On Wed, Oct 14, 2020 at 10:51 AM Luke Cwik  wrote:

> Thanks Alexey, that is correct.
>
> On Wed, Oct 14, 2020 at 10:33 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> Thanks Luke, just I guess that the proper link should be this one:
>>
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>>
>> On 13 Oct 2020, at 00:23, Luke Cwik  wrote:
>>
>> I have a draft[1] of the blog ready. Please take a look.
>>
>> 1:
>> http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE#heading=h.tbab2n97o3eo
>>
>> On Mon, Oct 5, 2020 at 4:28 PM Luke Cwik  wrote:
>>
>>>
>>>
>>> On Mon, Oct 5, 2020 at 3:45 PM Kenneth Knowles  wrote:
>>>
>>>>
>>>>
>>>> On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik  wrote:
>>>>
>>>>> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2 will
>>>>> use SDF powered Read transforms. Users can opt-out
>>>>> with --experiments=use_deprecated_read.
>>>>>
>>>>
>>>> Huzzah! In our release notes maybe be clear about the expectations for
>>>> users:
>>>>
>>>> Done in https://github.com/apache/beam/pull/13015
>>>
>>>
>>>>  - semantics are expected to be the same: file bugs for any change in
>>>> results
>>>>  - perf may vary: file bugs or write to user@
>>>>
>>>> I was unable to get Spark done for 2.25 as I found out that Spark
>>>>> streaming doesn't support watermark holds[1]. If someone knows more about
>>>>> the watermark system in Spark I could use some guidance here as I believe I
>>>>> have a version of unbounded SDF support written for Spark (I get all the
>>>>> expected output from tests, just that watermarks aren't being held back so
>>>>> PAssert fails).
>>>>>
>>>>
>>>> Spark's watermarks are not comparable to Beam's. The rule as I
>>>> understand it is that any data that is later than `max(seen timestamps) -
>>>> allowedLateness` is dropped. One difference is that dropping is relative to
>>>> the watermark instead of expiring windows, like early versions of Beam. The
>>>> other difference is that it tracks the latest event (some call it a "high
>>>> water mark" because it is the highest datetime value seen) where Beam's
>>>> watermark is an approximation of the earliest (some call it a "low water
>>>> mark" because it is a guarantee that it will not dip lower). When I chatted
>>>> about this with Amit in the early days, it was necessary to implement a
>>>> Beam-style watermark using Spark state. I think that may still be the case,
>>>> for correct results.
>>>>
>>>>
>>> In the Spark implementation I saw that watermark holds weren't wired at
>>> all to control Spark's watermarks and this was causing triggers to fire too
>>> early.
>>>
>>>
>>>> Also, I started a doc[2] to produce an updated blog post since the
>>>>> original SplittableDoFn blog from 2017 is out of date[3]. I was thinking of
>>>>> making this a new blog post and having the old blog post point to it. We
>>>>> could also remove the old blog post and/or update it. Any thoughts?
>>>>>
>>>>
>>>> New blog post w/ pointer from the old one.
>>>>
>>>> Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
>>>>> expansion into each of the runners instead of having it within Read
>>>>> transform within beam-sdks-java-core.
>>>>>
>>>>
>>>> Approved! I did CC a bunch of runner authors already. I think the
>>>> important thing is if a default changes we should be sure everyone is OK
>>>> with the perf changes, and everyone is confident that no incorrect results
>>>> are produced. The abstractions between sdk-core, runners-core-*, and
>>>> individual runners is important to me:
>>>>
>>>>  - The SDK's job is to produce a portable, un-tweaked pipeline so
>>>> moving flags out of SDK core (and IOs) ASAP i

Re: Please add me to the mailing list

2020-10-19 Thread Luke Cwik
Send an e-mail to dev-subscr...@beam.apache.org to subscribe as per
https://beam.apache.org/community/contact-us/

On Mon, Oct 19, 2020 at 9:24 AM Mike Lo  wrote:

> Thanks!
>
> Best,
> Mike
>
> PhD, Bioengineering
> San Francisco Bay Area
> Mobile: 510-710-4906
>


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-10-14 Thread Luke Cwik
Thanks Alexey, that is correct.

On Wed, Oct 14, 2020 at 10:33 AM Alexey Romanenko 
wrote:

> Thanks Luke, just I guess that the proper link should be this one:
>
> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>
> On 13 Oct 2020, at 00:23, Luke Cwik  wrote:
>
> I have a draft[1] of the blog ready. Please take a look.
>
> 1:
> http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE#heading=h.tbab2n97o3eo
>
> On Mon, Oct 5, 2020 at 4:28 PM Luke Cwik  wrote:
>
>>
>>
>> On Mon, Oct 5, 2020 at 3:45 PM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik  wrote:
>>>
>>>> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2 will
>>>> use SDF powered Read transforms. Users can opt-out
>>>> with --experiments=use_deprecated_read.
>>>>
>>>
>>> Huzzah! In our release notes maybe be clear about the expectations for
>>> users:
>>>
>>> Done in https://github.com/apache/beam/pull/13015
>>
>>
>>>  - semantics are expected to be the same: file bugs for any change in
>>> results
>>>  - perf may vary: file bugs or write to user@
>>>
>>> I was unable to get Spark done for 2.25 as I found out that Spark
>>>> streaming doesn't support watermark holds[1]. If someone knows more about
>>>> the watermark system in Spark I could use some guidance here as I believe I
>>>> have a version of unbounded SDF support written for Spark (I get all the
>>>> expected output from tests, just that watermarks aren't being held back so
>>>> PAssert fails).
>>>>
>>>
>>> Spark's watermarks are not comparable to Beam's. The rule as I
>>> understand it is that any data that is later than `max(seen timestamps) -
>>> allowedLateness` is dropped. One difference is that dropping is relative to
>>> the watermark instead of expiring windows, like early versions of Beam. The
>>> other difference is that it tracks the latest event (some call it a "high
>>> water mark" because it is the highest datetime value seen) where Beam's
>>> watermark is an approximation of the earliest (some call it a "low water
>>> mark" because it is a guarantee that it will not dip lower). When I chatted
>>> about this with Amit in the early days, it was necessary to implement a
>>> Beam-style watermark using Spark state. I think that may still be the case,
>>> for correct results.
>>>
>>>
>> In the Spark implementation I saw that watermark holds weren't wired at
>> all to control Spark's watermarks and this was causing triggers to fire too
>> early.
>>
>>
>>> Also, I started a doc[2] to produce an updated blog post since the
>>>> original SplittableDoFn blog from 2017 is out of date[3]. I was thinking of
>>>> making this a new blog post and having the old blog post point to it. We
>>>> could also remove the old blog post and/or update it. Any thoughts?
>>>>
>>>
>>> New blog post w/ pointer from the old one.
>>>
>>> Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
>>>> expansion into each of the runners instead of having it within Read
>>>> transform within beam-sdks-java-core.
>>>>
>>>
>>> Approved! I did CC a bunch of runner authors already. I think the
>>> important thing is if a default changes we should be sure everyone is OK
>>> with the perf changes, and everyone is confident that no incorrect results
>>> are produced. The abstractions between sdk-core, runners-core-*, and
>>> individual runners is important to me:
>>>
>>>  - The SDK's job is to produce a portable, un-tweaked pipeline so moving
>>> flags out of SDK core (and IOs) ASAP is super important.
>>>  - The runner's job is to execute that pipeline, if they can, however
>>> they want. If a runner wants to run Read transforms differently/directly
>>> that is fine. If a runner is incapable of supporting SDF, then Read is
>>> better than nothing. Etc.
>>>  - The runners-core-* job is to just be internal libraries for runner
>>> authors to share code, and should not make any decisions about the Beam
>>> model, etc.
>>>
>>> Kenn
>>>
>>> 1: https://github.com/apache/beam/pull/12603
>>>> 2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>>>> 3: https://beam.apache.org/blog/splittable-do-fn/
>

Re: Adding transactional writer to SpannerIO

2020-10-13 Thread Luke Cwik
+user for feedback from users.

As long as users know that they must structure their transactions to be
repeatable and/or are ok with a transaction occurring multiple times then
that should be fine.

Has most of the focus been around a serializable function from customers or
would something using Spanner DML make more sense?
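
To make the "repeatable" requirement concrete, here is a small sketch of what happens when a read-update-write transaction is replayed. It assumes an in-memory stand-in for the database: FakeDb, increment, increment_once and run_with_retry are hypothetical names for illustration and are not the Spanner client or SpannerIO API.

```python
class FakeDb:
    """In-memory stand-in for a database; NOT the Spanner client API."""

    def __init__(self):
        self.counter = 0
        self.applied_ids = set()


def increment(db, element_id):
    # Not repeatable: every replay of the same element adds one more.
    db.counter += 1


def increment_once(db, element_id):
    # Repeatable: the transaction records the element id and skips replays.
    if element_id in db.applied_ids:
        return
    db.applied_ids.add(element_id)
    db.counter += 1


def run_with_retry(db, txn, element_ids, attempts):
    # Simulates a runner that processes each element more than once.
    for element_id in element_ids:
        for _ in range(attempts):
            txn(db, element_id)


naive, guarded = FakeDb(), FakeDb()
run_with_retry(naive, increment, ["e1", "e2"], attempts=2)
run_with_retry(guarded, increment_once, ["e1", "e2"], attempts=2)
print(naive.counter, guarded.counter)
```

With two elements each processed twice, the naive counter double-counts while the guarded one does not. In a real database the id check and the update would need to happen inside the same transaction.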

On Tue, Oct 13, 2020 at 2:37 AM Niel Markwick  wrote:

> Hey Beam-dev...
>
> I recently had an interaction with a customer that wanted to run a
> read-update-write transform on a Cloud Spanner DB inside a streaming Beam
> pipeline. I suggested writing their own DoFn, and pointed them at some of
> the various pitfalls they need to avoid - (those at least that have been
> found and fixed in the Beam SpannerIO.Write transform!)
>
> This is not the first time I have had this request, and I was thinking
> about how to introduce a generic transactional RW Spanner writer: The user
> would supply a serializable function that takes the input element and
> performs the read-update-write, while the transform wraps this function in
> the code required to handle the Spanner connection and transform,
> potentially adding batching -- running multiple transactions at once.
>
> Would this be something that the community could find useful? Should I
> productionize the PoC I have and submit a PR?
>
> In one sense it is against the 'repeatable' recommendation of a DoFn (for
> example, a transaction that increments a DB counter would not be
> idempotent), but in another sense, it makes certain actions more reliable
> (e.g. processing bank account transfers).
>
> All opinions welcome.
>
> --
> Niel Markwick
> Cloud Solutions Architect
> Google Belgium
> ni...@google.com
> +32 2 894 6771
>
>


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-10-12 Thread Luke Cwik
I have a draft[1] of the blog ready. Please take a look.

1:
http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE#heading=h.tbab2n97o3eo

On Mon, Oct 5, 2020 at 4:28 PM Luke Cwik  wrote:

>
>
> On Mon, Oct 5, 2020 at 3:45 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik  wrote:
>>
>>> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2 will
>>> use SDF powered Read transforms. Users can opt-out
>>> with --experiments=use_deprecated_read.
>>>
>>
>> Huzzah! In our release notes maybe be clear about the expectations for
>> users:
>>
>> Done in https://github.com/apache/beam/pull/13015
>
>
>>  - semantics are expected to be the same: file bugs for any change in
>> results
>>  - perf may vary: file bugs or write to user@
>>
>> I was unable to get Spark done for 2.25 as I found out that Spark
>>> streaming doesn't support watermark holds[1]. If someone knows more about
>>> the watermark system in Spark I could use some guidance here as I believe I
>>> have a version of unbounded SDF support written for Spark (I get all the
>>> expected output from tests, just that watermarks aren't being held back so
>>> PAssert fails).
>>>
>>
>> Spark's watermarks are not comparable to Beam's. The rule as I understand
>> it is that any data that is later than `max(seen timestamps) -
>> allowedLateness` is dropped. One difference is that dropping is relative to
>> the watermark instead of expiring windows, like early versions of Beam. The
>> other difference is that it tracks the latest event (some call it a "high
>> water mark" because it is the highest datetime value seen) where Beam's
>> watermark is an approximation of the earliest (some call it a "low water
>> mark" because it is a guarantee that it will not dip lower). When I chatted
>> about this with Amit in the early days, it was necessary to implement a
>> Beam-style watermark using Spark state. I think that may still be the case,
>> for correct results.
>>
>>
> In the Spark implementation I saw that watermark holds weren't wired at
> all to control Spark's watermarks and this was causing triggers to fire too
> early.
>
>
>> Also, I started a doc[2] to produce an updated blog post since the
>>> original SplittableDoFn blog from 2017 is out of date[3]. I was thinking of
>>> making this a new blog post and having the old blog post point to it. We
>>> could also remove the old blog post and/or update it. Any thoughts?
>>>
>>
>> New blog post w/ pointer from the old one.
>>
>> Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
>>> expansion into each of the runners instead of having it within Read
>>> transform within beam-sdks-java-core.
>>>
>>
>> Approved! I did CC a bunch of runner authors already. I think the
>> important thing is if a default changes we should be sure everyone is OK
>> with the perf changes, and everyone is confident that no incorrect results
>> are produced. The abstractions between sdk-core, runners-core-*, and
>> individual runners is important to me:
>>
>>  - The SDK's job is to produce a portable, un-tweaked pipeline so moving
>> flags out of SDK core (and IOs) ASAP is super important.
>>  - The runner's job is to execute that pipeline, if they can, however
>> they want. If a runner wants to run Read transforms differently/directly
>> that is fine. If a runner is incapable of supporting SDF, then Read is
>> better than nothing. Etc.
>>  - The runners-core-* job is to just be internal libraries for runner
>> authors to share code, and should not make any decisions about the Beam
>> model, etc.
>>
>> Kenn
>>
>> 1: https://github.com/apache/beam/pull/12603
>>> 2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>>> 3: https://beam.apache.org/blog/splittable-do-fn/
>>> 4: https://github.com/apache/beam/pull/13006
>>>
>>>
>>> On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels 
>>> wrote:
>>>
>>>> Thanks Luke! I've had a pass.
>>>>
>>>> -Max
>>>>
>>>> On 28.08.20 01:22, Luke Cwik wrote:
>>>> > As an update.
>>>> >
>>>> > Direct and Twister2 are done.
>>>> > Samza: is ready for review[1].
>>>> > Flink: is almost ready for review. [2] lays all the groundwork for
>>>> the
>>>> > migration and [3] finishes the migration (there is a ti

Re: Throttling stream outputs per trigger?

2020-10-07 Thread Luke Cwik
SplittableDoFns apply to both batch and streaming pipelines. They are
allowed to produce an unbounded amount of data and can either self
checkpoint saying they want to resume later or the runner will ask them to
checkpoint via a split call.

There hasn't been anything concrete on backpressure. There has been work
done on exposing signals[1] related to IO that a runner can then use
intelligently, but throttling isn't one of them yet.

1:
https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
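
For intuition on how self-checkpointing with a resume delay limits ingestion rate, here is a toy simulation. It is not the Beam API: process and toy_runner are hypothetical names, offsets are plain integers, and the delay is applied to a simulated clock rather than by sleeping. In the Java SDK the equivalent step is returning resume().withDelay(...) from @ProcessElement, as mentioned in the quoted message below.

```python
from typing import List, Tuple


def process(offsets: range, start: int, per_call: int,
            delay_s: int) -> Tuple[List[int], int, int]:
    """Claims up to per_call offsets, then asks to be resumed after delay_s.

    Returns (outputs, next_offset, resume_delay); resume_delay is 0 when done.
    """
    end = min(start + per_call, offsets.stop)
    outputs = list(range(start, end))
    return outputs, end, (delay_s if end < offsets.stop else 0)


def toy_runner(offsets: range, per_call: int,
               delay_s: int) -> List[Tuple[int, List[int]]]:
    """Records (simulated time, batch) pairs; delays are simulated, not slept."""
    clock, position, log = 0, offsets.start, []
    while position < offsets.stop:
        outputs, position, delay = process(offsets, position, per_call, delay_s)
        log.append((clock, outputs))
        clock += delay  # honour the requested resume delay before resuming
    return log


log = toy_runner(range(0, 5), per_call=2, delay_s=10)
for when, batch in log:
    print(when, batch)
```

The effective rate is at most per_call elements per delay_s seconds, chosen by the SDK side; the runner only honours the requested delay.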

On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez 
wrote:

> Thanks for the response.  Is my understanding correct that SplittableDoFns
> are only applicable to Batch pipelines?  I'm wondering if there are any
> proposals to address backpressure needs?
> *~Vincent*
>
>
> On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik  wrote:
>
>> There is no general back pressure mechanism within Apache Beam (runners
>> should be intelligent about this but there is currently no way to say I'm
>> being throttled so runners don't know that throwing more CPUs at a problem
>> won't make it go faster).
>>
>> You can control how quickly you ingest data for runners that support
>> splittable DoFns with SDK initiated checkpoints with resume delays. A
>> splittable DoFn is able to return resume().withDelay(Duration.seconds(10))
>> from the @ProcessElement method. See Watch[1] for an example.
>>
>> The 2.25.0 release enables more splittable DoFn features on more runners.
>> I'm working on a blog (initial draft[2], still mostly empty) to update the
>> old blog from 2017.
>>
>> 1:
>> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>> 2:
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>>
>>
>> On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez <
>> vincent.marq...@gmail.com> wrote:
>>
>>> Hmm, I'm not sure how that will help, I understand how to batch up the
>>> data, but it is the triggering part that I don't see how to do.  For
>>> example, in Spark Structured Streaming, you can set a time trigger which
>>> happens at a fixed interval all the way up to the source, so the source can
>>> throttle how much data to read even.
>>>
>>> Here is my use case more thoroughly explained:
>>>
>>> I have a Kafka topic (with multiple partitions) that I'm reading from,
>>> and I need to aggregate batches of up to 500 before sending a single batch
>>> off in an RPC call.  However, the vendor specified a rate limit, so if
>>> there are more than 500 unread messages in the topic, I must wait 1 second
>>> before issuing another RPC call. When searching on Stack Overflow I found
>>> this answer: https://stackoverflow.com/a/57275557/25658 that makes it
>>> seem challenging, but I wasn't sure if things had changed since then or you
>>> had better ideas.
>>>
>>> *~Vincent*
>>>
>>>
>>> On Thu, Oct 1, 2020 at 2:57 PM Luke Cwik  wrote:
>>>
>>>> Look at the GroupIntoBatches[1] transform. It will buffer "batches" of
>>>> size X for you.
>>>>
>>>> 1:
>>>> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>>>>
>>>> On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez <
>>>> vincent.marq...@gmail.com> wrote:
>>>>
>>>>> the downstream consumer has these requirements.
>>>>>
>>>>> *~Vincent*
>>>>>
>>>>>
>>>>> On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik  wrote:
>>>>>
>>>>>> Why do you want to only emit X? (e.g. running out of memory in the
>>>>>> runner)
>>>>>>
>>>>>> On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez <
>>>>>> vincent.marq...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello all.  If I want to 'throttle' the number of messages I pull
>>>>>>> off say, Kafka or some other queue, in order to make sure I only emit X
>>>>>>> amount per trigger, is there a way to do that and ensure that I get 'at
>>>>>>> least once' delivery guarantees?   If this isn't supported, would the
>>>>>>> better way be to pull the limited amount opposed to doing it on the
>>>>>>> output side?
>>>>>>>
>>>>>>>
>>>>>>> *~Vincent*
>>>>>>>
>>>>>>


Re: Throttling stream outputs per trigger?

2020-10-06 Thread Luke Cwik
There is no general back pressure mechanism within Apache Beam (runners
should be intelligent about this but there is currently no way to say I'm
being throttled so runners don't know that throwing more CPUs at a problem
won't make it go faster).

You can control how quickly you ingest data for runners that support
splittable DoFns with SDK initiated checkpoints with resume delays. A
splittable DoFn is able to return resume().withDelay(Duration.seconds(10))
from the @ProcessElement method. See Watch[1] for an example.
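
To make the resume-with-delay idea concrete, here is a minimal sketch in plain Python. It does not use the Beam API: `Continuation`, `process_chunk` and `run` are hypothetical stand-ins for the process continuation, the @ProcessElement method and the runner loop, and the 10 second delay just mirrors the example above.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List, Tuple


@dataclass(frozen=True)
class Continuation:
    """Hypothetical stand-in for a process continuation: stop, or resume after a delay."""
    resume: bool
    delay_seconds: float = 0.0


def process_chunk(source: Deque[str], out: List[str], max_records: int) -> Continuation:
    """Emit at most max_records, then self-checkpoint and ask to be resumed later."""
    for _ in range(max_records):
        if not source:
            return Continuation(resume=False)  # nothing left to read: stop
        out.append(source.popleft())
    # More input may remain: ask the "runner" to call us again after 10 seconds.
    return Continuation(resume=True, delay_seconds=10.0)


def run(source: Deque[str], max_records: int,
        sleep: Callable[[float], None] = lambda seconds: None) -> Tuple[List[str], List[float]]:
    """Toy runner loop that honours the requested resume delay between calls."""
    out: List[str] = []
    delays: List[float] = []
    while True:
        result = process_chunk(source, out, max_records)
        if not result.resume:
            return out, delays
        delays.append(result.delay_seconds)
        sleep(result.delay_seconds)  # no-op by default so the sketch runs instantly
```

The point of the design is that the ingest rate is bounded by `max_records` per resume delay, and that the decision to pause is made by the reader itself rather than by the runner.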

The 2.25.0 release enables more splittable DoFn features on more runners.
I'm working on a blog (initial draft[2], still mostly empty) to update the
old blog from 2017.

1:
https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
2:
https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#


On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez 
wrote:

> Hmm, I'm not sure how that will help, I understand how to batch up the
> data, but it is the triggering part that I don't see how to do.  For
> example, in Spark Structured Streaming, you can set a time trigger which
> happens at a fixed interval all the way up to the source, so the source can
> throttle how much data to read even.
>
> Here is my use case more thoroughly explained:
>
> I have a Kafka topic (with multiple partitions) that I'm reading from, and
> I need to aggregate batches of up to 500 before sending a single batch off
> in an RPC call.  However, the vendor specified a rate limit, so if there
> are more than 500 unread messages in the topic, I must wait 1 second before
> issuing another RPC call. When searching on Stack Overflow I found this
> answer: https://stackoverflow.com/a/57275557/25658 that makes it seem
> challenging, but I wasn't sure if things had changed since then or you had
> better ideas.
>
> *~Vincent*
>
>
> On Thu, Oct 1, 2020 at 2:57 PM Luke Cwik  wrote:
>
>> Look at the GroupIntoBatches[1] transform. It will buffer "batches" of
>> size X for you.
>>
>> 1:
>> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>>
>> On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez 
>> wrote:
>>
>>> the downstream consumer has these requirements.
>>>
>>> *~Vincent*
>>>
>>>
>>> On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik  wrote:
>>>
>>>> Why do you want to only emit X? (e.g. running out of memory in the
>>>> runner)
>>>>
>>>> On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez <
>>>> vincent.marq...@gmail.com> wrote:
>>>>
>>>>> Hello all.  If I want to 'throttle' the number of messages I pull off
>>>>> say, Kafka or some other queue, in order to make sure I only emit X amount
>>>>> per trigger, is there a way to do that and ensure that I get 'at least
>>>>> once' delivery guarantees?   If this isn't supported, would the better way
>>>>> be to pull the limited amount opposed to doing it on the output side?
>>>>>
>>>>>
>>>>> *~Vincent*
>>>>>
>>>>


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-10-05 Thread Luke Cwik
For the 2.25 release, the Java Direct, Flink, Jet, Samza, and Twister2
runners will use SDF powered Read transforms. Users can opt out
with --experiments=use_deprecated_read.

I was unable to get Spark done for 2.25 as I found out that Spark streaming
doesn't support watermark holds[1]. If someone knows more about the
watermark system in Spark I could use some guidance here as I believe I
have a version of unbounded SDF support written for Spark (I get all the
expected output from tests, just that watermarks aren't being held back so
PAssert fails).

Also, I started a doc[2] to produce an updated blog post since the original
SplittableDoFn blog from 2017 is out of date[3]. I was thinking of making
this a new blog post and having the old blog post point to it. We could
also remove the old blog post and/or update it. Any thoughts?

Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
expansion into each of the runners instead of having it within the Read
transform in beam-sdks-java-core.

1: https://github.com/apache/beam/pull/12603
2: https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
3: https://beam.apache.org/blog/splittable-do-fn/
4: https://github.com/apache/beam/pull/13006


On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels  wrote:

> Thanks Luke! I've had a pass.
>
> -Max
>
> On 28.08.20 01:22, Luke Cwik wrote:
> > As an update.
> >
> > Direct and Twister2 are done.
> > Samza: is ready for review[1].
> > Flink: is almost ready for review. [2] lays all the groundwork for the
> > migration and [3] finishes the migration (there is a timeout happening
> > in FlinkSubmissionTest that I'm trying to figure out).
> > No further updates on Spark[4] or Jet[5].
> >
> > @Maximilian Michels <mailto:m...@apache.org> or @t...@apache.org
> > <mailto:thomas.we...@gmail.com>, can either of you take a look at the
> > Flink PRs?
> > @ke.wu...@icloud.com <mailto:ke.wu...@icloud.com>, since Xinyu delegated
> > to you, can you take another look at the Samza PR?
> >
> > 1: https://github.com/apache/beam/pull/12617
> > 2: https://github.com/apache/beam/pull/12706
> > 3: https://github.com/apache/beam/pull/12708
> > 4: https://github.com/apache/beam/pull/12603
> > 5: https://github.com/apache/beam/pull/12616
> >
> > On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe
> > <mailto:pulasthi...@gmail.com> wrote:
> >
> > Hi Luke
> >
> > Will take a look at this as soon as possible and get back to you.
> >
> > Best Regards,
> > Pulasthi
> >
> > On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <mailto:lc...@google.com> wrote:
> >
> > I have made some good progress here and have gotten to the
> > following state for non-portable runners:
> >
> > DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
> > Twister2[2]: Ready for review. Supports Read.Bounded, the
> > current runner doesn't support unbounded pipelines.
> > Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not
> > certain about level of unbounded pipeline support coverage since
> > Spark uses its own tiny suite of tests to get unbounded pipeline
> > coverage instead of the validates runner set.
> > Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
> > needs additional work.
> > Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
> > unbounded pipeline support coverage since Spark uses its own
> > tiny suite of tests to get unbounded pipeline coverage instead
> > of the validates runner set.
> > Flink: Unstarted.
> >
> > @Pulasthi Supun Wickramasinghe <mailto:pulasthi...@gmail.com> ,
> > can you help me with the Twister2 PR[2]?
> > @Ismaël Mejía <mailto:ieme...@gmail.com>, is PR[3] the expected
> > level of support for unbounded pipelines and hence ready for review?
> > @Jozsef Bartok <mailto:jo...@hazelcast.com>, can you help me out
> > to get support for unbounded splittable DoFn's into Jet[4]?
> > @Xinyu Liu <mailto:xinyuliu...@gmail.com>, is PR[5] the expected
> > level of support for unbounded pipelines and hence ready for review?
> >
> > 1: https://github.com/apache/beam/pull/12519
> > 2: https://github.com/apache/beam/pull/12594
> > 3: https://github.com/apache/beam/pull/12603
> > 4: https://github.com/apache/beam/pull/12616
> > 5: https://github.com/apache/beam/

Re: Self-checkpoint Support on Portable Flink

2020-10-05 Thread Luke Cwik
Thanks Boyuan, I left a few comments.

On Mon, Oct 5, 2020 at 11:12 AM Boyuan Zhang  wrote:

> Hi team,
>
> I'm looking at adding self-checkpoint support to the portable Flink runner
> (BEAM-10940) for both batch and streaming. I summarized the problem that we
> want to solve and proposed 2 potential approaches in this doc.
> I want to collect feedback on which approach is preferred and anything
> that I have not taken into consideration yet but should.
> Many thanks for all your help!
>
> Boyuan
>
>


Re: Support streaming side-inputs in the Spark runner

2020-10-02 Thread Luke Cwik
Support for watermark holds is missing for both Spark streaming
implementations (DStream and structured streaming) so watermark based
triggers don't produce the correct output.

Excluding the direct runner, Flink is the OSS runner with the most people
working on it, adding features and fixing bugs.
Spark batch is in a good state, but streaming development is still ongoing
and has only a small group of folks working on it.


On Fri, Oct 2, 2020 at 10:16 AM  wrote:

> For clarification, is it just streaming side inputs that present an issue
> for SparkRunner or are there other areas that need work?  We've started
> work on a Beam-based project that includes both streaming and batch
> oriented work and a Spark cluster was our choice due to the perception that
> it could handle both types of applications.
>
> However, that would have to be reevaluated if SparkRunner isn't up for
> streaming deployments.  And it seems that SparkStructuredStreamingRunner
> still needs some time before it's a fully-featured solution.  I guess I'm
> trying to get a sense of whether these runners are still being actively
> developed or were they donated by a third-party and are now suffering from
> bit-rot.
>
> Oct 1, 2020, 10:54 by lc...@google.com:
>
> I would suggest trying FlinkRunner as it is a much more complete streaming
> implementation.
> SparkRunner has several key things that are missing that won't allow your
> pipeline to function correctly.
> If you're really invested in getting SparkRunner working though feel free
> to contribute the necessary implementations for watermark holds and
> broadcast state necessary for side inputs.
>
> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan 
> wrote:
>
> Hi Team,
>
>
>
> I have a streaming pipeline (built using Apache Beam with the Spark
> Runner) which consumes events tagged with timestamps from an unbounded
> source (Kinesis Stream), batches them into FixedWindows of 5 mins each, and
> then writes all events in a window into a single file or multiple files
> based on shards.
>
> We are trying to achieve the following through Apache Beam constructs:
>
> 1. Create a PCollectionView from an unbounded source and pass it as a
>    side-input to our main pipeline.
>
> 2. Have a hook method that is invoked per window and enables us to do
>    some operational activities per window.
>
> 3. Stop the stream processor (graceful stop) from an external system.
>
>
>
> *Approaches that we tried for 1).*
>
> - Creating a PCollectionView from an unbounded source and passing it as
>   a side-input to our main pipeline.
>
> - The input PCollection goes through a FixedWindow transform.
>
> - Created a custom CombineFn that combines all inputs for a window and
>   produces a single-value PCollection.
>
> - The output of the Window transform goes to the CombineFn (custom fn),
>   and a PCollectionView is created from the CombineFn (using
>   Combine.Globally().asSingletonView()), as this output would be passed
>   as a side-input to our main pipeline.
>
>   - Getting the following exception (while running with the streaming
>     option set to true):
>
>     java.lang.IllegalStateException: No TransformEvaluator
>     registered for UNBOUNDED transform View.CreatePCollectionView
>
> - Noticed that SparkRunner doesn’t support streaming side-inputs:
>   - https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>     (View.CreatePCollectionView.class not added to EVALUATORS Map)
>   - https://issues.apache.org/jira/browse/BEAM-2112
>   - https://issues.apache.org/jira/browse/BEAM-1564
>
> So we would like to understand more about the BEAM-1564 ticket.
>
>
>
> *Approaches that we tried for 2).*
>
> Tried to implement the operational activities in extractOutput() of
> CombineFn, as extractOutput() is called once per window. We haven’t tested
> this, as it is blocked by issue 1).
>
> Are there any other recommended approaches to implement this feature?
>
>
>
> *Looking for recommended approaches to implement feature 3).*
>
>
>
> Many Thanks,
>
> Viswa.
>
>


Re: Throttling stream outputs per trigger?

2020-10-01 Thread Luke Cwik
Look at the GroupIntoBatches[1] transform. It will buffer "batches" of size
X for you.

1:
https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
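
For anyone who wants to see what "buffer batches of size X" means before reading the transform docs, here is a rough sketch in plain Python. It is not the Beam implementation: `group_into_batches` is a hypothetical helper that buffers values per key and emits a batch once it reaches the requested size, and it flushes leftovers at the end of the input (the real transform works on keyed input and windows, which this sketch does not model).

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Iterator, List, Tuple


def group_into_batches(
    pairs: Iterable[Tuple[Hashable, object]], batch_size: int
) -> Iterator[Tuple[Hashable, List[object]]]:
    """Buffer values per key and emit (key, batch) whenever a buffer is full."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    buffers: Dict[Hashable, List[object]] = defaultdict(list)
    for key, value in pairs:
        buffer = buffers[key]
        buffer.append(value)
        if len(buffer) == batch_size:
            yield key, list(buffer)  # copy so the emitted batch is not mutated later
            buffer.clear()
    # End of input: flush whatever is still buffered as smaller batches.
    for key, buffer in buffers.items():
        if buffer:
            yield key, list(buffer)
```

Note that this only bounds the size of each batch. It says nothing about how often batches are emitted, so it does not by itself enforce a rate limit on a downstream call.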

On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez 
wrote:

> the downstream consumer has these requirements.
>
> *~Vincent*
>
>
> On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik  wrote:
>
>> Why do you want to only emit X? (e.g. running out of memory in the runner)
>>
>> On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez 
>> wrote:
>>
>>> Hello all.  If I want to 'throttle' the number of messages I pull off
>>> say, Kafka or some other queue, in order to make sure I only emit X amount
>>> per trigger, is there a way to do that and ensure that I get 'at least
>>> once' delivery guarantees?   If this isn't supported, would the better way
>>> be to pull the limited amount opposed to doing it on the output side?
>>>
>>>
>>> *~Vincent*
>>>
>>


Re: Throttling stream outputs per trigger?

2020-10-01 Thread Luke Cwik
Why do you want to only emit X? (e.g. running out of memory in the runner)

On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez 
wrote:

> Hello all.  If I want to 'throttle' the number of messages I pull off say,
> Kafka or some other queue, in order to make sure I only emit X amount per
> trigger, is there a way to do that and ensure that I get 'at least once'
> delivery guarantees?   If this isn't supported, would the better way be to
> pull the limited amount opposed to doing it on the output side?
>
>
> *~Vincent*
>


Re: Support streaming side-inputs in the Spark runner

2020-10-01 Thread Luke Cwik
I would suggest trying FlinkRunner as it is a much more complete streaming
implementation.
SparkRunner has several key things that are missing that won't allow your
pipeline to function correctly.
If you're really invested in getting SparkRunner working though feel free
to contribute the necessary implementations for watermark holds and
broadcast state necessary for side inputs.

On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan 
wrote:

> Hi Team,
>
>
>
> I have a streaming pipeline (built using Apache Beam with the Spark
> Runner) which consumes events tagged with timestamps from an unbounded
> source (Kinesis Stream), batches them into FixedWindows of 5 mins each, and
> then writes all events in a window into a single file or multiple files
> based on shards.
>
> We are trying to achieve the following through Apache Beam constructs:
>
> 1. Create a PCollectionView from an unbounded source and pass it as a
>    side-input to our main pipeline.
>
> 2. Have a hook method that is invoked per window and enables us to do
>    some operational activities per window.
>
> 3. Stop the stream processor (graceful stop) from an external system.
>
>
>
> *Approaches that we tried for 1).*
>
> - Creating a PCollectionView from an unbounded source and passing it as
>   a side-input to our main pipeline.
>
> - The input PCollection goes through a FixedWindow transform.
>
> - Created a custom CombineFn that combines all inputs for a window and
>   produces a single-value PCollection.
>
> - The output of the Window transform goes to the CombineFn (custom fn),
>   and a PCollectionView is created from the CombineFn (using
>   Combine.Globally().asSingletonView()), as this output would be passed
>   as a side-input to our main pipeline.
>
>   - Getting the following exception (while running with the streaming
>     option set to true):
>
>     java.lang.IllegalStateException: No TransformEvaluator
>     registered for UNBOUNDED transform View.CreatePCollectionView
>
> - Noticed that SparkRunner doesn’t support streaming side-inputs:
>   - https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>     (View.CreatePCollectionView.class not added to EVALUATORS Map)
>   - https://issues.apache.org/jira/browse/BEAM-2112
>   - https://issues.apache.org/jira/browse/BEAM-1564
>
> So we would like to understand more about the BEAM-1564 ticket.
>
>
>
> *Approaches that we tried for 2).*
>
> Tried to implement the operational activities in extractOutput() of
> CombineFn, as extractOutput() is called once per window. We haven’t tested
> this, as it is blocked by issue 1).
>
> Are there any other recommended approaches to implement this feature?
>
>
>
> *Looking for recommended approaches to implement feature 3).*
>
>
>
> Many Thanks,
>
> Viswa.
>
>


Re: [ANNOUNCE] Beam Java 8 image rename starting from 2.26.0 (to apache/beam_java8_sdk)

2020-10-01 Thread Luke Cwik
Can we copy the beam_java_sdk image to beam_java8_sdk for a few prior
releases so people who are on an older release can migrate now and not have
to remember to do it with 2.26?

On Tue, Sep 29, 2020 at 5:37 PM Emily Ye  wrote:

> Starting with the release of 2.26.0, the Java 8 SDK container image
> (currently apache/beam_java_sdk) will be released under a new name,
> *apache/beam_java8_sdk*. This is in anticipation of future plans to
> release a Java 11 image[1].
>
> This will apply to:
>
>    - officially released images starting with v2.26.0
>    - *immediately for development images* built from master (post PR
>    #12505), i.e. if you are using a script/command with an argument that
>    includes beam_java_sdk, use beam_java8_sdk.
>
>
> [1] Related JIRA: https://issues.apache.org/jira/browse/BEAM-8106
>
>


Re: Contributor permission for Beam Jira tickets

2020-10-01 Thread Luke Cwik
Welcome to the community. I have added you as a contributor.

Please take a look at the contribute guide[1].

1: https://beam.apache.org/contribute/

On Thu, Oct 1, 2020 at 9:49 AM George Pearman  wrote:

> Hi
>
> My name is George.  I have been developing Beam applications at LinkedIn
> for the past 1.5 years and I would like to start contributing back to Beam
> open source.
>
> Can someone please add me as a contributor for Beam's Jira issue tracker?
> I would like to create/assign tickets for my work.
>
> My username is gpearman.
>
> George


Re: I/O connectors for streaming GRPC source

2020-09-30 Thread Luke Cwik
There is no generic gRPC connector and it is unlikely that there ever will
be one.

A lot of the time, integration with external systems is for ingesting large
amounts of data, which works best with certain features that gRPC doesn't
natively support but that an application protocol built on top of gRPC
usually does: things like checkpointing and resuming from a position in the
stream, being able to split streams, and acking messages so they aren't
published in the stream. To learn more, you should take a look at this
splittable DoFn blog[1].
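
As a rough illustration of "checkpointing and resuming from a position in the stream", here is a small sketch in plain Python. It uses neither gRPC nor Beam: the stream is just a list, and `read_chunk` and `read_all_with_checkpoints` are hypothetical helpers. What it shows is that the protocol has to hand back a position the reader can resume from, which a bare gRPC stream does not give you by itself.

```python
from typing import List, Sequence, Tuple


def read_chunk(stream: Sequence[str], position: int,
               max_records: int) -> Tuple[List[str], int]:
    """Return up to max_records starting at position, plus the position to resume from."""
    end = min(position + max_records, len(stream))
    return list(stream[position:end]), end


def read_all_with_checkpoints(stream: Sequence[str],
                              max_records: int) -> Tuple[List[str], List[int]]:
    """Read the stream chunk by chunk, recording a checkpoint after each chunk."""
    if max_records < 1:
        raise ValueError("max_records must be at least 1")
    position = 0  # the checkpoint; a real reader would persist this durably
    seen: List[str] = []
    checkpoints: List[int] = []
    while position < len(stream):
        records, position = read_chunk(stream, position, max_records)
        seen.extend(records)
        checkpoints.append(position)
    return seen, checkpoints
```

If the reader crashes after a checkpoint, restarting `read_chunk` from the last recorded position picks up exactly where it left off.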

There are a few sources that have been written that use gRPC but Apache
Beam integrates using a higher level application specific protocol. Take a
look at SpannerIO[2] and PubsubLite[3] since they wrap gRPC with their
client libraries.

You can always start by writing a normal DoFn that connects to this service
and eventually migrating to a splittable DoFn once you have scaling
concerns.

1: https://beam.apache.org/blog/splittable-do-fn/
2:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
3:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java

On Wed, Sep 30, 2020 at 11:24 AM Maksim Pilipeyko <
maksim.pilipe...@colada.biz> wrote:

> Hi,
>
>
>
> What connector can I use if I should read data from streaming grpc api?
>
>
>
> Best regards,
> Maksim
>


Re: How to write a Python wrapper for MQTT io

2020-09-28 Thread Luke Cwik
It is also very important to document the URN and describe its
configuration payload so that a new Beam SDK that wants to use the XLang
transform knows what the spec is, and so that if the XLang implementation
were to change it can still honor the original spec.
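
To sketch what "document the URN and its configuration payload" could look like, here is a small example in plain Python. Everything in it is an assumption for illustration: `TransformSpec` and `encode_request` are hypothetical, the field names are made up rather than taken from the Kafka implementation, and JSON is only a stand-in for whatever encoding the real payload uses.

```python
import json
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class TransformSpec:
    """A written-down contract: a URN plus the payload fields it requires."""
    urn: str
    required_fields: Tuple[str, ...]

    def validate(self, payload: Dict[str, object]) -> None:
        missing = [name for name in self.required_fields if name not in payload]
        if missing:
            raise ValueError(f"{self.urn}: missing fields {missing}")


# The URN is the one mentioned in this thread; the field names are illustrative only.
KAFKA_READ = TransformSpec(
    urn="beam:external:java:kafka:read:v1",
    required_fields=("topics", "consumer_config"),
)


def encode_request(spec: TransformSpec, payload: Dict[str, object]) -> str:
    """Check the payload against the spec, then serialize URN and payload together."""
    spec.validate(payload)
    return json.dumps({"urn": spec.urn, "payload": payload}, sort_keys=True)
```

The version suffix in the URN is what lets an implementation change while still honoring the original spec: a changed payload would get a new URN rather than silently redefining the old one.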

+Chamikara Jayalath, is there a good place for this yet?
I tried finding where this is for Kafka (beam:external:java:kafka:read:v1)
and was unable to. Should we have a proto similar to our well known
transforms dedicated to XLang transforms and their payloads?

On Mon, Sep 28, 2020 at 10:12 AM Brian Hulette  wrote:

> Hi Carolyn, welcome to the dev list :)
>
> I'm assuming you're interested in making Java's MqttIO [1] available in
> Python as a cross-language transform? Unfortunately I don't think there's a
> concise guide for this yet. It definitely makes sense to follow KafkaIO as
> an example, but I know there's a lot of code to dig through.. I can give
> you a few pointers to the relevant parts.
>
> On the Java side, you'll need to provide implementations of
> ExternalTransformRegistrar [2], and ExternalTransformBuilder [3] that can
> create an MqttIO Read and/or Write transforms. The Registrar is what
> determines the URN, while the Builder determines the configuration
> parameters via its Configuration class [4] (we inspect the getters/setters
> on the class for this).
> You'll also want to make sure that there's a gradle target for building an
> expansion service jar that includes your new MqttIO
> ExternalTransformRegistrar. The easiest way to do this would be to add it
> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
> there (the same thing we do for Kafka [5]). This is fine to do for testing
> of course, but ultimately we should probably have a separate expansion
> service jar for it, like the one for KinesisIO [6].
>
> On the Python side, you'll need to provide a stub that extends
> ExternalTransform. The critical pieces are that you've referenced the
> correct expansion service jar [7], the same URN as in Java [8], and use a
> compatible configuration object [9].
>
> I hope this helps! Please let us know if you need any more pointers
> Brian
>
> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
> [2]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
> [3]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
> [4]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
> [5]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
> [6]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
> [7]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
> [8]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
> [9]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>
> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen 
> wrote:
>
>> Greetings,
>>
>> I need Python MQTT IO support for a project I'm working on. I've looked
>> at the Kafka IO module, and planned to follow it, but I'm having trouble
>> finding the URN to use, and how to configure the input parameters. Is there
>> any documentation about this? If not, I'd appreciate an explanation of the
>> steps I need to take to implement this.
>>
>> Best regards,
>> Carolyn
>>
>> PS- apologies if there are duplicate posts. I haven't posted to this user
>> list before and wasn't sure if my message wasn't appearing on the list
>> because I wasn't subscribed yet.
>>
>


Re: Kafka Streams Runner [BEAM-2466]

2020-09-25 Thread Luke Cwik
That's exciting.

I would suggest that you take a look at implementing a portable runner so
that you get cross language pipelines and the ability to execute Python and
Go pipelines. Looking at https://s.apache.org/beam-fn-api and the Flink or
Samza implementations would be good starting points.



On Fri, Sep 25, 2020 at 11:35 AM Pablo Estrada  wrote:

> Hi Kyle,
> this is very cool. For others, here's the link to compare Kyle's branch to
> Beam's master[1].
>
> I don't have a lot of experience writing runners, so someone else should
> look as well, but I'll try to take a look at your branch in the next few
> days, because I think it's pretty cool : )
>
>
> [1]
> https://github.com/apache/beam/compare/master...kyle-winkelman:kafka-streams
>
> On Thu, Sep 24, 2020 at 9:53 AM Kyle Winkelman 
> wrote:
>
>> Hello everyone,
>>
>>
>>
>> Jira:
>>
>> https://issues.apache.org/jira/browse/BEAM-2466
>>
>>
>>
>> My Branch:
>>
>> https://github.com/kyle-winkelman/beam/tree/kafka-streams
>>
>>
>>
>> I have taken an initial pass at creating a Kafka Streams Runner. It
>> passes nearly all of the @ValidatesRunner tests. I am hoping to find
>> someone that has experience writing a Runner to take a look at it and give
>> me some feedback before I open a PR.
>>
>>
>>
>> I am using the Kafka Streams DSL, so a PCollection is equivalent to a
>> KStream.
>>
>>
>>
>> ParDo:
>>
>>   - implements Transformer to KStream#transform
>>
>>   - outputs KStream, WindowedValue> then uses a KStream#branch to
>> handle ParDo.MultiOutput
>>
>>   - schedules a Punctuator to periodically start/finish bundles
>>
>>
>>
>> GroupByKey:
>>
>>   - KStream#repartition the data to groupOnlyByKey
>>
>>   - groupAlsoByWindow similarly to ParDo, runs ReduceFn instead of DoFn
>>
>>
>>
>> Flatten:
>>
>>   - KStream#merge
>>
>>
>>
>> Combine:
>>
>>   - not as composite, but as primitive GroupByKey/ParDo
>>
>>
>>
>> Composite Transforms:
>>
>>   - inlining (I believe)
>>
>>
>>
>> Side Inputs:
>>
>>   - write data to a topic with the key being the
>> StateNamespace.stringKey()
>>
>>   - read topic into a GlobalKTable and materialize it into a
>> KeyValueStore
>>
>>   - in ParDo, access the KeyValueStore via the ProcessorContext, and use
>> the WindowFn to get the Side Input Window and its
>> StateNamespace.stringKey() to look up the value
>>
>>
>>
>> Source API:
>>
>>   - overridden by SplittableDoFn, via
>> Read.SPLITTABLE_DOFN_PREFERRED_RUNNERS
>>
>>
>>
>> Impulse:
>>
>>   - if it does not exist, create a topic in Kafka and use a KafkaProducer
>> to generate the initial record
>>
>>   - generate a KStream from the above topic
>>
>>
>>
>> SplittableDoFn:
>>
>>   - overridden by SplittableParDo/SplittableParDoViaKeyedWorkItems
>>
>>   - extends Stateful Processing, runs ProcessFn instead of DoFn and
>> Punctuator fires timers as expected by the ProcessFn
>>
>>   - write watermarks to a separate topic to be aggregated and read into a
>> GlobalKTable and materialize it into a KeyValueStore
>>
>>
>>
>> Stateful Processing:
>>
>>   - StateInternals and TimerInternals are materialized in KeyValueStores
>>
>>   - extends ParDo, with statefulDoFnRunner and Punctuator advances
>> TimerInternals (via watermarks KeyValueStore) and fire timers
>>
>>
>> Thanks,
>>
>> Kyle Winkelman
>>
>


Re: Intro and ticket BEAM-10938

2020-09-22 Thread Luke Cwik
Thanks for reaching out.

"Triage needed" is the default state when a bug is opened and does not mean
that it is yet to be decided. Typically, if there is something of note,
either the contributor asks on the dev@ mailing list about it and works
with the community, or opens a PR and a reviewer brings the PR to
discussion within the community. The former happens for "larger" things, and
the latter is typically for smaller things when there is some kind of
ambiguity in a decision.

I have added you as a contributor to the Beam JIRA project and assigned
BEAM-10938 to you. Feel free to open the PR and find a committer to review
it (ask the issue creator or a committer who is familiar with the code in
the location (via git blame) for a review and tag them with R: @nickname).


On Tue, Sep 22, 2020 at 9:37 AM Milan Cermak  wrote:

> Hi,
>
> I'm Milan, I'm a software engineer and I'm new to Beam.
>
> I've noticed this new issue in JIRA (
> https://issues.apache.org/jira/browse/BEAM-10938). Out of curiosity, I
> decided to dig deeper and see if I can add the functionality. I think I got
> it (you can see the diff here
> https://github.com/apache/beam/compare/master...milancermak:BEAM-10938),
> but then I've also noticed the status is "Triage needed" which, I assume,
> means it's not yet decided by the community if this should be worked on at
> all. Is that right? Should I send a PR or wait until the status clears? Any
> guidance is appreciated.
>
> FWIW, my JIRA username is milancermak
>
> Cheers,
>
> Milan
>


Re: Output from Window not getting materialized

2020-09-21 Thread Luke Cwik
On Mon, Sep 21, 2020 at 5:02 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke, Thanks for the detailed explanation. This gives more insight to
> new people like me trying to grok the whole concept.
>
> *1) What timestamp you're going to output your records at*
>
> ** use upstream input elements' timestamp: guidance is to use the default
> implementation to get the upstream watermark by default*
> ** use data from within the record being output or external system state
> via an API call: use a watermark estimator*
>
> In the above section, I do not think our source has a watermark concept
> built in that we could derive and use in the SDF, so we will have to go
> with the second option. Suppose we could extract a timestamp from the
> source message: do we then have to call setWatermark with that extracted
> timestamp before each output in @ProcessElement? And can we use the Manual
> Watermark Estimator itself for this approach?
>

You want to use context.outputWithTimestamp when parsing your own
timestamps and emitting records.

Using the manual one works but also take a look at how the timestamp
observing ones work, since they are told the timestamp of each element being
produced. Using the timestamp observing ones (monotonically increasing or
your own) allows you to decouple the watermark estimator logic from the SDF
implementation.
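
To make the decoupling concrete, here is a minimal plain-Java model of the
"largest timestamp seen so far" rule. It is only a sketch: the class and
method names are made up for illustration and this is not the Beam
estimator class itself.

```java
import java.time.Instant;

// Hypothetical stand-in for a timestamp-observing watermark estimator.
// Not a Beam class; it only models the monotonically increasing rule.
final class MonotonicObserverSketch {
  private Instant watermark;

  MonotonicObserverSketch(Instant initialWatermark) {
    this.watermark = initialWatermark;
  }

  // Called once per output element with that element's timestamp.
  void observeTimestamp(Instant elementTimestamp) {
    if (elementTimestamp.isAfter(watermark)) {
      watermark = elementTimestamp;
    }
  }

  // The estimate never moves backwards, even if elements arrive out of order.
  Instant currentWatermark() {
    return watermark;
  }
}
```

Note that with almost-sorted input this rule leaves an element such as the
11:00:08 one behind the estimate once 11:00:10 has been observed, which is
one reason to consider writing your own estimator.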


>
>
> *2) How you want to compute the watermark estimate (if at all)*
> ** the choice here depends on how the elements timestamps progress, are
> they in exactly sorted order, almost sorted order, completely unsorted,
> ...?*
>
> Our elements are "almost sorted order" because of which we want to hold
> off processing message_01 with timestamp 11:00:10 AM until we process
> message-02 with timestamp 11:00:08 AM. How do we enable this ordering while
> processing the messages?
>
> Based on your suggestion, I tried WallTime Estimator and it worked for one
> of our many scenarios. I am planning to test it with a bunch of other
> window types and use that till we get a solid hold on doing it in the above
> mentioned way that can handle the unsorted messages.
>

If you're extracting the timestamps out of your data, it would likely be
best to use the monotonically increasing timestamp estimator or write one
that computes one using some statistical method appropriate to your source.
If you think you have written one that is generally useful, feel free to
contribute it to Beam.

You'll want to look into @RequiresTimeSortedInput[1]. This allows you to
produce the messages in any order and requires the runner to make sure they
are sorted before passing to a downstream stateful DoFn.

1:
https://lists.apache.org/thread.html/9cdac2a363e18be58fa1f14c838c61e8406ae3407e4e2d05e423234c%40%3Cdev.beam.apache.org%3E


>
> Regards,
> Praveen
>
> On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik  wrote:
>
>> To answer your specific question, you should create and return the
>> WallTime estimator. You shouldn't need to interact with it from within
>> your @ProcessElement call since your elements are using the current time
>> for their timestamp.
>>
>> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik  wrote:
>>
>>> Kafka is a complex example because it is adapting code from before there
>>> was an SDF implementation (namely the TimestampPolicy and the
>>> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>>>
>>> There are three types of watermark estimators that are in the Beam Java
>>> SDK today:
>>> Manual: Can be invoked from within your @ProcessElement method within
>>> your SDF allowing you precise control over what the watermark is.
>>> WallTime: Doesn't need to be interacted with, will report the current
>>> time as the watermark time. Once it is instantiated and returned via the
>>> @NewWatermarkEstimator method you don't need to do anything with it. This
>>> is functionally equivalent to calling setWatermark(Instant.now()) right
>>> before returning from the @ProcessElement method in the SplittableDoFn on a
>>> Manual watermark.
>>> TimestampObserving: Is invoked using the output timestamp for every
>>> element that is output. This is functionally equivalent to calling
>>> setWatermark after each output within your @ProcessElement method in the
>>> SplittableDoFn. The MonotonicallyIncreasing implementation for
>>> the TimestampObserving estimator ensures that the largest timestamp seen so
>>> far will be reported for the watermark.
>>>
>>> The default is to not set any watermark estimate.
>>>
>>> For all watermark estimators you're allowed to set the watermark
>>> estimate to anything as the runner w

Re: What is the process to remove a Jenkins job?

2020-09-21 Thread Luke Cwik
When the seed job runs next time, any job that isn't explicitly part of the
seed job is disabled.

The existing job history will stick around and eventually someone should
delete them manually from Jenkins.

On Mon, Sep 21, 2020 at 10:46 AM Valentyn Tymofieiev 
wrote:

> We are removing several jobs associated with Py2 and Py35. Is removing a
> groovy file sufficient, or will Jenkins still remember the job from the
> earlier 'Seed' invocation and continue running it until manually disabled?
> If so, what's the process for manually disabling the job?
>
> Looked at Jenkins tips on the dev wiki[1] but didn't see these
> instructions.
>
> Thanks!
>
> [1] https://cwiki.apache.org/confluence/display/BEAM/Jenkins+Tips
>


Re: How to gracefully stop a beam application

2020-09-21 Thread Luke Cwik
+user 

On Mon, Sep 21, 2020 at 9:16 AM Luke Cwik  wrote:

> You need the "sources" to stop and advance the watermark to infinity and
> have that propagate through the entire pipeline. There are proposals for
> pipeline drain[1] and also for snapshot and update[2] for Apache Beam. We
> would love contributions in this space.
>
> Max shared some more details about how Flink users typically do this[3],
> does that apply to Spark?
>
> 1:
> https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
> 2:
> https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY
> 3:
> https://lists.apache.org/thread.html/864eb7b4e7192706074059eef1e116146382552fa885dd6054ef4988%40%3Cuser.beam.apache.org%3E
>
> On Mon, Sep 21, 2020 at 7:43 AM Sunny, Mani Kolbe  wrote:
>
>> Forgot to mention, we are using spark runner.
>>
>>
>>
>> *From:* Sunny, Mani Kolbe 
>> *Sent:* Monday, September 21, 2020 12:33 PM
>> *To:* dev@beam.apache.org
>> *Subject:* How to gracefully stop a beam application
>>
>>
>>
>>
>> Hello Beam community,
>>
>>
>>
>> When you are running a Beam application in full stream mode, it is
>> continuously running. What is the recommended way to stop it gracefully,
>> say for maintenance/upgrades etc.? When I say gracefully, I mean (1) without
>> data loss and (2) the application exiting with exit code 0.
>>
>>
>>
>> Regards,
>>
>> Mani
>>
>


[DISCUSS] Clearing timers (https://github.com/apache/beam/pull/12836)

2020-09-18 Thread Luke Cwik
PR 12836[1] is adding support for clearing timers and there is a discussion
about what the semantics for a cleared timer should be.

So far we have:
1) Clearing an unset timer is a no-op
2) If the last action on the timer was to clear it, then a future bundle
should not see it fire
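
The two rules agreed so far can be modeled in a few lines of plain Java.
This is only a sketch: TimerStateSketch and its methods are made up for
illustration and are neither the Beam Timer API nor the implementation in
the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model of per-key timer state as seen by a future bundle.
final class TimerStateSketch {
  private final Map<String, Long> firingTimes = new HashMap<>();

  void set(String timerId, long firingTimeMillis) {
    firingTimes.put(timerId, firingTimeMillis);
  }

  // Rule 1: clearing a timer that was never set is a no-op.
  void clear(String timerId) {
    firingTimes.remove(timerId);
  }

  // Rule 2: if the last action was a clear, nothing is pending
  // for a future bundle to fire.
  OptionalLong pendingFiringTime(String timerId) {
    Long time = firingTimes.get(timerId);
    return time == null ? OptionalLong.empty() : OptionalLong.of(time);
  }
}
```

The model deliberately says nothing about firings that are already eligible
within the current bundle, which is the open question.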

Ambiguity occurs if the last action on a timer was to clear it within the
same bundle: should the current bundle then not see it fire if the firing
has yet to become visible to the user? Since element processing and timer
firings are "unordered", this can happen.

Having the clear prevent the timer from firing within the same bundle if it
has yet to fire could make sense and simplifies clearing timer loops. For
example:

@ProcessElement
process(ProcessContext c) {
  if (initialCondition) {
    setTimer();
  } else {
    clearTimer();
  }
}

@OnTimer
onTimer(...) {
  // do some side effect
  // set timer to fire again in the future
}

Otherwise, this would require logic within the onTimer() method to check
whether we should stop, instead of relying on the fact that the clear will
prevent the timer loop.

On the other hand, we currently don't prevent timers from firing that are
eligible within the same bundle if their firing time is changed within the
bundle to some future time. Clearing timers could be treated conceptually
like setting them to "infinity" and hence the current set logic would
suggest that we shouldn't prevent timer firings that are part of the same
bundle.

Are there additional use cases that we should consider that suggest one
approach over the other?
What do people think?

1: https://github.com/apache/beam/pull/12836


Re: Output from Window not getting materialized

2020-09-18 Thread Luke Cwik
Kafka is a complex example because it is adapting code from before there
was an SDF implementation (namely the TimestampPolicy and the
TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).

There are three types of watermark estimators that are in the Beam Java SDK
today:
Manual: Can be invoked from within your @ProcessElement method within your
SDF allowing you precise control over what the watermark is.
WallTime: Doesn't need to be interacted with, will report the current time
as the watermark time. Once it is instantiated and returned via the
@NewWatermarkEstimator method you don't need to do anything with it. This
is functionally equivalent to calling setWatermark(Instant.now()) right
before returning from the @ProcessElement method in the SplittableDoFn on a
Manual watermark.
TimestampObserving: Is invoked using the output timestamp for every element
that is output. This is functionally equivalent to calling setWatermark
after each output within your @ProcessElement method in the SplittableDoFn.
The MonotonicallyIncreasing implementation for the TimestampObserving
estimator ensures that the largest timestamp seen so far will be reported
for the watermark.

The default is to not set any watermark estimate.

For all watermark estimators you're allowed to set the watermark estimate
to anything as the runner will recompute the output watermark as:
new output watermark = max(previous output watermark, min(upstream
watermark, watermark estimates))
This effectively means that the watermark will never go backwards from the
runner's point of view but that does mean that setting the watermark
estimate below the previous output watermark (which isn't observable) will
not do anything beyond holding the watermark at the previous output
watermark.
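
As a worked example of the formula above, here is a small plain-Java sketch.
The class name is made up, and timestamps are plain epoch millis rather than
Beam's Instant type.

```java
// Plain-Java sketch of:
//   new output watermark = max(previous output watermark,
//                              min(upstream watermark, watermark estimate))
final class OutputWatermarkSketch {
  static long nextOutputWatermark(long previousOutput, long upstream, long estimate) {
    return Math.max(previousOutput, Math.min(upstream, estimate));
  }
}
```

With a previous output watermark of 100 and an upstream watermark of 200, an
estimate of 150 advances the output watermark to 150, while an estimate of
50 simply holds it at 100.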

Depending on the windowing strategy and allowed lateness, any records that
are output with a timestamp that is too early can be considered droppably
late, otherwise they will be late/ontime/early.

So as an author for an SDF transform, you need to figure out:
1) What timestamp you're going to output your records at
* use upstream input elements' timestamp: guidance is to use the default
implementation to get the upstream watermark by default
* use data from within the record being output or external system state via
an API call: use a watermark estimator
2) How you want to compute the watermark estimate (if at all)
* the choice here depends on how the elements timestamps progress, are they
in exactly sorted order, almost sorted order, completely unsorted, ...?

For both of these it is up to you to choose how much flexibility in these
decisions you want to give to your users, and that should guide what you
expose within the API (like how KafkaIO exposes a TimestampPolicy, or how
many other sources don't expose anything).


On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke,
>
> I am also looking at the `WatermarkEstimators.manual` option, in parallel.
> Now we are getting data past our Fixed Window but the aggregation is not as
> expected.  The doc says setWatermark will "set timestamp before or at the
> timestamps of all future elements produced by the associated DoFn". If I
> output with a timestamp as below then could you please clarify on how we
> should set the watermark for this manual watermark estimator?
>
> receiver.outputWithTimestamp(ossRecord, Instant.now());
>
> Thanks,
> Praveen
>
> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik  wrote:
>
>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>> close allowing for the Count transform to produce output?
>>
>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>
>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum 
>> wrote:
>>
>>> Hi everyone!
>>>
>>> We are developing a new IO connector using the SDF API, and testing it
>>> with the following simple counting pipeline:
>>>
>>>
>>>
>>> p.apply(MyIO.read()
>>>         .withStream(inputStream)
>>>         .withStreamPartitions(Arrays.asList(0))
>>>         .withConsumerConfig(config)
>>>     ) // gets a PCollection>
>>>
>>>     .apply(Values.create()) // PCollection
>>>
>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))
>>>         .withAllowedLateness(Duration.standardDays(1))
>>>         .accumulatingFiredPanes())
>>>
>>>     .apply(Count.perElement())
>>>
>>>     // write PCollection> to stream
>>>     .apply(MyIO.write()
>>>         .withStream(outputStream)
>>>         .withConsumerConfig(config));
>>>
>>>
>>>
>>>
>>>
>>> Without the window transform, we can read from the stream and write to
>>> it, however, I don’t see output after the Window transform. Could you
>>> please help pin down the issue?
>>>
>>> Thank you,
>>>
>>> Gaurav
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Check for enough access to classes/methods of public API

2020-09-18 Thread Luke Cwik
I only know of tooling that can be used to make sure that the existing API
doesn't change.

Also there isn't an explicit "friend" concept in Java but people do
sometimes orchestrate public classes with methods that restrict who can use
them[1] but I don't think this is a case of that.

1:
https://stackoverflow.com/questions/14226228/implementation-of-friend-concept-in-java
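
If nothing off the shelf fits, a reflection-based unit test might get part
of the way there. The following is only a sketch under that assumption:
ApiVisibilityCheckSketch, SamplePolicy and SampleException are made-up names
(the sample types just mirror the shape of the reported problem), and the
check only looks at parameter types of public methods.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

final class ApiVisibilityCheckSketch {
  // Hypothetical sample types: a public interface whose method takes a
  // package-private argument.
  public interface SamplePolicy {
    default void onThrottle(SampleException e) {}
  }

  static class SampleException extends Exception {}

  // Lists public methods of a public type whose parameter types are not public.
  static List<String> inaccessibleParameters(Class<?> api) {
    List<String> problems = new ArrayList<>();
    if (!Modifier.isPublic(api.getModifiers())) {
      return problems;
    }
    for (Method method : api.getDeclaredMethods()) {
      if (!Modifier.isPublic(method.getModifiers())) {
        continue;
      }
      for (Class<?> parameter : method.getParameterTypes()) {
        Class<?> base = parameter;
        while (base.isArray()) {
          base = base.getComponentType();
        }
        if (!base.isPrimitive() && !Modifier.isPublic(base.getModifiers())) {
          problems.add(method.getName() + "(" + parameter.getSimpleName() + ")");
        }
      }
    }
    return problems;
  }
}
```

Return types, generic type arguments, thrown exceptions and protected
members are not covered here and would need similar handling.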

On Fri, Sep 18, 2020 at 8:21 AM Alexey Romanenko 
wrote:

> Hello,
>
> In the Java SDK, from time to time we run into an issue with access to
> classes or methods that are explicitly supposed to be part of the public
> user API but, by mistake, are made private or package-private.
>
> For example, in this issue [1], KinesisClientThrottledException is
> package-private but it is an argument of a public method of the
> RateLimitPolicy interface, which users may wish to implement themselves.
>
> // public interface
> public interface RateLimitPolicy {
>   default void onThrottle(KinesisClientThrottledException e) {}
> }
>
> // package-private class in the same package
> class KinesisClientThrottledException {
>   …
> }
>
> So users won’t be able to implement their own RateLimitPolicy with an
> overridden onThrottle() method since KinesisClientThrottledException is
> package-private.
>
> Does anyone know of a tool, compilation option, or Spotless check that can
> detect a broken API at compilation time when, for example, the arguments
> of a public method don’t have sufficient access privileges? It would be
> very helpful to prevent such errors.
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-10816


Re: I would like to assign a BEAM ticke to myself

2020-09-14 Thread Luke Cwik
Welcome to the community. I have added you as a contributor and assigned
BEAM-10875 to you.

On Mon, Sep 14, 2020 at 4:16 PM terry xian  wrote:

> Hi,
>
> I have created a jira ticket, [BEAM-10875] Support NUMERIC type in
> spanner schema parser, and would like to assign it to myself (I already
> have a PR).
>
> The jira ticket was in "Triage Needed" for several days. What should I do?
> Should I get a contributor role to assign the ticket to myself?
>
> Thanks!
>
>
>


Re: Jira contributor permissions

2020-09-11 Thread Luke Cwik
Welcome Kiley, I have done as you have requested.

On Fri, Sep 11, 2020 at 1:18 PM Kiley Sok  wrote:

> Hello,
>
> I'm Kiley, a SWE at Google working on Beam. Can I be added as a
> contributor to Jira? My username is kileys.
>
> Thanks,
> Kiley
>


Re: Clear Timer in Java SDK

2020-09-03 Thread Luke Cwik
Java SDK hasn't exposed the ability to remove timers.

On Wed, Sep 2, 2020 at 11:00 AM Boyuan Zhang  wrote:

> Hi team,
>
> I'm looking for something similar to timer.clear() from Python SDK[1] in
> Java SDK but it seems like we haven't exposed clearing timer API from Java
> Timer. Does Java SDK have another way to clear a timer or we just haven't
> worked on this API?
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L660-L671
>


Re: Contributor permission for Beam Jira tickets

2020-08-28 Thread Luke Cwik
Welcome to the community.

It looks like someone has already added you.

Check out the contribution guide[1].

1: https://beam.apache.org/contribute/

On Fri, Aug 28, 2020 at 12:03 PM Omkar Deshpande 
wrote:

> Hi, my name is Omkar Deshpande. I am interested in contributing to the Java
> Kafka IO module in the Apache Beam SDK. I'd like to be added as a Jira
> contributor so that I can assign issues to myself. My ASF Jira Username is
> omkardeshpande8.
>
> I don't see the email I sent earlier from work email in the archives.
> Sorry in advance, if this is a duplicate email.
>
> Omkar
>


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-08-27 Thread Luke Cwik
As an update.

Direct and Twister2 are done.
Samza: is ready for review[1].
Flink: is almost ready for review. [2] lays all the groundwork for the
migration and [3] finishes the migration (there is a timeout happening in
FlinkSubmissionTest that I'm trying to figure out).
No further updates on Spark[4] or Jet[5].

@Maximilian Michels or @t...@apache.org, can either of you take a look at
the Flink PRs?
@ke.wu...@icloud.com, since Xinyu delegated to you, can you take another
look at the Samza PR?

1: https://github.com/apache/beam/pull/12617
2: https://github.com/apache/beam/pull/12706
3: https://github.com/apache/beam/pull/12708
4: https://github.com/apache/beam/pull/12603
5: https://github.com/apache/beam/pull/12616

On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi Luke
>
> Will take a look at this as soon as possible and get back to you.
>
> Best Regards,
> Pulasthi
>
> On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik  wrote:
>
>> I have made some good progress here and have gotten to the following
>> state for non-portable runners:
>>
>> DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
>> Twister2[2]: Ready for review. Supports Read.Bounded, the current runner
>> doesn't support unbounded pipelines.
>> Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not certain
>> about level of unbounded pipeline support coverage since Spark uses its own
>> tiny suite of tests to get unbounded pipeline coverage instead of the
>> validates runner set.
>> Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely needs
>> additional work.
>> Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
>> unbounded pipeline support coverage since Spark uses its own tiny suite of
>> tests to get unbounded pipeline coverage instead of the validates runner
>> set.
>> Flink: Unstarted.
>>
>> @Pulasthi Supun Wickramasinghe  , can you help me
>> with the Twister2 PR[2]?
>> @Ismaël Mejía , is PR[3] the expected level of
>> support for unbounded pipelines and hence ready for review?
>> @Jozsef Bartok , can you help me out to get support
>> for unbounded splittable DoFn's into Jet[4]?
>> @Xinyu Liu , is PR[5] the expected level of
>> support for unbounded pipelines and hence ready for review?
>>
>> 1: https://github.com/apache/beam/pull/12519
>> 2: https://github.com/apache/beam/pull/12594
>> 3: https://github.com/apache/beam/pull/12603
>> 4: https://github.com/apache/beam/pull/12616
>> 5: https://github.com/apache/beam/pull/12617
>>
>> On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik  wrote:
>>
>>> There shouldn't be any changes required since the wrapper will smoothly
>>> transition the execution to be run as an SDF. New IOs should strongly
>>> prefer to use SDF since it should be simpler to write and will be more
>>> flexible but they can use the "*Source"-based APIs. Eventually we'll
>>> deprecate the APIs but we will never stop supporting them. Eventually they
>>> should all be migrated to use SDF and if there is another major Beam
>>> version, we'll finally be able to remove them.
>>>
>>> On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> Great to hear about such progress on this!
>>>>
>>>> Talking about opt-out for all runners in the future, will it require
>>>> any code changes for current “*Source”-based IOs or the wrappers should
>>>> completely smooth this transition?
>>>> Do we need to require to create new IOs only based on SDF or again, the
>>>> wrappers should help to avoid this?
>>>>
>>>> On 10 Aug 2020, at 22:59, Luke Cwik  wrote:
>>>>
>>>> In the past couple of months wrappers[1, 2] have been added to the Beam
>>>> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
>>>> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
>>>> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
>>>> other pipelines.
>>>>
>>>> I would like to start making the non-portable pipelines starting with
>>>> the DirectRunner[3] to be opt-out with the plan that eventually all runners
>>>> will only execute splittable DoFns and the BoundedSource/UnboundedSource
>>>> specific execution logic from the runners will be removed.
>>>>
>>>> Users will be able to opt-in any pipeline using the experiment

Re: How does groupIntoBatches behave when there are too few elements for a key?

2020-08-26 Thread Luke Cwik
GroupIntoBatches should always emit any buffered elements on window
expiration.
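
As a rough illustration of that behavior, here is a plain-Java model of a
per-key buffer that emits full batches eagerly and flushes the remainder on
window expiration. It is only a sketch with made-up names, not the
GroupIntoBatches implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the buffering for a single key and window.
final class BatchBufferSketch<T> {
  private final int batchSize;
  private final List<T> buffer = new ArrayList<>();
  private final List<List<T>> emitted = new ArrayList<>();

  BatchBufferSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  // Buffers the element and emits a batch once batchSize is reached.
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Emits whatever is still buffered, even if it is smaller than batchSize.
  void onWindowExpiration() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  List<List<T>> emitted() {
    return emitted;
  }

  private void flush() {
    emitted.add(new ArrayList<>(buffer));
    buffer.clear();
  }
}
```

In this model a key that only ever sees two elements with a batch size of
three still produces one batch of two once the window expires.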

On Wed, Aug 26, 2020 at 8:55 AM Alex Amato  wrote:

> How does groupIntoBatches behave when there are too few elements for a key
> (less than the provided batch size)?
>
> Based on how it's described, it's not clear to me that the elements will
> ever be emitted. Can this cause stuckness in this case?
>


Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Luke Cwik
Yes it does.

There should be a reshuffle between the initial splitting and the
processing portion.

On Fri, Aug 21, 2020 at 11:04 AM Jiadai Xia  wrote:

> I am using v1. Does v1 support the initial splitting and distribution?
> since I expect it to distribute the initial splitting to multiple workers.
>
> On Fri, Aug 21, 2020 at 11:00 AM Luke Cwik  wrote:
>
>> Are you using Dataflow runner v2[1] since the default for Beam Java still
>> uses Dataflow runner v1?
>> Dataflow runner v2 is the only one that supports autoscaling and dynamic
>> splitting of splittable dofns in bounded pipelines.
>>
>> 1:
>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
>>
>> On Fri, Aug 21, 2020 at 10:54 AM Jiadai Xia  wrote:
>>
>>> Hi,
>>> As stated in the title, I tried to implement an SDF for reading Parquet
>>> files and I am trying to run it with the Dataflow runner. The initial
>>> split outputs a bunch of ranges, but the number of workers is not scaled
>>> up and the work is not distributed. Any suggestion on what the problem
>>> could be? I have tested it with the Direct runner and the parallelism
>>> looks fine on small samples.
>>> Below is my implementation of the SDF
>>> https://github.com/apache/beam/pull/12223
>>> --
>>>
>>>
>>>
>>>
>>>
>>> *Jiadai Xia*
>>>
>>> SWE Intern
>>>
>>> 1 (646) 413 8071 <(646)%20413-8071>
>>>
>>> daniel...@google.com
>>>
>


Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Luke Cwik
The DoFn is associated with a PTransform and in the pipeline proto there is
a unique id associated with each PTransform. You can use that to generate a
composite key (ptransformid, stateid) which will be unique within the
pipeline.
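
A minimal sketch of such a composite key in plain Java, assuming both ids
are available to the runner as strings. StateKeySketch is a made-up name,
not an existing Beam class.

```java
import java.util.Objects;

// Hypothetical composite key: (ptransformId, stateId).
final class StateKeySketch {
  private final String ptransformId;
  private final String stateId;

  StateKeySketch(String ptransformId, String stateId) {
    this.ptransformId = Objects.requireNonNull(ptransformId);
    this.stateId = Objects.requireNonNull(stateId);
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof StateKeySketch)) {
      return false;
    }
    StateKeySketch that = (StateKeySketch) other;
    return ptransformId.equals(that.ptransformId) && stateId.equals(that.stateId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(ptransformId, stateId);
  }
}
```

Two DoFns that both declare a state id such as "count" then map to different
keys because their PTransform ids differ.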

On Fri, Aug 21, 2020 at 11:26 AM Ke Wu  wrote:

> Thank you Reuven for the confirmation. Do you know what is the recommended
> way for underlying runners to distinguish same state id in different
> DoFn(s)?
>
> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
>
> StateId is scoped to the DoFn. You can use the same string in different
> DoFns for completely different states.
>
> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:
>
>> Hello everyone,
>>
>> After reading through "Stateful processing with Apache Beam" and
>> DoFn.StateId, I understand that each state id must be unique and must be
>> the same type, at least within the same DoFn. However, it is not
>> explicitly mentioned whether it is expected and supported for the same
>> state id to be declared in different DoFn(s). If yes, is the state
>> supposed to be shared or completely separate, in which case it could even
>> be of different types? If no, it seems that the validation in the Beam SDK
>> only validates uniqueness within the same DoFn.
>>
>> Thanks,
>> Ke
>>
>
>


Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Luke Cwik
Are you using Dataflow runner v2[1] since the default for Beam Java still
uses Dataflow runner v1?
Dataflow runner v2 is the only one that supports autoscaling and dynamic
splitting of splittable dofns in bounded pipelines.

1:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2

On Fri, Aug 21, 2020 at 10:54 AM Jiadai Xia  wrote:

> Hi,
> As stated in the title, I tried to implement an SDF for reading Parquet
> files and I am trying to run it with the Dataflow runner. The initial
> split outputs a bunch of ranges, but the number of workers is not scaled
> up and the work is not distributed. Any suggestion on what the problem
> could be? I have tested it with the Direct runner and the parallelism
> looks fine on small samples.
> Below is my implementation of the SDF
> https://github.com/apache/beam/pull/12223


Re: Suggestion to let KafkaIO support the deserializer API with headers

2020-08-21 Thread Luke Cwik
Sounds good.

Note that you'll also want to update ReadFromKafkaDoFn[1] and provide tests
that cover both to make sure we don't regress and stop providing headers.

1:
https://github.com/apache/beam/blob/cfa448d121297398312d09c531258a72b413488b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L309

On Fri, Aug 21, 2020 at 8:29 AM Lourens Naude 
wrote:

> Hi everyone,
>
> We bumped into an API issue with the deserializer called on constructing
> KafkaRecord instances in the KafkaIO module.
>
> I wanted to float this past the mailing list for discussion first before
> exploring further.
>
> The callsite referenced: KafkaIO only calls the deserializer with the
> simplified API that does not include Kafka record headers (even though they
> are available to pass as an argument):
> https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203
>
> Our SerDes implementation relies on Kafka headers support, which was added
> to Kafka records via a KIP as a means to include metadata cleanly and not
> abuse keys or values for such purposes.
>
> It is also a valid Deserializer API as per the official Kafka interface:
>
> *
> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
> * It delegates to the simplified version as its default implementation
> (which requires a formal implementation) in
> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
> * The default behaviour is thus backwards compatible, with a preference
> for the header specific API
>
> We've used the custom SerDes without issues in a complex Connect and
> Streams pipeline, but bumped into this API divergence of not preferring the
> deserializer API with headers as the primary deserializer mechanism.
>
> The same API is used elsewhere:
>
> * It's the default for the stock Java consumer:
> https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
> (header enabled calls simplified API)
> * Ditto Kafka Connect:
> https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
> * And Kafka Streams:
> https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66
>
> Any thoughts on the proposed change with the additional headers argument
> passed on deserialization?
>
> Best,
> Lourens
>


Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-20 Thread Luke Cwik
+user 

On Thu, Aug 20, 2020 at 9:47 AM Luke Cwik  wrote:

> Are you using Dataflow runner v2[1]?
>
> If so, then you can use:
> --number_of_worker_harness_threads=X
>
> Do you know where/why the OOM is occurring?
>
> 1:
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
> 2:
> https://github.com/apache/beam/blob/017936f637b119f0b0c0279a226c9f92a2cf4f15/sdks/python/apache_beam/options/pipeline_options.py#L834
>
> On Thu, Aug 20, 2020 at 7:33 AM Kamil Wasilewski <
> kamil.wasilew...@polidea.com> wrote:
>
>> Hi all,
>>
>> As I stated in the title, is there an equivalent for
>> --numberOfWorkerHarnessThreads in Python SDK? I've got a streaming pipeline
>> in Python which suffers from OutOfMemory exceptions (I'm using Dataflow).
>> Switching to highmem workers solved the issue, but I wonder if I can set a
>> limit of threads that will be used in a single worker to decrease memory
>> usage.
>>
>> Regards,
>> Kamil
>>
>>


Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-20 Thread Luke Cwik
Are you using Dataflow runner v2[1]?

If so, then you can use:
--number_of_worker_harness_threads=X

Do you know where/why the OOM is occurring?

1:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
2:
https://github.com/apache/beam/blob/017936f637b119f0b0c0279a226c9f92a2cf4f15/sdks/python/apache_beam/options/pipeline_options.py#L834

On Thu, Aug 20, 2020 at 7:33 AM Kamil Wasilewski <
kamil.wasilew...@polidea.com> wrote:

> Hi all,
>
> As I stated in the title, is there an equivalent for
> --numberOfWorkerHarnessThreads in Python SDK? I've got a streaming pipeline
> in Python which suffers from OutOfMemory exceptions (I'm using Dataflow).
> Switching to highmem workers solved the issue, but I wonder if I can set a
> limit of threads that will be used in a single worker to decrease memory
> usage.
>
> Regards,
> Kamil
>
>


Re: [BEAM-10292] change proposal to DefaultFilenamePolicy.ParamsCoder

2020-08-19 Thread Luke Cwik
I took a look at the PR and suggested some changes.

On Tue, Aug 18, 2020 at 8:16 AM David Janíček 
wrote:

> I looked at the possibility to fix the underlying filesystem and it turns
> out that only the local filesystem couldn't handle decoding right; HDFS and
> some other filesystems, e.g. S3, already have a check for that.
> So I added a similar check to the local filesystem too. The implementation
> is in the same pull request https://github.com/apache/beam/pull/12050.
>
> Can you take a look at it, please?
>
> Thanks,
> David
>
> On Tue, Aug 11, 2020 at 7:39 PM Luke Cwik  wrote:
>
>> The filesystem "fixes" all amount to removing the "isDirectory" boolean
>> bit and encoding whether something is a directory in the string part of the
>> resource specification which also turns out to be backwards incompatible
>> (just in a different way).
>>
>> Removing the "directory" bit would be great and that would allow us to
>> use strings instead of resource ids but would require filesystems to
>> perform the mapping from some standard path specification to their internal
>> representation.
>>
>> On Wed, Aug 5, 2020 at 9:26 PM Chamikara Jayalath 
>> wrote:
>>
>>> So, based on the comments in the PR, the underlying issue seems to be
>>> 'FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream));'
>>> not returning the correct result, right ?
>>> If so I think the correct fix might be your proposal (2) - Try to fix
>>> the underlying filesystem to do a better job of file/dir matching
>>>
>>> This is a bug we probably have to fix anyways for the local filesystem
>>> and/or HDFS and this will also give us a solution that does not break
>>> update compatibility.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Aug 5, 2020 at 3:41 PM Luke Cwik  wrote:
>>>
>>>> Cham, that was one of the options I had mentioned on the PR. The
>>>> difference here is that this is a bug fix and existing users could be
>>>> broken unknowingly so it might be worthwhile to take that breaking change
>>>> (and possibly provide users a way to perform an upgrade using the old
>>>> implementation).
>>>>
>>>>
>>>> On Wed, Aug 5, 2020 at 3:33 PM Chamikara Jayalath 
>>>> wrote:
>>>>
>>>>> This might break the update compatibility for Dataflow streaming
>>>>> pipelines. +Reuven Lax   +Lukasz Cwik
>>>>> 
>>>>>
>>>>> In other cases, to save update compatibility, we introduced a user
>>>>> option that changes the coder only when the user explicitly asks for an
>>>>> updated feature that requires the new coder. For example,
>>>>> https://github.com/apache/beam/commit/304882caa89afe24150062b959ee915c79e72ab3
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>> On Mon, Aug 3, 2020 at 10:00 AM David Janíček 
>>>>> wrote:
>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> I've reported an issue
>>>>>> https://issues.apache.org/jira/browse/BEAM-10292 which is about
>>>>>> broken DefaultFilenamePolicy.ParamsCoder behavior.
>>>>>> DefaultFilenamePolicy.ParamsCoder loses information whether
>>>>>> DefaultFilenamePolicy.Params's baseFilename resource is file or directory
>>>>>> on some filesystems, at least on local FS and HDFS.
>>>>>>
>>>>>> After discussion with @dmvk and @lukecwik, we have agreed that the
>>>>>> best solution could be to take the breaking change and use
>>>>>> ResourceIdCoder for encoding/decoding DefaultFilenamePolicy.Params's
>>>>>> baseFilename, this way the file/directory information is preserved.
>>>>>> The solution is implemented in pull request
>>>>>> https://github.com/apache/beam/pull/12050.
>>>>>>
>>>>>> I'd like to ask if there is a consensus on this breaking change. Is
>>>>>> everyone OK with this?
>>>>>> Thanks in advance for answers.
>>>>>>
>>>>>> Best regards,
>>>>>> David
>>>>>>
>>>>>
>
> --
> Best regards
> David Janíček
>


Re: Percentile metrics in Beam

2020-08-18 Thread Luke Cwik
getPMForCDF[1] seems to return a CDF and you can choose the split points
(b0, b1, b2, ...).

1:
https://github.com/stanford-futuredata/msketch/blob/cf4e49e860761f48ebdeb00f650ce997c46073e2/javamsketch/quantilebench/src/main/java/yahoo/DoublesPmfCdfImpl.java#L16

On Tue, Aug 18, 2020 at 11:20 AM Alex Amato  wrote:

> I'm a bit confused, are you sure that it is possible to derive the CDF
> using the moments variables?
>
> The linked implementation on github seems to not use a derived CDF
> equation, but instead uses some sampling technique (which I can't fully
> grasp yet) to estimate how many elements are in each bucket.
>
> linearTimeIncrementHistogramCounters
>
> https://github.com/stanford-futuredata/msketch/blob/cf4e49e860761f48ebdeb00f650ce997c46073e2/javamsketch/quantilebench/src/main/java/yahoo/DoublesPmfCdfImpl.java#L117
>
> Calls into .get() to do some sort of sampling
>
> https://github.com/stanford-futuredata/msketch/blob/cf4e49e860761f48ebdeb00f650ce997c46073e2/javamsketch/quantilebench/src/main/java/yahoo/DirectDoublesSketchAccessor.java#L29
>
>
>
> On Tue, Aug 18, 2020 at 9:52 AM Ke Wu  wrote:
>
>> Hi Alex,
>>
>> It is great to know you are working on the metrics. Do you have any
>> concern if we add a Histogram type metrics in Samza Runner itself for now
>> so we can start using it before a generic histogram metrics can be
>> introduced in the Metrics class?
>>
>> Best,
>> Ke
>>
>> On Aug 18, 2020, at 12:57 AM, Gleb Kanterov  wrote:
>>
>> Hi Alex,
>>
>> I'm not sure about restoring histogram, because the use-case I had in the
>> past used percentiles. As I understand it, you can approximate histogram if
>> you know percentiles and total count. E.g. 5% of values fall into
>> [P95, +INF) bucket, other 5% [P90, P95), etc. I don't understand the paper
>> well enough to say how it's going to work if given bucket boundaries happen
>> to include a small number of values. I guess it's a similar kind of
>> trade-off when we need to choose boundaries if we want to get percentiles
>> from histogram buckets. I see primarily moment sketch as a method intended
>> to approximate percentiles, not histogram buckets.
>>
>> /Gleb
>>
>> On Tue, Aug 18, 2020 at 2:13 AM Alex Amato  wrote:
>>
>>> Hi Gleb, and Luke
>>>
>>> I was reading through the paper, blog and github you linked to. One
>>> thing I can't figure out is if it's possible to use the Moment Sketch to
>>> restore an original histogram.
>>> Given bucket boundaries: b0, b1, b2, b3, ...
>>> Can we obtain the counts for the number of values inserted into each of the
>>> ranges: [-INF, B0), … [Bi, Bi+1), …
>>> (This is a requirement I need)
>>>
>>> Not to be confused with the percentile/threshold based queries discussed in
>>> the blog.
>>>
>>> Luke, were you suggesting collecting both and sending both over the FN
>>> API wire? I.e. collecting both
>>>
>>>    - the variables to represent the Histogram as suggested in
>>>      https://s.apache.org/beam-histogram-metrics
>>>    - in addition to the moment sketch variables
>>>      <https://blog.acolyer.org/2018/10/31/moment-based-quantile-sketches-for-efficient-high-cardinality-aggregation-queries/>.
>>>
>>> I believe that would be feasible, as we would still retain the Histogram
>>> data. I don't think we can restore the Histograms with just the Sketch, if
>>> that was the suggestion. Please let me know if I misunderstood.
>>>
>>> If that's correct, I can write up the benefits and drawbacks I see for
>>> both approaches.
>>>
>>>
>>> On Mon, Aug 17, 2020 at 9:23 AM Luke Cwik  wrote:
>>>
>>>> That is an interesting suggestion to change to use a sketch.
>>>>
>>>> I believe having one metric URN that represents all this information
>>>> grouped together would make sense instead of attempting to aggregate
>>>> several metrics together. The underlying implementation of using
>>>> sum/count/max/min would stay the same but we would want a single object
>>>> that abstracts this complexity away for users as well.
>>>>
>>>> On Mon, Aug 17, 2020 at 3:42 AM Gleb Kanterov  wrote:
>>>>
>>>>> Didn't see proposal by Alex before today. I want to add a few more
>>>>> cents from my side.
>>>>>
>>>>> There is a paper Moment-based quantile sketches for efficient high
>>>>> cardinality aggregation queries [1], a TL;D

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-08-18 Thread Luke Cwik
I have made some good progress here and have gotten to the following state
for non-portable runners:

DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
Twister2[2]: Ready for review. Supports Read.Bounded, the current runner
doesn't support unbounded pipelines.
Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not certain
about level of unbounded pipeline support coverage since Spark uses its own
tiny suite of tests to get unbounded pipeline coverage instead of the
validates runner set.
Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely needs
additional work.
Samza[5]: WIP. Supports Read.Bounded. Not certain about level of unbounded
pipeline support coverage since Samza uses its own tiny suite of tests to
get unbounded pipeline coverage instead of the validates runner set.
Flink: Unstarted.

@Pulasthi Supun Wickramasinghe  , can you help me
with the Twister2 PR[2]?
@Ismaël Mejía , is PR[3] the expected level of support
for unbounded pipelines and hence ready for review?
@Jozsef Bartok , can you help me out to get support
for unbounded splittable DoFn's into Jet[4]?
@Xinyu Liu , is PR[5] the expected level of support
for unbounded pipelines and hence ready for review?

1: https://github.com/apache/beam/pull/12519
2: https://github.com/apache/beam/pull/12594
3: https://github.com/apache/beam/pull/12603
4: https://github.com/apache/beam/pull/12616
5: https://github.com/apache/beam/pull/12617

On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik  wrote:

> There shouldn't be any changes required since the wrapper will smoothly
> transition the execution to be run as an SDF. New IOs should strongly
> prefer to use SDF since it should be simpler to write and will be more
> flexible but they can use the "*Source"-based APIs. Eventually we'll
> deprecate the APIs but we will never stop supporting them. Eventually they
> should all be migrated to use SDF and if there is another major Beam
> version, we'll finally be able to remove them.
>
> On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko 
> wrote:
>
>> Hi Luke,
>>
>> Great to hear about such progress on this!
>>
>> Talking about opt-out for all runners in the future, will it require any
>> code changes for current “*Source”-based IOs or the wrappers should
>> completely smooth this transition?
>> Do we need to require to create new IOs only based on SDF or again, the
>> wrappers should help to avoid this?
>>
>> On 10 Aug 2020, at 22:59, Luke Cwik  wrote:
>>
>> In the past couple of months wrappers[1, 2] have been added to the Beam
>> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
>> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
>> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
>> other pipelines.
>>
>> I would like to start making the non-portable pipelines opt-out as well,
>> starting with the DirectRunner[3], with the plan that eventually all runners
>> will only execute splittable DoFns and the BoundedSource/UnboundedSource
>> specific execution logic from the runners will be removed.
>>
>> Users will be able to opt-in any pipeline using the experiment
>> 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For
>> portable pipelines these experiments were 'beam_fn_api' and
>> 'beam_fn_api_use_deprecated_read' respectively and I have added these two
>> additional aliases to make the experience less confusing).
>>
>> 1:
>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>> 2:
>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>> 3: https://github.com/apache/beam/pull/12519
>>
>>
>>


Re: Percentile metrics in Beam

2020-08-18 Thread Luke Cwik
You can use a cumulative distribution function over the sketch at b0, b1,
b2, b3, ... which will tell you the probability that any given value is <=
X. You multiply that probability against the total count (which is also
recorded as part of the sketch) to get an estimate for the number of values
<= X. If you do this for b0, b1, b2, b3, ... you'll then be able to compute
an estimate for the number of values in each bucket. Note that you're
restoring estimates for how large each bucket is and as such there is some
error.
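
To make the arithmetic concrete, here is a minimal sketch of the bucket estimate. The uniform CDF is only a stand-in for whatever CDF the sketch would provide, and the function names are hypothetical. Note that a CDF gives P(value <= x), so the buckets here are closed on the right, which differs slightly from the [Bi, Bi+1) ranges in the question; for an estimate this should rarely matter.

```python
def bucket_counts(cdf, total_count, boundaries):
    """Estimate how many values fall in each bucket.

    cdf(x) returns the estimated probability that a value is <= x.
    Buckets are (-inf, b0], (b0, b1], ..., (b_last, +inf).
    """
    counts = []
    previous = 0.0
    for boundary in boundaries:
        # Estimated number of values <= boundary.
        cumulative = cdf(boundary) * total_count
        counts.append(cumulative - previous)
        previous = cumulative
    counts.append(total_count - previous)  # overflow bucket
    return counts


def uniform_cdf(x):
    # Stand-in for a sketch-derived CDF: uniform on [0, 100].
    return min(max(x / 100.0, 0.0), 1.0)


print(bucket_counts(uniform_cdf, 1000, [10, 50, 90]))
```

The estimates always sum to the total count, but each individual bucket inherits the error of the CDF at its two boundaries.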

I was suggesting collecting one or the other, not both. Even if we get this
wrong, we can always swap to the other method by defining a new metric URN
and changing the underlying implementation and what is sent but keeping the
user facing API the same.

On Mon, Aug 17, 2020 at 5:13 PM Alex Amato  wrote:

> Hi Gleb, and Luke
>
> I was reading through the paper, blog and github you linked to. One thing
> I can't figure out is if it's possible to use the Moment Sketch to restore
> an original histogram.
> Given bucket boundaries: b0, b1, b2, b3, ...
> Can we obtain the counts for the number of values inserted into each of the
> ranges: [-INF, B0), … [Bi, Bi+1), …
> (This is a requirement I need)
>
> Not to be confused with the percentile/threshold based queries discussed in
> the blog.
>
> Luke, were you suggesting collecting both and sending both over the FN API
> wire? I.e. collecting both
>
>    - the variables to represent the Histogram as suggested in
>      https://s.apache.org/beam-histogram-metrics
>    - in addition to the moment sketch variables
>      <https://blog.acolyer.org/2018/10/31/moment-based-quantile-sketches-for-efficient-high-cardinality-aggregation-queries/>.
>
> I believe that would be feasible, as we would still retain the Histogram
> data. I don't think we can restore the Histograms with just the Sketch, if
> that was the suggestion. Please let me know if I misunderstood.
>
> If that's correct, I can write up the benefits and drawbacks I see for
> both approaches.
>
>
> On Mon, Aug 17, 2020 at 9:23 AM Luke Cwik  wrote:
>
>> That is an interesting suggestion to change to use a sketch.
>>
>> I believe having one metric URN that represents all this information
>> grouped together would make sense instead of attempting to aggregate
>> several metrics together. The underlying implementation of using
>> sum/count/max/min would stay the same but we would want a single object
>> that abstracts this complexity away for users as well.
>>
>> On Mon, Aug 17, 2020 at 3:42 AM Gleb Kanterov  wrote:
>>
>>> Didn't see proposal by Alex before today. I want to add a few more cents
>>> from my side.
>>>
>>> There is a paper Moment-based quantile sketches for efficient high
>>> cardinality aggregation queries [1]. The TL;DR is that for some N (around
>>> 10-20 depending on accuracy) we need to collect SUM(log^N(X)) ...
>>> SUM(log(X)), COUNT(X), SUM(X), SUM(X^2) ... SUM(X^N), MAX(X), MIN(X).
>>> Given the aggregated numbers, it uses a solver for Chebyshev polynomials
>>> to get the quantile number, and there is already a Java implementation
>>> for it on GitHub [2].
>>>
>>> This way we can express quantiles using existing metric types in Beam,
>>> that can be already done without SDK or runner changes. It can fit nicely
>>> into existing runners and can be abstracted over if needed. I think this is
>>> also one of the best implementations, it has < 1% error rate for 200 bytes
>>> of storage, and quite efficient to compute. Did we consider using that?
>>>
>>> [1]:
>>> https://blog.acolyer.org/2018/10/31/moment-based-quantile-sketches-for-efficient-high-cardinality-aggregation-queries/
>>> [2]: https://github.com/stanford-futuredata/msketch
>>>
>>> On Sat, Aug 15, 2020 at 6:15 AM Alex Amato  wrote:
>>>
>>>> The distinction here is that even though these metrics come from user
>>>> space, we still gave them specific URNs, which imply they have a specific
>>>> format, with specific labels, etc.
>>>>
>>>> That is, we won't be packaging them into a USER_HISTOGRAM urn. That URN
>>>> would have less expectation for its format. Today the USER_COUNTER just
>>>> expects labels like (TRANSFORM, NAME, NAMESPACE).
>>>>
>>>> We didn't decide on making a private API. But rather an API
>>>> available to user code for populating metrics with specific labels, and
>>>> specific URNs. The same API could pretty much be used for
>>>> USER_HISTOGRAM, with a default URN chosen.
>>>> Thats how I see it in my head at

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

2020-08-17 Thread Luke Cwik
At this point in time I would recommend that you build a runner that
executes pipelines using only the portability layer like Flink/Samza/Spark
[1,2,3].

1:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
2:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
3:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java

On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra 
wrote:

> Hi,
>
> I have been working on implementing the Apache Ignite Runner to run Apache
> Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> have implemented the normalize pipeline method and currently working on run
> method implementation for Pipeline and IgnitePipelineTranslator.
>
> Jira : https://issues.apache.org/jira/browse/BEAM-9045
>
> PR : https://github.com/apache/beam/pull/12593
>
> Please review and feel free to share any feedback or questions.
>
> Regards,
> Saikat
>


Re: Percentile metrics in Beam

2020-08-17 Thread Luke Cwik
That is an interesting suggestion to change to use a sketch.

I believe having one metric URN that represents all this information
grouped together would make sense instead of attempting to aggregate
several metrics together. The underlying implementation of using
sum/count/max/min would stay the same but we would want a single object
that abstracts this complexity away for users as well.
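
A minimal sketch of what such a single object could look like, assuming it carries the count, min, max, and the power and log-power sums described in the quoted message below. The class name `MomentAccumulator` and its fields are hypothetical and are not an existing Beam or msketch API; the point is only that every field merges by a simple sum, min, or max.

```python
import math


class MomentAccumulator:
    """Mergeable summary: count, min, max, power sums and log-power sums."""

    def __init__(self, order=4):
        self.order = order
        self.count = 0
        self.minimum = math.inf
        self.maximum = -math.inf
        self.power_sums = [0.0] * order  # sum(x^k) for k = 1..order
        self.log_sums = [0.0] * order    # sum(log(x)^k) for k = 1..order

    def add(self, x):
        self.count += 1
        self.minimum = min(self.minimum, x)
        self.maximum = max(self.maximum, x)
        for k in range(self.order):
            self.power_sums[k] += x ** (k + 1)
            if x > 0:  # log moments are only defined for positive values
                self.log_sums[k] += math.log(x) ** (k + 1)

    def merge(self, other):
        """Return a new accumulator equal to having seen both inputs."""
        if other.order != self.order:
            raise ValueError("cannot merge accumulators of different order")
        merged = MomentAccumulator(self.order)
        merged.count = self.count + other.count
        merged.minimum = min(self.minimum, other.minimum)
        merged.maximum = max(self.maximum, other.maximum)
        merged.power_sums = [
            a + b for a, b in zip(self.power_sums, other.power_sums)
        ]
        merged.log_sums = [
            a + b for a, b in zip(self.log_sums, other.log_sums)
        ]
        return merged


left, right = MomentAccumulator(order=3), MomentAccumulator(order=3)
for value in (1.0, 2.0):
    left.add(value)
for value in (3.0, 4.0):
    right.add(value)
combined = left.merge(right)
print(combined.count, combined.minimum, combined.maximum, combined.power_sums)
```

Turning these sums into quantiles is the job of the solver in the linked implementation and is not attempted here.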

On Mon, Aug 17, 2020 at 3:42 AM Gleb Kanterov  wrote:

> Didn't see proposal by Alex before today. I want to add a few more cents
> from my side.
>
> There is a paper Moment-based quantile sketches for efficient high
> cardinality aggregation queries [1]. The TL;DR is that for some N (around
> 10-20 depending on accuracy) we need to collect SUM(log^N(X)) ...
> SUM(log(X)), COUNT(X), SUM(X), SUM(X^2) ... SUM(X^N), MAX(X), MIN(X).
> Given the aggregated numbers, it uses a solver for Chebyshev polynomials
> to get the quantile number, and there is already a Java implementation
> for it on GitHub [2].
>
> This way we can express quantiles using existing metric types in Beam,
> that can be already done without SDK or runner changes. It can fit nicely
> into existing runners and can be abstracted over if needed. I think this is
> also one of the best implementations, it has < 1% error rate for 200 bytes
> of storage, and quite efficient to compute. Did we consider using that?
>
> [1]:
> https://blog.acolyer.org/2018/10/31/moment-based-quantile-sketches-for-efficient-high-cardinality-aggregation-queries/
> [2]: https://github.com/stanford-futuredata/msketch
>
> On Sat, Aug 15, 2020 at 6:15 AM Alex Amato  wrote:
>
>> The distinction here is that even though these metrics come from user
>> space, we still gave them specific URNs, which imply they have a specific
>> format, with specific labels, etc.
>>
>> That is, we won't be packaging them into a USER_HISTOGRAM urn. That URN
>> would have less expectation for its format. Today the USER_COUNTER just
>> expects labels like (TRANSFORM, NAME, NAMESPACE).
>>
>> We didn't decide on making a private API. But rather an API available to
>> user code for populating metrics with specific labels, and specific URNs.
>> The same API could pretty much be used for USER_HISTOGRAM, with a
>> default URN chosen.
>> That's how I see it in my head at the moment.
>>
>>
>> On Fri, Aug 14, 2020 at 8:52 PM Robert Bradshaw 
>> wrote:
>>
>>> On Fri, Aug 14, 2020 at 7:35 PM Alex Amato  wrote:
>>> >
>>> > I am only tackling the specific metrics covered in (for the python SDK
>>> first, then Java). To collect latency of IO API RPCS, and store it in a
>>> histogram.
>>> > https://s.apache.org/beam-gcp-debuggability
>>> >
>>> > User histogram metrics are unfunded, as far as I know. But you should
>>> be able to extend what I do for that project to the user metric use case. I
>>> agree, it won't be much more work to support that. I designed the histogram
>>> with the user histogram case in mind.
>>>
>>> From the portability point of view, all metrics generated in users
>>> code (and SDK-side IOs are "user code") are user metrics. But
>>> regardless of how things are named, once we have histogram metrics
>>> crossing the FnAPI boundary all the infrastructure will be in place.
>>> (At least the plan as I understand it shouldn't use private APIs
>>> accessible only by the various IOs but not other SDK-level code.)
>>>
>>> > On Fri, Aug 14, 2020 at 5:47 PM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> Once histograms are implemented in the SDK(s) (Alex, you're tackling
>>> >> this, right?) it shouldn't be much work to update the Samza worker code
>>> >> to publish these via the Samza runner APIs (in parallel with Alex's
>>> >> work to do the same on Dataflow).
>>> >>
>>> >> On Fri, Aug 14, 2020 at 5:35 PM Alex Amato 
>>> wrote:
>>> >> >
>>> >> > No one has any plans currently to work on adding a generic histogram
>>> metric, at the moment.
>>> >> >
>>> >> > But I will be actively working on adding it for a specific set of
>>> metrics in the next quarter or so
>>> >> > https://s.apache.org/beam-gcp-debuggability
>>> >> >
>>> >> > After that work, one could take a look at my PRs for reference to
>>> create new metrics using the same histogram. One may wish to implement the
>>> UserHistogram use case and use that in the Samza Runner
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Fri, Aug 14, 2020 at 5:25 PM Ke Wu  wrote:
>>> >> >>
>>> >> >> Thank you Robert and Alex. I am not running a Beam job in Google
>>> Cloud but with Samza Runner, so I am wondering if there is any ETA to add
>>> the Histogram metrics in Metrics class so it can be mapped to the
>>> SamzaHistogram metric to the actual emitting.
>>> >> >>
>>> >> >> Best,
>>> >> >> Ke
>>> >> >>
>>> >> >> On Aug 14, 2020, at 4:44 PM, Alex Amato 
>>> wrote:
>>> >> >>
>>> >> >> One of the plans to use the histogram data is to send it to Google
>>> Monitoring to compute estimates of percentiles. This is done using the
>>> bucket counts and bucket boundaries.
>>> >> >>
>>> >> >> Here is a 

Re: Output timestamp for Python event timers

2020-08-11 Thread Luke Cwik
+1 on what Boyuan said. It is important that the defaults for processing
time domain differ from the defaults for the event time domain.
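
The defaulting rule from Boyuan's message quoted below could be sketched roughly as follows. The `TimeDomain` enum and the function name are stand-ins defined here for illustration; they are not the Python SDK's own types, and the final API may well look different.

```python
from enum import Enum


class TimeDomain(Enum):
    # Stand-in enum for this sketch only.
    EVENT_TIME = "event_time"
    PROCESSING_TIME = "processing_time"


def default_hold_timestamp(time_domain, fire_timestamp, input_timestamp,
                           output_timestamp=None):
    """Pick the timestamp that holds the output watermark for a timer."""
    if output_timestamp is not None:
        # The user set an explicit output timestamp, so respect it.
        return output_timestamp
    if time_domain is TimeDomain.EVENT_TIME:
        # Event time timers hold the watermark at the firing time.
        return fire_timestamp
    # Processing time timers fall back to the input element's timestamp.
    return input_timestamp


print(default_hold_timestamp(TimeDomain.EVENT_TIME, 100, 10))
print(default_hold_timestamp(TimeDomain.PROCESSING_TIME, 100, 10))
```

A processing time fire timestamp is not a meaningful point on the event time axis, which is why the two domains need different defaults.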

On Tue, Aug 11, 2020 at 12:36 PM Yichi Zhang  wrote:

> +1 to expose set_output_timestamp and enrich python set timer api.
>
> On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang  wrote:
>
>> Hi Maximilian,
>>
>> It makes sense to set  hold_timestamp as fire_timestamp when the
>> fire_timestamp is in the event time domain. Otherwise, the system may
>> advance the watermark incorrectly.
>> I think we can do something similar to Java FnApiRunner[1]:
>>
>>- Expose set_output_timestamp API to python timer as well
>>- If set_output_timestamp is not specified and timer is in event
>>domain, we can use fire_timestamp as hold_timestamp
>>- Otherwise, use input_timestamp as hold_timestamp.
>>
>> What do you think?
>>
>> [1]
>> https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
>>
>>
>>
>>
>> On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels 
>> wrote:
>>
>>> We ran into problems setting event time timers per-element in the Python
>>> SDK. Pipeline progress would stall.
>>>
>>> Turns out, although the Python SDK does not expose the timer output
>>> timestamp feature to the user, it sets the timer output timestamp to the
>>> current input timestamp of an element.
>>>
>>> This will lead to holding back the watermark until the timer fires (the
>>> Flink Runner respects the timer output timestamp when advancing the
>>> output watermark). We had set the fire timestamp to a timestamp so far
>>> in the future, that pipeline progress would completely stall for
>>> downstream transforms, due to the held back watermark.
>>>
>>> Considering that this feature is not even exposed to the user in the
>>> Python SDK, I think we should set the default output timestamp to the
>>> fire timestamp, and not to the input timestamp. This is also how timers
>>> work in the Java SDK.
>>>
>>> Let me know what you think.
>>>
>>> -Max
>>>
>>> PR: https://github.com/apache/beam/pull/12531
>>>
>>


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-08-11 Thread Luke Cwik
There shouldn't be any changes required since the wrapper will smoothly
transition the execution to be run as an SDF. New IOs should strongly
prefer to use SDF since it should be simpler to write and will be more
flexible but they can use the "*Source"-based APIs. Eventually we'll
deprecate the APIs but we will never stop supporting them. Eventually they
should all be migrated to use SDF and if there is another major Beam
version, we'll finally be able to remove them.
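
For anyone wondering how the opt-in and opt-out experiments from the original proposal (quoted below) interact, here is a minimal sketch of the decision. The function name is hypothetical, and the rule that an explicit opt-out wins over an opt-in is an assumption of this sketch rather than something confirmed by the wrapper code.

```python
OPT_IN = {"use_sdf_read", "beam_fn_api"}
OPT_OUT = {"use_deprecated_read", "beam_fn_api_use_deprecated_read"}


def should_use_sdf_read(experiments, runner_default):
    """Decide whether a Read should be executed as a splittable DoFn.

    runner_default is True for runners where SDF execution is opt-out
    and False for runners where it is still opt-in.
    """
    enabled = set(experiments)
    if enabled & OPT_OUT:
        return False  # assumption: an explicit opt-out takes precedence
    if enabled & OPT_IN:
        return True
    return runner_default


print(should_use_sdf_read(["use_sdf_read"], runner_default=False))
print(should_use_sdf_read(["use_deprecated_read"], runner_default=True))
```

Existing "*Source"-based IOs need no code changes either way; only the experiment list and the runner's default decide which path is taken.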

On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko 
wrote:

> Hi Luke,
>
> Great to hear about such progress on this!
>
> Talking about opt-out for all runners in the future, will it require any
> code changes for current “*Source”-based IOs or the wrappers should
> completely smooth this transition?
> Do we need to require to create new IOs only based on SDF or again, the
> wrappers should help to avoid this?
>
> On 10 Aug 2020, at 22:59, Luke Cwik  wrote:
>
> In the past couple of months wrappers[1, 2] have been added to the Beam
> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
> other pipelines.
>
> I would like to start making the non-portable pipelines opt-out as well,
> starting with the DirectRunner[3], with the plan that eventually all runners
> will only execute splittable DoFns and the BoundedSource/UnboundedSource
> specific execution logic from the runners will be removed.
>
> Users will be able to opt-in any pipeline using the experiment
> 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For
> portable pipelines these experiments were 'beam_fn_api' and
> 'beam_fn_api_use_deprecated_read' respectively and I have added these two
> additional aliases to make the experience less confusing).
>
> 1:
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
> 2:
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
> 3: https://github.com/apache/beam/pull/12519
>
>
>


Re: [BEAM-10292] change proposal to DefaultFilenamePolicy.ParamsCoder

2020-08-11 Thread Luke Cwik
The filesystem "fixes" all amount to removing the "isDirectory" boolean
bit and encoding whether something is a directory in the string part of the
resource specification which also turns out to be backwards incompatible
(just in a different way).

Removing the "directory" bit would be great and that would allow us to use
strings instead of resource ids but would require filesystems to perform
the mapping from some standard path specification to their internal
representation.
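
To illustrate what encoding the directory bit in the string would mean, here is a minimal sketch that uses a trailing "/" as the marker. The convention and both function names are assumptions for illustration only; they are not how any Beam filesystem actually encodes resources.

```python
def encode_resource(path, is_directory):
    """Fold the directory bit into the string as a trailing '/'."""
    stripped = path.rstrip("/")
    if is_directory or not stripped:
        # The root path is always treated as a directory.
        return stripped + "/"
    return stripped


def decode_resource(spec):
    """Recover (path, is_directory) from the string form."""
    is_directory = spec.endswith("/")
    path = spec.rstrip("/") or "/"
    return path, is_directory


print(decode_resource(encode_resource("/tmp/out", True)))
print(decode_resource(encode_resource("/tmp/out", False)))
# A string written by an older coder without the marker decodes as a file,
# even if it was meant to be a directory.
print(decode_resource("/tmp/out"))
```

The last call shows the incompatibility: data already encoded without the marker cannot be told apart from a file after the change.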

On Wed, Aug 5, 2020 at 9:26 PM Chamikara Jayalath 
wrote:

> So, based on the comments in the PR, the underlying issue seems to be
> 'FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream));'
> not returning the correct result, right ?
> If so I think the correct fix might be your proposal (2) - Try to fix the
> underlying filesystem to do a better job of file/dir matching
>
> This is a bug we probably have to fix anyways for the local filesystem
> and/or HDFS and this will also give us a solution that does not break
> update compatibility.
>
> Thanks,
> Cham
>
> On Wed, Aug 5, 2020 at 3:41 PM Luke Cwik  wrote:
>
>> Cham, that was one of the options I had mentioned on the PR. The
>> difference here is that this is a bug fix and existing users could be
>> broken unknowingly so it might be worthwhile to take that breaking change
>> (and possibly provide users a way to perform an upgrade using the old
>> implementation).
>>
>>
>> On Wed, Aug 5, 2020 at 3:33 PM Chamikara Jayalath 
>> wrote:
>>
>>> This might break the update compatibility for Dataflow streaming
>>> pipelines. +Reuven Lax   +Lukasz Cwik
>>> 
>>>
>>> In other cases, to save update compatibility, we introduced a user
>>> option that changes the coder only when the user explicitly asks for an
>>> updated feature that requires the new coder. For example,
>>> https://github.com/apache/beam/commit/304882caa89afe24150062b959ee915c79e72ab3
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>> On Mon, Aug 3, 2020 at 10:00 AM David Janíček 
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I've reported an issue https://issues.apache.org/jira/browse/BEAM-10292
>>>> which is about broken DefaultFilenamePolicy.ParamsCoder behavior.
>>>> DefaultFilenamePolicy.ParamsCoder loses information whether
>>>> DefaultFilenamePolicy.Params's baseFilename resource is file or directory
>>>> on some filesystems, at least on local FS and HDFS.
>>>>
>>>> After discussion with @dmvk and @lukecwik, we have agreed that the best
>>>> solution could be to take the breaking change and use ResourceIdCoder for
>>>> encoding/decoding DefaultFilenamePolicy.Params's baseFilename, this way the
>>>> file/directory information is preserved.
>>>> The solution is implemented in pull request
>>>> https://github.com/apache/beam/pull/12050.
>>>>
>>>> I'd like to ask if there is a consensus on this breaking change. Is
>>>> everyone OK with this?
>>>> Thanks in advance for answers.
>>>>
>>>> Best regards,
>>>> David
>>>>
>>>


Re: [DISCUSS] Better alignment of Apache Flink and Apache Beam releases

2020-08-10 Thread Luke Cwik
Is there a way we could use a fixed point in time Flink nightly that passes
all the tests/validation and bump up the nightly version manually to get
"closer" to the release candidate instead of doing another branch?

This would mean that the authors of any changes that impact the Flink runner,
whether related to project clean-up or cross-cutting, would be responsible
for fixing the nightly version as well. It might also lead to fewer
integrations and merge conflicts when attempting to merge said branch back
into master.


On Mon, Aug 10, 2020 at 3:35 AM Jan Lukavský  wrote:

> Hi,
>
> I "sense a disturbance in the force" relating to the way we release Beam
> with supported Flink versions. The current released version of Apache
> Flink is 1.11.1, while we still support (at least up to Beam 2.24.0)
> only version 1.10.1. There is a tracking issue for 1.11 support [1], but
> even if someone starts to work on this soon, it will probably not make
> it into a release sooner than 2.26.0 (surely not before 2.25.0). I think
> that the features included in the newest Flink releases are very much
> needed by our users, so I'd like to revive a somewhat left-over
> discussion started in [2]. I think that we could be more aligned with
> Flink's releases if we created the following workflow:
>
>   - when a new Flink version is released, create a new branch for
> flink-runner-
>
>   - this new branch would depend on the published SNAPSHOT version of the
> not-yet-released version of Flink
>
>   - we would need a Jenkins job that would periodically do builds
> against new SNAPSHOTs and notify (some, volunteers welcome :))
> committers about the status of the build
>
>   - this way, people would be aware of incompatibilities, which would
> (pretty much) increase the chance that the new runner branch is in
> shape to switch from SNAPSHOT to release as soon as the version of Beam
> gets released; merging the released version would mean we create
> another branch for the new SNAPSHOT of Flink and repeat the process
>
> This workflow would rely on volunteer committers (I'm one) who would be
> willing to be notified about the failures and possibly fix them.
>
> Looking forward to opinions, or alternative proposals to tackle this.
>
>   Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-10612
>
> [2]
>
> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
>
>


[DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-08-10 Thread Luke Cwik
In the past couple of months wrappers[1, 2] have been added to the Beam
Java SDK which can execute BoundedSource and UnboundedSource as Splittable
DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
other pipelines.

I would like to start making non-portable pipelines opt-out as well,
starting with the DirectRunner[3], with the plan that eventually all
runners will only execute splittable DoFns and the
BoundedSource/UnboundedSource-specific execution logic will be removed
from the runners.

Users will be able to opt in any pipeline using the experiment
'use_sdf_read' and opt out with the experiment 'use_deprecated_read'. (For
portable pipelines these experiments were 'beam_fn_api' and
'beam_fn_api_use_deprecated_read' respectively, and I have added these two
additional aliases to make the experience less confusing.)
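
To make the opt-in/opt-out behaviour concrete, here is a minimal
stand-alone sketch of how the four experiment names could resolve to a read
mode. It is not code from the Beam SDK: the class SdfReadExperimentSketch,
the method usesSdfRead, and the sdfByDefault parameter are hypothetical
names used only for illustration, and the rule that an explicit opt-out
wins when both kinds of experiment are given is an assumption.

```java
import java.util.List;

// Hypothetical illustration only; this is not the Beam SDK implementation.
final class SdfReadExperimentSketch {
  // Experiment names as listed in the message above.
  static final List<String> OPT_IN = List.of("use_sdf_read", "beam_fn_api");
  static final List<String> OPT_OUT =
      List.of("use_deprecated_read", "beam_fn_api_use_deprecated_read");

  /**
   * Returns true if sources would run through the splittable DoFn wrappers.
   *
   * @param experiments the experiment names enabled for the pipeline
   * @param sdfByDefault true for pipelines where the wrappers are opt-out
   */
  static boolean usesSdfRead(List<String> experiments, boolean sdfByDefault) {
    // Assumption: an explicit opt-out takes precedence over an opt-in.
    if (experiments.stream().anyMatch(OPT_OUT::contains)) {
      return false;
    }
    // Either opt-in alias switches the wrappers on.
    if (experiments.stream().anyMatch(OPT_IN::contains)) {
      return true;
    }
    // No relevant experiment given: fall back to the default for the runner.
    return sdfByDefault;
  }

  public static void main(String[] args) {
    // An opt-in pipeline that passes the new alias.
    System.out.println(usesSdfRead(List.of("use_sdf_read"), false));
    // An opt-out pipeline that asks for the deprecated read path.
    System.out.println(usesSdfRead(List.of("use_deprecated_read"), true));
  }
}
```

In a real pipeline these names would be passed through the --experiments
pipeline option, e.g. --experiments=use_sdf_read.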

1:
https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
2:
https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
3: https://github.com/apache/beam/pull/12519

