Re: Building dev docker image

2024-09-19 Thread Robert Bradshaw via dev
On Thu, Sep 19, 2024 at 3:43 PM Joey Tran  wrote:

> Ah okay then. I commented out the goavro line and the image does finish
> building. It seems that the Python SDK still has to be built, which takes a bit
> (using `pip install -e .[gcp,test]` inside the sdks/python directory). It
> looks like the source isn't synced with the host system, and it doesn't look
> like there's a git repo inside the container either, so maybe the container
> env isn't worth the effort (especially if no one else is using or helping
> maintain it).
>
> Going back to trying to get a local python environment working... I tried
> doing this a few months ago and eventually just gave up. I ran
> `./local-env-setup.sh`
>

I honestly have no idea what this is.


> and then following the instructions in the python developer guide in the
> wiki[1], I ran `pip install -e .[gcp,test]`. I eventually hit an issue in
> what looks like the cythonization of windowed_value.py
>
> ```
>   [14/14] Cythonizing apache_beam/utils/windowed_value.py
> ```
>
> There's a lot of messaging but it looks like the crux of the issue is:
> ```
>   clang: error: invalid arch name '-arch
> -I/private/var/folders/n1/6qk3ljm97h32j1g7qg
> ```
>
> Googling around it seems like it might be some kind of mac M2 issue, but
> there aren't any obvious solutions. Has anyone that works on the python sdk
> with a new-ish mac run into this problem?
>

I don't have a new mac, but you shouldn't need to cythonize for
development. I assume you have a virtual environment from following the
steps in the wiki? Can you "pip uninstall cython" then re-run "pip install
-e .[gcp,test]"?

> Best,
> Joey
> [1] https://cwiki.apache.org/confluence/display/BEAM/Python+Tips
>
>
> On Thu, Sep 19, 2024 at 5:23 PM Robert Burke  wrote:
>
>> To my knowledge, the "dev" docker image is unmaintained, and has been for
>> quite some time, hence it hasn't been moved to Go Modules and the like.
>>
>> I personally don't have time to delve into what's going on with it, but
>> IIRC, deleting the weird goavro lines would be a good first step.
>>
>> I'd be happy to review any PRs to help fix it though, if you get it
>> working for yourself. (Tag lostluck in GitHub).
>>
>> I think if you search the dev list, you might find previous discussions
>> on the topic of the dev container.
>>
>> Robert Burke
>>
>>
>> On Thu, Sep 19, 2024, 2:17 PM Joey Tran 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying again to get a working development image of beam. Working
>>> from the beam repo CONTRIBUTING.md, I ran `./start-build-env.sh`
>>>
>>> At image build time after getting a few steps in, the build fails with:
>>>
>>> => ERROR [7/8] RUN go get github.com/linkedin/goavro/v2
>>>
>>>  > [7/8] RUN go get github.com/linkedin/goavro/v2:
>>>
>>>
>>> 0.134 go: go.mod file not found in current directory or any parent
>>> directory.
>>> 0.134   'go get' is no longer supported outside a module.
>>>
>>> 0.134   To build and install a command, use 'go install' with a
>>> version,
>>> 0.134   like 'go install example.com/cmd@latest'
>>>
>>> 0.134   For more information, see
>>> https://golang.org/doc/go-get-install-deprecation
>>> 0.134   or run 'go help get' or 'go help install'.
>>> --
>>>  2 warnings found (use docker --debug to expand):
>>>  - LegacyKeyValueFormat: "ENV key=value" should be used instead of
>>> legacy "ENV key value" format (line 8)
>>>  - LegacyKeyValueFormat: "ENV key=value" should be used instead of
>>> legacy "ENV key value" format (line 7)
>>> Dockerfile:10
>>> 
>>>8 | ENV GOPATH
>>> /home/jtran/beam/sdks/go/examples/.gogradle/project_gopath
>>>9 | # This next command still runs as root causing the
>>> ~/.cache/go-build to be owned by root
>>>   10 | >>> RUN go get github.com/linkedin/goavro/v2
>>>   11 | RUN chown -R jtran:100 /home/jtran/.cache
>>>   12 |
>>> 
>>> ERROR: failed to solve: process "/bin/bash -o pipefail -c go get
>>> github.com/linkedin/goavro/v2" did not complete successfully: exit
>>> code: 1
>>>
>>>
>>> Any tips?
>>> Cheers,
>>> --
>>>
>>> Joey Tran
>>>
>>>
>>
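For anyone hitting the same `go get` failure, a hedged, untested sketch of how those Dockerfile lines could be adjusted (option 1 is what actually resolved things in this thread):

```
# Fix the legacy ENV syntax flagged in the build warnings.
ENV GOPATH=/home/jtran/beam/sdks/go/examples/.gogradle/project_gopath

# Option 1 (what worked above): drop the goavro pre-fetch entirely.
# RUN go get github.com/linkedin/goavro/v2

# Option 2: modern `go get` must run inside a module, so pre-warm the module
# cache from a throwaway module instead.
RUN mkdir /tmp/warmup && cd /tmp/warmup && go mod init warmup \
    && go get github.com/linkedin/goavro/v2 && rm -rf /tmp/warmup
```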


Re: beam-starter-typescript is broken | long module issue | ttypescript module issue

2024-09-11 Thread Robert Bradshaw via dev
On Mon, Sep 9, 2024 at 12:00 AM Robert Weber 
wrote:

> Good morning Roberts!
>
> This is such a conversation full of bright fame :-D --> Robert is an old
> German name that means “bright fame.” Just needed to celebrate this
> coincidence somehow.
>

:)


> This week I am on a business trip.
>

I've been traveling as well.


> But I can prepare something next week and document it as well.
>
> I'll send you an update.
>

So, I took a stab at this. I tried to move to typescript 5, ts-patch, and
update a bunch of packages, see https://github.com/apache/beam/pull/32439 .
It compiles but is giving import errors (e.g. "Cannot use import statement
outside a module"). I can read and write TypeScript itself, but when it
comes to the plethora of modules and packaging structure I am out of my
depth...

https://github.com/apache/beam/pull/32439/commits/b2560be4519a88873e36ce77b084063396c3f896
was the only interesting (beam-specific) bit. I also had to do
https://github.com/apache/beam/pull/32439/commits/bfb180d7fdc0ee25d8cd427b77c5ab40deff659c .
Hopefully that should be a good starting point.

(My dev environment is just cloning the repo and running "npm i" in the
sdks/typescript directory. To test, e.g., the quickstart, I run "npm pack"
and then, in the quickstart directory, "npm i
/path/to/created/apache-beam-2.60.0-SNAPSHOT.tgz". Just getting "npm test"
to work from within sdks/typescript inside the beam repo would be a big
step forward though.)
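Spelled out as commands, that loop looks roughly like this (a sketch; the tarball name depends on the SDK version, and the paths are illustrative):

```
# Inside the Beam repo: install deps and build a local tarball of the SDK.
cd beam/sdks/typescript
npm i
npm pack        # produces e.g. apache-beam-2.60.0-SNAPSHOT.tgz

# In a separate quickstart/starter project: install that tarball.
cd /path/to/beam-starter-typescript
npm i /path/to/beam/sdks/typescript/apache-beam-2.60.0-SNAPSHOT.tgz
```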


> Am Sa., 7. Sept. 2024 um 18:55 Uhr schrieb Robert Burke <
> rob...@frantil.com>:
>
>> Hello Robert, Robert.
>>
>> Since we don't have a great deal of experience with Typescript, any
>> assistance in documenting (or pointing to standard typescript
>> documentation) on how to hook up a development module to a separate project
>> that uses the module would be valuable.
>>
>> As an aside, I should also do the same for the Go side, where the standard is a
>> "go.work" file that allows the development repo to overlay the module's
>> usage in a different project.
>>
>> Robert
>>
>> On Sat, Sep 7, 2024, 12:34 AM Robert Weber 
>> wrote:
>>
>>> I investigated a bit more on the typescript patch compilation topic. It
>>> seems that the ttypescript module is not well maintained anymore. The last
>>> version is more than 2 years old. An alternative is ts-patch. This one
>>> seems to be maintained and working with typescript version >5.0.0. Hence, I
>>> suggest a migration to ts-patch.
>>> ts-patch migration steps:
>>>
>>>1. install ts-patch
>>>2. use tspc instead of ttsc to compile during build (build.sh)
>>>3. uninstall ttypescript
>>>
>>> I think we don't need to adapt the transformer plugins in tsconfig.json,
>>> since the plugin options are the same as for ttypescript. Hope this works!
>>>
>>> I definitely need to set up my own dev environment so I can build and
>>> link apache-beam to the starter project locally and test it on my own. I
>>> will do this as my next step :-).
>>>
>>> Am Fr., 6. Sept. 2024 um 18:24 Uhr schrieb Robert Bradshaw via dev <
>>> dev@beam.apache.org>:
>>>
>>>> On Fri, Sep 6, 2024 at 4:41 AM Robert Weber
>>>>  wrote:
>>>> >
>>>> > Hi everyone,
>>>> >
>>>> > I am very interested in apache beam and watched a video on youtube
>>>> about the typescript SDK, which mentioned that you might need some community support.
>>>>
>>>> Yes, definitely.
>>>>
>>>> > That's why I am reaching out.
>>>>
>>>> That'd be great!
>>>>
>>>> > I have 10 years of proven experience in data engineering and data
>>>> analytics, as well as python and javascript/typescript experience. I am
>>>> interested in contributing from time to time. Is this still relevant?
>>>> >
>>>> > First of all I tested the starter project for typescript.
>>>> Unfortunately, it is broken. I'd like to fix it (
>>>> https://beam.apache.org/get-started/quickstart/typescript/). So far I
>>>> have some findings regarding the beam-starter-typescript:
>>>> >
>>>> > The long module is breaking the build:
>>>> >
>>>> >
>>>> node_modules/google-gax/node_modules/@grpc/grpc-js/node_modules/long/umd/index.d.ts:1:18
>>>> - error TS1479: The current file is a CommonJS module whose imports will
>

Re: Sunsetting Beam Python 3.8 Support

2024-08-26 Thread Robert Bradshaw via dev
On Mon, Aug 26, 2024 at 11:22 AM Valentyn Tymofieiev via dev
 wrote:
>
> Interesting findings. When researching Dataflow Python usage with internal 
> telemetry, I see that Python 3.11 has slightly more usage than Python 3.8. 
> When I exclude Dev SDKs (this might also exclude some Google-internal users 
> who use bleeding-edge SDKs), Python 3.8 reaches the top. If I exclude 
> Google Dynamic "FLEX" templates, the following become top 3:
>
> Apache Beam Python 3.9 SDK: 24.40%
> Apache Beam Python 3.7 SDK: 23.34%
> Apache Beam Python 3.8 SDK: 21.63%

Interesting. I'm assuming this is across all Beam versions, right?

> This might be explained by the fact that the default "Python3" flex template 
> image referenced in the docs (at 
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#python_3)
>  is Python 3.8.

We should definitely fix that.

> > On the other hand, I do like the idea of letting the Python EoL cycle drive 
> > our own supported versions.
>
> +1. As much as I don't like forced upgrades, it won't be sustainable long term 
> to keep versions indefinitely. I don't anticipate any blockers for switching 
> Python 3.8 to Python 3.9.
>
> > For many workflows like our unit test suites this is not a large change; 
> > the Python version matrix simply omits 3.8 and runs on the remaining python 
> > versions as expected. This is more complicated for a number of workflows 
> > that currently only run on 3.8 or both 3.8 and 3.12, as GitHub will not run 
> > the updated actions in the main repository until the PR updating them is 
> > submitted.
>
> Yes, that's a known inconvenience. I believe this can be worked around by 
> pushing the changes to a branch on the main repo, and then manually triggering a 
> GHA workflow from that branch, if you want to be really careful. I think we 
> have this documented somewhere, but I couldn't quickly find it. @Danny 
> McCormick might have a link.
>
> Merging and iterating sounds good to me if we can quickly roll back/fix 
> forward changes to not make PRs blocked due to tests not passing.

The risk is accepting changes that are incompatible with Python 3.8.
Once we drop it (even in the dev repo) we should drop it for good.

> We also set the default Python version in 
> https://github.com/apache/beam/blob/9c0a9503ebd59778d488dcfff7fb9417a808152b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L2960
>  that might affect some workflows.
>
> > To Robert Bradshaw's point, I wouldn't necessarily be opposed to pushing 
> > out this process to 2.61.0.
> As long as we don't add a new version before removing an existing one, there's probably 
> no significant difference for us.

Sounds like a reasonable plan then.

> Our dependencies (like numpy, pandas, etc) are definitely dropping Python 3.8 
> support, usually ahead of us. Some Google Cloud Python Client libraries are 
> planning to drop Python 3.8 support after EOL as well.
>
> On Mon, Aug 26, 2024 at 11:17 AM Jack McCluskey via dev  
> wrote:
>>
>> To Robert Bradshaw's point, I wouldn't necessarily be opposed to pushing out 
>> this process to 2.61.0. That does give more time to validate some of the 
>> actions changes and let us warn users about the drop in 3.8 support in a 
>> release. Admittedly a major motivator for moving off of 3.8 at EoL is so I 
>> can do some overhauling of the type hinting code, as 3.8 is the last version 
>> where PEP-585 type hints are not supported by default (some context for this 
>> is available on my Current State of Beam Python Type Hinting doc from last 
>> November.) But that isn't necessarily urgent work as far as users are 
>> concerned.
>>
>> There's an argument for trying to keep our documentation and tutorials 
>> pointing at relatively recent versions of Beam, but that's probably best 
>> left as a best-effort type thing for now.
>>
>> On Mon, Aug 26, 2024 at 1:41 PM Robert Burke  wrote:
>>>
>>> A minor point, but often when onboarding, folks will try things verbatim 
>>> from the website and documentation:
>>>
>>> https://github.com/search?q=repo%3Aapache%2Fbeam+python3.7+lang%3AMarkdown+&type=code
>>>
>>> Granted, the most popular combo there was not present in this search, so 
>>> it's probably not terribly significant, compared to the reason Robert is 
>>> guessing.
>>>
>>> Dunno what we can do about that without going all out in specifying 
>>> templated versions to use in our various docs. (That has the different 
>>> problem of ensuring e

Re: [DISCUSS] Beam 3.0: Paving the Path to the Next Generation Data Processing Framework

2024-08-22 Thread Robert Bradshaw via dev
Echoing many of the comments here, but organizing them under a single
theme, I would say a good focus for Beam 3.0 could be centering around
being more "transform-centric." Specifically:

- Make it easy to mix and match transforms across pipelines and
environments (SDKs). Key to this will be a push to producing/consuming
structured data (as has been mentioned) and also well-structured,
language-agnostic configuration.
- Better encapsulation for transforms. The main culprit here is update
compatibility, but there may be other issues as well. Let's try to
actually solve that for both primitives and composites.
- Somewhat related to the above, I would love to actually solve the
early/late output issue, and I think retractions and sink triggers are
powerful paradigms we could develop to actually solve this issue in a
novel way.
- Continue to refine the idea of "best practices." This includes the
points above, as well as things like robust error handling,
monitoring, etc.

Once we have these in place we are in a position to offer a powerful
catalogue of easy-to-use, well-focused transforms, both first and
third party.

Note that everything here can be backwards compatible. As a concrete
milestone for when we "reach" 3.0 I would say that our core set of
transforms have been updated to all reflect best practices (by
default?) and we have a way for third parties to also publish such
transforms.

(One more bullet point, I would love to see us complete the migration
to 100% portable runners, including local runners, which will help
with the testing and development story, but will also be key to making
the above vision work.)

On Thu, Aug 22, 2024 at 8:00 AM Kenneth Knowles  wrote:
>
> I think this is a good idea. Fun fact - I think the first time we talked 
> about "3.0" was 2018.
>
> I don't want to break users with 3.0 TBH, despite that being what a major 
> version bump suggests. But I also don't want a triple-digit minor version. I 
> think 3.0 is worthwhile if we have a new emphasis that is very meaningful to 
> users and contributors.
>
>
> A couple things I would say from experience with 2.0:
>
>  - A lot of new model features are dropped before completion. Can we make it 
> easier to evolve? Maybe not, since in a way it is our "instruction set".
>
>  - Transforms that provide straightforward functionality have a big impact: 
> RunInference, IOs, etc. I like that these get more discussion now, whereas 
> early in the project a lot of focus was on primitives and runners.
>
>  - Integrations like YAML (and there will be plenty more I'm sure) that rely 
> on transforms as true no-code black boxes with non-UDF configuration seem 
> like the next step in abstraction and ease of use.
>
>  - Update compatibility needs, which break through all our abstractions, have 
> blocked innovative changes and UX improvements, and had a chilling effect on 
> refactoring and the things that make software continue to approach Quality.
>
>
> And a few ideas I have about the future of the space, agreeing with XQ and Jan
>
>  - Unstructured data (aka "everything is bytes with coders") is overrated and 
> should be an exception not the default. Structured data everywhere, with 
> specialized bytes columns. We can make small steps in this direction (and we 
> are already).
>
>  - Triggers are really not a great construct. "Sink triggers" map better to 
> use cases but how to implement them is a long adventure. But we really can't 
> live without *something* to manage early output / late input, and the options 
> in all other systems I am aware of are even worse.
>
> And a last thought is that we shouldn't continue to work on last decade's 
> problems, if we can avoid it. Maybe there is a core to Beam that is imperfect 
> but good enough (unification of batch & streaming; integration of many 
> languages; core primitives that apply to any engine capable of handling our 
> use cases) and what we want to do is focus on what we can build on top of it. 
> I think this is implied by everything in this thread so far but I just wanted 
> to say it explicitly.
>
> Kenn
>
> On Tue, Aug 20, 2024 at 9:03 AM Jan Lukavský  wrote:
>>
>> Formatting and coloring. :)
>>
>> 
>>
>> Hi XQ,
>>
>> thanks for starting this discussion!
>>
>> I agree we are getting to a point where discussing a major update of Apache 
>> Beam might be a good idea. Because such a window of opportunity happens only 
>> once in (quite many) years, I think we should try to use our current 
>> experience with the Beam model itself and check if there is any room for 
>> improvement there. First of all, we have some parts of the model itself that 
>> are not implemented in Beam 2.0, e.g. retractions. Second, there are parts 
>> that are known to be error-prone, e.g. triggers. Another topic is features 
>> that are missing in the current model, e.g. iterations (yes, I know, general 
>> iterations might not be even possible, but it seems we can create a 
>> reasonable constraints for them t

Re: Updating beam container dependencies.

2024-08-08 Thread Robert Bradshaw via dev
No problem, glad to hear this is already the procedure. I'll just do
the minimal change for now.

On Thu, Aug 8, 2024 at 2:11 PM Robert Bradshaw  wrote:
>
> Ah, that's good to know. I'll let you finish this up when you have a chance.
>
> On Thu, Aug 8, 2024 at 12:11 PM Danny McCormick
>  wrote:
> >
> > Thanks for calling this out - we actually have a script to do this 
> > automatically after the release like you're suggesting. This release, it 
> > ran and the resulting PR is currently assigned to me. I have not merged it 
> > yet because there are (seemingly real) CI failures which I haven't had a 
> > chance to deal with yet. - https://github.com/apache/beam/pull/31885.
> >
> > Thanks,
> > Danny
> >
> > On Thu, Aug 8, 2024 at 9:08 PM Robert Bradshaw via dev 
> >  wrote:
> >>
> >> I noticed when updating a dependency in the base image requirements and 
> >> running
> >>
> >> ./gradlew :sdks:python:container:generatePythonRequirementsAll
> >>
> >> that a *lot* of other dependencies changed as well. Most were minor
> >> version bumps, but this seems less than ideal. I wonder if we should
> >> run this script periodically (maybe just after a release to let things
> >> bake for the next one?) rather than put it on whoever is updating a
> >> dependency to manually update everything else.


Re: Updating beam container dependencies.

2024-08-08 Thread Robert Bradshaw via dev
Ah, that's good to know. I'll let you finish this up when you have a chance.

On Thu, Aug 8, 2024 at 12:11 PM Danny McCormick
 wrote:
>
> Thanks for calling this out - we actually have a script to do this 
> automatically after the release like you're suggesting. This release, it ran 
> and the resulting PR is currently assigned to me. I have not merged it yet 
> because there are (seemingly real) CI failures which I haven't had a chance 
> to deal with yet. - https://github.com/apache/beam/pull/31885.
>
> Thanks,
> Danny
>
> On Thu, Aug 8, 2024 at 9:08 PM Robert Bradshaw via dev  
> wrote:
>>
>> I noticed when updating a dependency in the base image requirements and 
>> running
>>
>> ./gradlew :sdks:python:container:generatePythonRequirementsAll
>>
>> that a *lot* of other dependencies changed as well. Most were minor
>> version bumps, but this seems less than ideal. I wonder if we should
>> run this script periodically (maybe just after a release to let things
>> bake for the next one?) rather than put it on whoever is updating a
>> dependency to manually update everything else.


Updating beam container dependencies.

2024-08-08 Thread Robert Bradshaw via dev
I noticed when updating a dependency in the base image requirements and running

./gradlew :sdks:python:container:generatePythonRequirementsAll

that a *lot* of other dependencies changed as well. Most were minor
version bumps, but this seems less than ideal. I wonder if we should
run this script periodically (maybe just after a release to let things
bake for the next one?) rather than put it on whoever is updating a
dependency to manually update everything else.


Re: [VOTE] Release 2.58.0, release candidate #2

2024-08-06 Thread Robert Bradshaw via dev
+1 (binding)

Validated that the artifacts and signatures are good. Tested a couple of
simple pipelines in a fresh install.

On Mon, Aug 5, 2024 at 3:19 PM Valentyn Tymofieiev via dev <
dev@beam.apache.org> wrote:

> +1. Verified that a cherry-pick I made actually made the difference in the
> new release.
>
> On Mon, Aug 5, 2024 at 2:56 PM Robert Burke  wrote:
>
>> +1 (Binding)
>>
>> Once again validated the linux-amd prism binary against the java and
>> python validates runner tests.
>>
>> Aside: it is nice to see that the state at HEAD has moved forward from the
>> cut! Next release! :3
>>
>> On Mon, Aug 5, 2024, 7:40 AM Jack McCluskey via dev 
>> wrote:
>>
>>> Hey everyone,
>>>
>>> We need two more binding votes to approve this RC.
>>>
>>> Thanks,
>>>
>>> Jack McCluskey
>>>
>>> On Thu, Aug 1, 2024 at 2:30 PM Yi Hu via dev 
>>> wrote:
>>>
 +1 (non-binding)

 Tested DataflowTemplates (including YamlTemplate, excluding Python UDF)
 integration suites.

 On Thu, Aug 1, 2024 at 3:15 AM Jan Lukavský  wrote:

> +1 (binding)
>
> Tested FlinkRunner with Java SDK.
>
>  Jan
> On 7/30/24 21:12, Jack McCluskey via dev wrote:
>
> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version
> 2.58.0, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> Reviewers are encouraged to test their own use cases with the release
> candidate, and vote +1 if no issues are found. Only PMC member votes will
> count towards the final vote, but votes from all community members are
> encouraged and helpful for finding regressions; you can either test your
> own use cases [13] or use cases from the validation sheet [10].
>
> The complete staging area is available for your review, which includes:
> * GitHub Release notes [1],
> * the official Apache source release to be deployed to dist.apache.org
> [2], which is signed with the key with fingerprint
> DF3CBA4F3F4199F4 (D20316F712213422 if automated) [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v2.58.0-RC2" [5],
> * website pull request listing the release [6], the blog post [6], and
> publishing the API reference manual [7].
> * Python artifacts are deployed along with the source release to the
> dist.apache.org [2] and PyPI[8].
> * Go artifacts and documentation are available at pkg.go.dev [9]
> * Validation sheet with a tab for 2.58.0 release to help with
> validation [10].
> * Docker images published to Docker Hub [11].
> * PR to run tests against release branch [12].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> For guidelines on how to try the release in your projects, check out
> our RC testing guide [13].
>
> Thanks,
>
> Jack McCluskey
>
> [1] https://github.com/apache/beam/milestone/22
> [2] https://dist.apache.org/repos/dist/dev/beam/2.58.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapachebeam-1383/
> [5] https://github.com/apache/beam/tree/v2.58.0-RC2
> [6] https://github.com/apache/beam/pull/31925
> [7] https://github.com/apache/beam-site/pull/668
> [8] https://pypi.org/project/apache-beam/2.58.0rc2/
> [9]
> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.58.0-RC2/go/pkg/beam
> [10]
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1578020829#gid=1578020829
> [11] https://hub.docker.com/search?q=apache%2Fbeam&type=image
> [12] https://github.com/apache/beam/pull/31832
> [13]
> https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md
>
> --
>
>
> Jack McCluskey
> SWE - DataPLS PLAT/ Dataflow ML
> RDU
> jrmcclus...@google.com
>
>
>


Re: Beam Example Bugs

2024-07-03 Thread Robert Bradshaw via dev
We should be validating as much of our documentation as possible in
our testing, e.g.
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/snippets
is where the actual code and examples on our website (for Python)
live, to ensure they are regularly tested (on each PR) and don't bitrot.
But if there are things that are not being validated we should try to
add that.

On Wed, Jul 3, 2024 at 11:37 AM Valentyn Tymofieiev via dev
 wrote:
>
> Thanks for flagging this Joey, I reopened 
> https://github.com/apache/beam/issues/31624.
>
> We can certainly validate any aspect of Beam during the release. I think it 
> should be possible to detect issues like this in a website/playground test 
> suite, and include that suite in the list of postcommit suites we watch 
> during the release. Then it will be periodically looked at, even if these 
> errors might not be release-blocking because playground is not a part of 
> released artifacts.
>
>
> On Wed, Jul 3, 2024 at 10:41 AM Joey Tran  wrote:
>>
>> There are a handful of bugs with the interactive examples on the Beam docs 
>> page
>>
>> https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/#example-3-flatmap-without-a-function
>> https://beam.apache.org/documentation/transforms/python/elementwise/map/
>> (example 8)
>>
>> I wonder if it might be worthwhile to add website/playground validation to 
>> the Release Validation process? Not sure how onerous this would be. A new 
>> team I introduced to Beam noted that the unreliable examples were a pain 
>> point for them.
>>
>> Cheers,
>> Joey


Re: [Python SDK] Allowing int values for floats when checking types

2024-06-28 Thread Robert Bradshaw via dev
Yeah, that makes a lot of sense to me. (I suppose we could promote
both of them to complexes as well.)

On Fri, Jun 28, 2024 at 8:36 AM Joey Tran  wrote:
>
> Hey all,
>
> We're running into a fairly common situation where we get a TypeCheckError 
> when we supply an int value where a float parameter is expected.
>
> It's easy enough to work around this on a case-by-case basis by just casting 
> ints to floats, but this is an easy thing to forget to do, which means it's 
> likely to result in bugs.
>
> The function that does the type checking `is_consistent_with`[1] cites that 
> it's based on definitions from PEP 484. PEP 484 states: [2]
>
> > Rather than requiring that users write import numbers and then use 
> > numbers.Float etc., this PEP
> > proposes a straightforward shortcut that is almost as effective: when an 
> > argument is annotated
> > as having type float, an argument of type int is acceptable;
>
> Based on this, would it be reasonable to special-case type checks when 
> checking int against the float type?
>
> Best,
> Joey
>
>
> [1] 
> https://github.com/apache/beam/blob/3588d195335fa3dc06b002e5e468baa27e79f8fa/sdks/python/apache_beam/typehints/typehints.py#L1296
> [2] https://peps.python.org/pep-0484/#the-numeric-tower
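As a rough illustration of the special case being discussed (a sketch of the PEP 484 numeric-tower rule, not the actual is_consistent_with implementation):

```
def numeric_tower_compatible(arg_type, declared_type):
  """PEP 484 numeric tower: int is acceptable where float is expected,
  and int/float are acceptable where complex is expected."""
  if declared_type is float and arg_type is int:
    return True
  if declared_type is complex and arg_type in (int, float):
    return True
  return issubclass(arg_type, declared_type)


assert numeric_tower_compatible(int, float)      # would no longer be a TypeCheckError
assert not numeric_tower_compatible(float, int)  # the reverse stays rejected
```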


Re: [Discussion] Strategy on drop Java 8 support

2024-06-24 Thread Robert Bradshaw via dev
On Mon, Jun 24, 2024 at 9:59 AM Kenneth Knowles  wrote:
>
> Step 1 and 2 sound great. (I tried a first step with 
> https://github.com/apache/beam/pull/29992 but didn't have bandwidth)
>
> This will make it easier for people to get started with beam without having 
> to deal with ancient version compatibility and installing old Java, etc. Even 
> as a minor point, many of our build plugins are out of date because they have 
> moved on to more modern Java versions.

+1

> Questions:
>
>  - Could Beam move to requiring latest Java to build and just relying on 
> "--release" flag or "--target" flag to build the artifacts we release?

I probably wouldn't track the very latest, but whatever is generally
available on relatively modern systems (and +1 to just using --target
for artifacts).

> (we need to be sure we don't rely on pieces of the standard library that are 
> missing in JRE8)

This would need to be rigorously tested. How to ensure sufficient
coverage? (This is where compiling against old SDKs may have an
advantage. Or would using --source and --target be sufficient? Or will
we still run into issues with our dependencies dropping support for
old versions anyway?)
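To make the --release vs. --source/--target distinction concrete, a small sketch (commands assume a newer JDK such as 17; the file name is made up):

```
# --release compiles against the real Java 8 API, so use of an API added later
# (e.g. java.util.List.of, added in Java 9) fails right here at compile time.
javac --release 8 UsesListOf.java

# --source/--target only set the language level and bytecode version; the same
# code compiles against the JDK 17 class library and breaks at runtime on JRE 8.
javac --source 8 --target 8 UsesListOf.java
```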

>  - Can we release multi-release jars to provide updated versions for 
> different JDK versions?

As long as we choose to maintain backwards compatibility with version
X, is there any advantage to providing separate artifacts for version
Y>X? Seems like it would mostly just complicate things.

I also think a bigger open question is how long we will maintain
support for Java 8 (11, etc.) Is there a good measure for when the
ecosystem (or, more targeted, our users) have moved on?

> Kenn
>
> On Mon, Jun 24, 2024 at 9:44 AM Yi Hu via dev  wrote:
>>
>> Dear Beam developers,
>>
>> As Java8 has gone through end-of-public-update, many Beam dependencies have 
>> already deprecated Java8 (see [1]), and dropping Java8 support is planned.
>>
>> Beam hasn't deprecated Java8; moreover, Beam CI currently uses Java8 to 
>> test and release Beam. Referring to other Apache projects, I hereby suggest 
>> a 3-step process for dropping Java 8 support:
>>
>> 1. Switch CI to Java11, while retaining bytecode compatibility with Java8 
>> for Beam releases. Tracked in [1].
>>
>> This won't affect Beam developers and users currently on Java8, and can 
>> be done immediately
>>
>> 2. Require Java11+ to build Beam, deprecate Java8 support [2].
>>
>> This still won't affect Beam users currently on Java8, but Beam 
>> developers who build custom Beam artifacts will need Java11+
>>
>> 3. Drop Java8 support
>>
>> This will affect Beam users. Targeted at a medium-to-long-term future date, when 
>> Beam's major dependencies have already dropped Java8 support
>>
>> There are a few details for further decision
>>
>> * Java8 ended premier support in March 2022; the next LTS, Java11, also 
>> ended premier support in Sept 2023. Should we bump the default Java version 
>> for CI to 17 at once (while keeping Java8/11 bytecode compatibility for the 
>> build)?
>>
>> * Timeline of deprecating Java8.
>>   I am volunteering to work on [1] which is targeted to Beam 2.58.0; 
>> naturally (2) would be Beam 2.59.0 or 2.60.0.
>>
>> Please provide your thoughts on the general process, and highlight
>> particular areas of concern.
>>
>> [1] https://github.com/apache/beam/issues/31677
>>
>> [2] https://www.oracle.com/java/technologies/java-se-support-roadmap.html
>>
>> Regards,
>> Yi
>>
>> --
>>
>> Yi Hu, (he/him/his)
>>
>> Software Engineer
>>
>>


Re: [VOTE] Release 2.57.0, release candidate #1

2024-06-24 Thread Robert Bradshaw via dev
+1 (binding)

The signatures and artifacts all look good. Also tested out some
pipelines with the Python SDK installed into a fresh virtual
environment.

On Mon, Jun 24, 2024 at 2:20 AM Jan Lukavský  wrote:
>
> +1 (binding)
>
> Tested Java SDK with Flink Runner 1.18.
>
>   Jan
>
> On 6/22/24 06:43, Jean-Baptiste Onofré wrote:
> > +1 (binding)
> >
> > Regards
> > JB
> >
> > On Fri, Jun 21, 2024 at 10:17 PM Kenneth Knowles  wrote:
> >> Hi everyone,
> >>
> >> Please review and vote on the release candidate #1 for the version 2.57.0, 
> >> as follows:
> >>
> >> [ ] +1, Approve the release
> >> [ ] -1, Do not approve the release (please provide specific comments)
> >>
> >> Reviewers are encouraged to test their own use cases with the release 
> >> candidate, and vote +1 if no issues are found. Only PMC member votes will 
> >> count towards the final vote, but votes from all community members are 
> >> encouraged and helpful for finding regressions; you can either test your 
> >> own use cases [13] or use cases from the validation sheet [10].
> >>
> >> The complete staging area is available for your review, which includes:
> >>
> >> GitHub Release notes [1],
> >> the official Apache source release to be deployed to dist.apache.org [2], 
> >> which is signed with the key with fingerprint 
> >> 03DBA3E6ABDD04BFD1558DC16ED551A8AE02461C (D20316F712213422 if automated) 
> >> [3],
> >> all artifacts to be deployed to the Maven Central Repository [4],
> >> source code tag "v2.57.0-RC1" [5],
> >> cursed website pull request listing the release [6], the blog post [6], 
> >> and publishing the API reference manual [7].
> >> Python artifacts are deployed along with the source release to the 
> >> dist.apache.org [2] and PyPI[8].
> >> Go artifacts and documentation are available at pkg.go.dev [9]
> >> Validation sheet with a tab for 2.57.0 release to help with validation 
> >> [10].
> >> Docker images published to Docker Hub [11].
> >> PR to run tests against release branch [12].
> >>
> >> The vote will be open for at least 72 hours. It is adopted by majority 
> >> approval, with at least 3 PMC affirmative votes.
> >>
> >> For guidelines on how to try the release in your projects, check out our 
> >> RC testing guide [13].
> >>
> >> Thanks,
> >> Kenn
> >>
> >> [1] https://github.com/apache/beam/milestone/21
> >> [2] https://dist.apache.org/repos/dist/dev/beam/2.57.0/
> >> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> >> [4] https://repository.apache.org/content/repositories/orgapachebeam-1379/
> >> [5] https://github.com/apache/beam/tree/v2.57.0-RC1
> >> [6] https://github.com/apache/beam/pull/31667
> >> [7] https://github.com/apache/beam-site/pull/666
> >> [8] https://pypi.org/project/apache-beam/2.57.0rc1/
> >> [9] 
> >> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.57.0-RC1/go/pkg/beam
> >> [10] 
> >> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?gid=612149473#gid=612149473
> >> [11] https://hub.docker.com/search?q=apache%2Fbeam&type=image
> >> [12] https://github.com/apache/beam/pull/31513
> >> [13] 
> >> https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md


Re: Beam Infrastructure - Highlights of Recent Changes

2024-06-13 Thread Robert Bradshaw via dev
Thanks for the update. Lots of great stuff here!

On Thu, Jun 13, 2024 at 8:27 AM Andrey Devyatkin via dev
 wrote:
>
> Hey Beam community,
>
> We are glad to announce some new changes that our team has been working on 
> for a while to implement new solutions and enhance the existing ones. The 
> main focus was to improve the Beam infrastructure by increasing test 
> coverage, adding a reporting mechanism, and enhancing the level of code 
> analysis:
>
> Load and Stress Tests for streaming cases
> We've laid the foundation for implementing Stress Tests to be used for 
> writing new tests and improving existing ones. Stress Tests were introduced 
> for the following IOs:
>
> BigQueryIO
> BigTableIO
> KafkaIO
> PubSubIO
> SpannerIO
>
> We've also implemented a new Load Test for PubSubIO.
> The intention behind the Stress Tests is to measure how a write operation 
> behaves under load and use the results to define potential SLAs for IOs. As a 
> result, we came up with a document describing all the experiments conducted 
> during the implementation, which helped us identify some bugs related to 
> missing records. The document contains a set of prerequisites, links to the 
> PRs and write jobs.
> For more information about the experiments: 
> https://docs.google.com/document/d/1CVywXz7WwidIMYEp0iAMmQmmMfcExDvkGDK3dOq1bUs/edit?usp=sharing
>
> Training DuetAI for Dataflow
> We continue to enrich the knowledge base of DuetAI so that it knows even more 
> about Apache Beam: starting from basic questions related to documentation and 
> ending with generating code examples on how to use I/O connectors and 
> explaining to the user what a particular piece of code provided by them does. 
> The knowledge base contains 56 prompt/response pairs for documentation 
> lookup, 11 code generation prompts and 11 code explanation prompts, covering 
> various I/O connectors implemented in Java and Python.
> See the knowledge base: 
> https://github.com/apache/beam/tree/master/learning/prompts
> Beam Flaky Test Detection
> We've developed a reporting mechanism to notify about flaky test cases when 
> constant failed runs occur. Previously, there were no clear signals on what 
> tests were consistently flaky. Now, the tool monitors the current statistics 
> and creates a GitHub issue with a link to Grafana attached. You may have 
> noticed the open issues with the name "The  is flaky" in the daily 
> Beam High Priority Issue Report.
> For more information on how the tool works: 
> https://docs.google.com/document/d/13lwRAWoE7XA2ig0TDt98pI_nVBEQQ2UYqeUPJ0rGnME/edit?usp=sharing
> Beam Code Coverage Analysis
> There were some gaps in Python code coverage and no coverage analysis for 
> Java. As a result, we fixed configuration issues for the Jacoco plugin to 
> generate .xml files, which are used to display statistics in the Codecov 
> report, and adjusted the configuration for Python.
> For more details: 
> https://docs.google.com/document/d/1186dvd1t774EydPW0T31ynmwYxjqXUo9bh9nCO17rO0/edit?usp=sharing
> Beam Playground
> We've added a Playground CI Nightly check to make sure that Playground 
> examples remain functioning between SDK changes, etc. This will help ensure 
> that the examples are always up-to-date and that users can successfully use 
> them.
>
>
> Taking this opportunity, I would like to thank our team for these changes:
>
> Vlado Djerek (vlado.dje...@akvelon.com)
> Vitaly Terentyev (vitaly.terent...@akvelon.com)
> Akarys Shorabek (akarys.shora...@akvelon.com)
> Oleg Borisevich (oleg.borisev...@akvelon.com)
> Daria Bezkorovaina (daria.bezkorova...@akvelon.com)
> Danny McCormick (dannymccorm...@google.com)
> Yi Hu (ya...@google.com)
> XQ Hu (x...@google.com)
>
>
>
> Feel free to reach out to any of us if you have any questions.
>
>
> Thanks,
> Andrey


Re: design docs that get deleted, etc

2024-05-29 Thread Robert Bradshaw via dev
On Wed, May 29, 2024 at 1:01 PM Robert Burke  wrote:
>
> Honestly, it's less about how we propose things and more to do with actually 
> making the work of "completing" the documentation land in some permanent place.
>
> That can be as simple as "migrate the final implemented design to a markdown 
> file in the GitHub repo at *foo* or published on the beam website at *bar*.
>
> The big issue is backfilling our existing set of proposals.

I actually don't think there's a big difference between past proposals
and future ones--unless we can make the process really easy (aka
essentially automated) I think the motivation to manually migrate will
be low in the future too (and note that the doc remains a living doc
generally until a short while after its implementation). The "two
sources of truth" doesn't work well until it's static and ready to be
archived, at which point additional work is harder to motivate.

> I do like the google docs for commentary and dispersed discussions, but it's 
> not a long term solution (per the initial message in this thread).

I'm a big fan of markdown in a repo for long-term storage. Anyone know
of any good google docs -> markdown converters? That might be harder
to do if there are images (and of course comment threads, etc. would
be less likely to be preserved).

Even snapshotting them as PDF or HTML might not be that bad, if the
primary purpose is for archiving.


> On Wed, May 29, 2024, 11:56 AM Robert Bradshaw via dev  
> wrote:
>>
>> While I'm not opposed to having a more formal process for proposals to
>> go from idea to consensus to implementation, I'm not sure how much
>> this would solve the primary issues we face (discoverability and
>> durability). But maybe that could be built into the process? At the
>> very least we could have an "index" which would give identifiers (and
>> hopefully good titles) to all the proposals, and maybe have an offline
>> process to snapshot such docs (even just periodically pulling the
>> content to a repo like I do with
>> https://github.com/cython/cython-issues ). I have yet to find a medium
>> (not even wikis) that facilitates conversation/collaborative editing
>> to the extent that docs does, but I agree with the downside that
>> ownership by random individuals can pose a problem.
>>
>> On Wed, May 29, 2024 at 7:07 AM Jan Lukavský  wrote:
>> >
>> > Hi,
>> >
>> > regarding changing the way we document past (and more importantly
>> > future) changes, I've always been a big fan of the FLIP analogy [1]. I
>> > would love if we could make this work for Beam as well, while preserving
>> > the 'informal' part that I believe all of us want to keep. On the other
>> > hand, this could make the design decisions more searchable, transparent
>> > and get more people involved in the process. Having design documents
>> > durable is of course a highly important part of it.
>> >
>> >   Jan
>> >
>> > [1] https://lists.apache.org/thread/whfy3706w2d0q6rdk4kwyrzvhfd4b5kg
>> >
>> > On 5/29/24 15:04, Kenneth Knowles wrote:
>> > > Hi all,
>> > >
>> > > Yesterday someone asked me about the design doc linked from
>> > > https://github.com/apache/beam/issues/18297 because it is now a 404.
>> > >
>> > > There are plenty of reasons a Google Doc might no longer be
>> > > accessible. They exist outside the project's control. This is part of
>> > > why ASF projects emphasize having discussions on the dev@ list and
>> > > often put all their designs directly onto some ASF-hosted
>> > > infrastructure, such as a Wiki (Example:
>> > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals).
>> > > In the early days we had a project-owned shared folder but it fell
>> > > into disuse.
>> > >
>> > > In my opinion, Google Docs are still the best place for design docs to
>> > > get feedback and be revised, but the lack of durability is a downside
>> > > to stay aware of. I've also gotten lots of complaints of lack of
>> > > discoverability and lack of systematization of design docs, neither of
>> > > which would be addressed by a shared folder.
>> > >
>> > > I don't have a proposal or suggestion. I don't think this is super
>> > > urgent, certainly not my personal highest priority, but I thought I'd
>> > > just share this as food for thought.
>> > >
>> > > Kenn


Re: design docs that get deleted, etc

2024-05-29 Thread Robert Bradshaw via dev
While I'm not opposed to having a more formal process for proposals to
go from idea to consensus to implementation, I'm not sure how much
this would solve the primary issues we face (discoverability and
durability). But maybe that could be built into the process? At the
very least we could have an "index" which would give identifiers (and
hopefully good titles) to all the proposals, and maybe have an offline
process to snapshot such docs (even just periodically pulling the
content to a repo like I do with
https://github.com/cython/cython-issues ). I have yet to find a medium
(not even wikis) that facilitates conversation/collaborative editing
to the extent that docs does, but I agree with the downside that
ownership by random individuals can pose a problem.

On Wed, May 29, 2024 at 7:07 AM Jan Lukavský  wrote:
>
> Hi,
>
> regarding changing the way we document past (and more importantly
> future) changes, I've always been a big fan of the FLIP analogy [1]. I
> would love if we could make this work for Beam as well, while preserving
> the 'informal' part that I believe all of us want to keep. On the other
> hand, this could make the design decisions more searchable, transparent
> and get more people involved in the process. Having design documents
> durable is of course a highly important part of it.
>
>   Jan
>
> [1] https://lists.apache.org/thread/whfy3706w2d0q6rdk4kwyrzvhfd4b5kg
>
> On 5/29/24 15:04, Kenneth Knowles wrote:
> > Hi all,
> >
> > Yesterday someone asked me about the design doc linked from
> > https://github.com/apache/beam/issues/18297 because it is now a 404.
> >
> > There are plenty of reasons a Google Doc might no longer be
> > accessible. They exist outside the project's control. This is part of
> > why ASF projects emphasize having discussions on the dev@ list and
> > often put all their designs directly onto some ASF-hosted
> > infrastructure, such as a Wiki (Example:
> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals).
> > In the early days we had a project-owned shared folder but it fell
> > into disuse.
> >
> > In my opinion, Google Docs are still the best place for design docs to
> > get feedback and be revised, but the lack of durability is a downside
> > to stay aware of. I've also gotten lots of complaints of lack of
> > discoverability and lack of systematization of design docs, neither of
> > which would be addressed by a shared folder.
> >
> > I don't have a proposal or suggestion. I don't think this is super
> > urgent, certainly not my personal highest priority, but I thought I'd
> > just share this as food for thought.
> >
> > Kenn


Re: Question: Java Apache Beam, mock external Clients initialized in Setup

2024-05-28 Thread Robert Bradshaw via dev
You could let your @Setup method initialize your serviceAPI if (and
only if) it's null. For your tests, pre-initialize it with a Fake, and
otherwise the real thing will get used.
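A minimal sketch of that pattern (hedged: ServiceClient and the real/fake implementations below are hypothetical stand-ins, and whatever you pre-initialize the field with must be serializable, since the DoFn itself is serialized):

```
import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;

class CallServiceFn extends DoFn<String, String> {
  // Hypothetical service-layer interface; Serializable so a test fake can be
  // carried along with the DoFn.
  interface ServiceClient extends Serializable {
    String lookup(String key);
  }

  // Stand-in for the real client with the awkward constructor.
  static class RealServiceClient implements ServiceClient {
    static RealServiceClient create() { return new RealServiceClient(); }
    @Override public String lookup(String key) { return "real:" + key; }
  }

  private ServiceClient client;  // null in production; a fake when injected by tests

  CallServiceFn() {}                                         // production path

  CallServiceFn(ServiceClient fake) { this.client = fake; }  // test path

  @Setup
  public void setup() {
    if (client == null) {
      client = RealServiceClient.create();  // only built when no fake was injected
    }
  }

  @ProcessElement
  public void processElement(@Element String key, OutputReceiver<String> out) {
    out.output(client.lookup(key));
  }
}
```

In a test you'd pass `new CallServiceFn(myFake)` straight into `ParDo.of(...)`, while the no-arg constructor leaves the field null so @Setup builds the real client once per DoFn instance.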

On Sat, May 25, 2024 at 1:27 PM Ritwik Dutta via dev
 wrote:
>
> Hi Hu,
>
> Thank you,
> I am testing the serviceAPI  call in separate unit tests already. Let me 
> rephrase  (serviceAPI should be renamed to serviceLayer,  it doesn't 
> necessarily have to be a restful API with an http endpoint)
> The question is how, do we create Fakes for items with difficult constructors.
>
> At least provide examples, or more explanation in Apache dcoumentation.
>
> Feel free to reply directly in stackoverflow, so other community members can 
> reply
>
> Thanks!
>
> -Ritwik Dutta
>  734-262-4285
>
> On Saturday, May 25, 2024 at 07:12:23 AM PDT, XQ Hu  wrote:
>
>
> I am not sure which part you want to test. If the processData part should be 
> tested, you could refactor the code without using any Beam-specific code and 
> test the data processing logic.
>
> From your example, it seems that you are calling some APIs; we recently added 
> a new Web API IO: https://beam.apache.org/documentation/io/built-in/webapis/, 
> which provides a way to test.
>
> On Wed, May 22, 2024 at 5:06 PM Ritwik Dutta via dev  
> wrote:
>
> any response yet? No one has answers? I left a stackoverflow bounty on the 
> question
>
> Using external methods is pretty important
>
> On Sunday, May 12, 2024 at 11:52:25 AM PDT, Ritwik Dutta  
> wrote:
>
>
> Hi,
> I wrote the following question here.
> It would also be really helpful if you could update your documentation on 
> using test fakes in different situations. The current documentation is very light. 
> Please provide more explanation and examples.
> https://beam.apache.org/documentation/io/testing/#:~:text=non%2DBeam%20client.-,Use%20fakes,-Instead%20of%20using
>
>
> Question: Java Apache Beam, mock external Clients initialized in @Setup 
> method of DoFn with Constructors variables
> https://stackoverflow.com/questions/78468953/java-apache-beam-mock-external-clients-initialized-in-setup-method-of-dofn-wit
>
> Thanks,
>
> -Ritwik Dutta
>  734-262-4285


Re: release process: typescript SDK?

2024-04-16 Thread Robert Bradshaw via dev
Correct, I've just been pushing these manually, and lately there haven't
been many changes to push. I'm all for getting these set up as part of the
standard release process.
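For reference, a rough sketch of what such a workflow could look like, following the GitHub docs linked in the quoted message below (the NPM_TOKEN secret name and the manual trigger are placeholders, not something we have set up yet):

```
name: publish-npm
on:
  workflow_dispatch: {}   # placeholder trigger; could instead hang off the release process
jobs:
  publish:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 18
          registry-url: https://registry.npmjs.org
      - run: npm ci
        working-directory: sdks/typescript
      - run: npm publish
        working-directory: sdks/typescript
        env:
          NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
```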

On Tue, Apr 16, 2024 at 1:22 PM Danny McCormick 
wrote:

> I've never published npm artifacts before, but I imagine the hardest part
> is getting the credentials set up, then it is probably very easy to set up
> a GitHub Actions workflow to publish
> <https://docs.github.com/en/actions/publishing-packages/publishing-nodejs-packages#publishing-packages-to-the-npm-registry>.
> Who has done these releases in the past/has credentials for the npm
> package? Maybe @Robert Bradshaw ? We will need a
> token set up as a secret to automate this.
>
> I'll also note that we don't do any typescript validation today, and it
> would be nice to publish RCs as part of this
>
> On Tue, Apr 16, 2024 at 4:11 PM Austin Bennett  wrote:
>
>> Hi Beam Devs,
>>
>> Calling out that it looks like our release process for apache-beam for
>> typescript/npm is broken; seemingly the last published release was 2.49.0,
>> about 9 months ago. The other languages look like they are publishing to
>> expected locations.
>>
>> https://www.npmjs.com/package/apache-beam
>>
>> I noticed this since I was digging into security concerns raised by
>> GitHub's dependabot across our repos [ ex:
>> https://github.com/apache/beam-starter-typescript/security/dependabot ], and
>> towards getting our repos tidied.
>>
>> This leads me to believe we may want two distinct things:
>> * update our release docs/process/scripts to ensure that we
>> generate/publish all artifacts to relevant repositories.
>> * Arrive at a process to more straightforwardly attend to security
>> updates [ maybe we want these sent to dev list, or another distribution? ]
>>
>> From a very quick search, it did not look like we have scripts to push to
>> npm. That should be verified more thoroughly -- I haven't done a release
>> before, so relevant scripts could be hiding elsewhere.
>>
>> Cheers,
>> Austin
>>
>>
>> NOTE:  everything with our main Beam repo specifically looks OK.  Some
>> things discovered were on the other/supplementary repos, though I believe
>> those are still worthwhile to attend to and support.
>>
>


Re: [VOTE] Patch Release 2.55.1, release candidate #2

2024-04-03 Thread Robert Bradshaw via dev
+1 (binding) The artifacts all look good to me.

On Wed, Apr 3, 2024 at 1:35 PM XQ Hu via dev  wrote:

> +1 (non-binding). Tested this using a simple Dataflow ML pipeline:
> https://github.com/google/dataflow-ml-starter/actions/runs/8541848483.
>
> On Wed, Apr 3, 2024 at 2:35 PM Jeff Kinard  wrote:
>
>> +1. Validated running from local gradle JAR and staged maven JAR for
>> expansion-service.
>>
>> On Wed, Apr 3, 2024 at 11:08 AM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hi everyone,
>>>
>>> I put together a patch release per the conversation in
>>> https://lists.apache.org/thread/kvq1wsj505pvopkq186dnvc0l6ryyfh0.
>>>
>>> Please review and vote on the release candidate #2 (I messed up rc1) for
>>> the version 2.55.1, as follows:
>>> [ ] +1, Approve the release
>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>
>>>
>>> Reviewers are encouraged to test their own use cases with the release
>>> candidate, and vote +1 if no issues are found. Only PMC member votes will
>>> count towards the final vote, but votes from all community members are
>>> encouraged and helpful for finding regressions; you can either test your
>>> own use cases [9] or use cases from the validation sheet [7].
>>>
>>> The complete staging area is available for your review, which includes:
>>> * the official Apache source release to be deployed to dist.apache.org
>>> [1], which is signed with the key with fingerprint D20316F712213422 [2],
>>> * all artifacts to be deployed to the Maven Central Repository [3],
>>> * source code tag "v2.55.1-RC2" [4],
>>> * Python artifacts are deployed along with the source release to the
>>> dist.apache.org [1] and PyPI[5].
>>> * Go artifacts and documentation are available at pkg.go.dev [6]
>>> * Validation sheet with a tab for 2.55.1 release to help with validation
>>> [7].
>>> * Docker images published to Docker Hub [8].
>>>
>>> This release does not include any website changes since it is addressing
>>> a single bug fix as discussed in
>>> https://lists.apache.org/thread/kvq1wsj505pvopkq186dnvc0l6ryyfh0.
>>>
>>> The vote will be open for at least 72 hours. It is adopted by majority
>>> approval, with at least 3 PMC affirmative votes.
>>>
>>> For guidelines on how to try the release in your projects, check out our
>>> RC testing guide [9].
>>>
>>> Thanks,
>>> Danny
>>>
>>> [1] https://dist.apache.org/repos/dist/dev/beam/2.55.1/
>>> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
>>> [3]
>>> https://repository.apache.org/content/repositories/orgapachebeam-1375/
>>> [4] https://github.com/apache/beam/tree/v2.55.1-RC2
>>> [5] https://pypi.org/project/apache-beam/2.55.1rc2/
>>> [6]
>>> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.55.1-RC2/go/pkg/beam
>>> [7]
>>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=686075626
>>> [8] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>>> [9]
>>> https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md
>>>
>>


Re: Supporting Dynamic Destinations in a portable context

2024-04-03 Thread Robert Bradshaw via dev
On Wed, Apr 3, 2024 at 4:15 AM Kenneth Knowles  wrote:

> Let me summarize the most recent proposal on-list to frame my question
> about this last suggestion. It looks like this:
>
> 1. user has an element, call it `data`
> 2. user maps `data` to an arbitrary metadata row, call it `dest`
> 3. we can do things like shuffle on `dest` because it isn't too big
> 4. we map `dest` to a concrete destination (aka URL) to write to by a
> string format that uses fields of `dest`
>
> I believe steps 1-3 are identical in expressivity to non-portable
> DynamicDestinations. So Reuven, the question is for step 4: what are the
> mappings from `dest` to URL that cannot be expressed by string formatting
> but need SQL or Lua, etc? That would be a useful guide to consideration of
> those possibilities.
>

I think any non-trivial mapping can be done in step 2. It may be possible
to come up with a case where something other than string substitution
needs to be done to make dest small enough to shuffle, but I think that'd
be a really rare corner case, and then it's just an optimization rather
than a feature-completeness question.


> FWIW I think even if we add a mini-language, string formatting has
> better ease of use (it can easily be displayed in a UI, etc.), so it would be the
> first choice, and more advanced stuff would be a fallback for rare cases. So they
> are both valuable, and I'd be happy to implement the easier-to-use path
> right away while we discuss.
>

+1. Note that this even lets us share the config "path/table/..." field
that is a static string for non-dynamic destinations.

In light of the above, let's avoid a complex mini-language. I'd start with
nothing but plugging things in w/o any formatting options.


> On Tue, Apr 2, 2024 at 2:59 PM Reuven Lax via dev 
> wrote:
>
>> I do suspect that over time we'll find more and more cases we can't
>> express, and will be asked to extend this little templating in more
>> directions. To head that off - could we easily just reuse an existing
>> language (SQL, LUA, something of the form?) instead of creating something
>> new?
>>
>> On Tue, Apr 2, 2024 at 8:55 AM Kenneth Knowles  wrote:
>>
>>> I really like this proposal. I think it has narrowed down and solved the
>>> essential problem of not shuffling excess redundant data, and also provides
>>> the vast majority of the functionality that a lambda would, with
>>> significantly better debugability and usability too, since the dynamic
>>> destination pattern string can be in display data, etc.
>>>
>>> Kenn
>>>
>>> On Wed, Mar 27, 2024 at 1:58 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> On Wed, Mar 27, 2024 at 10:20 AM Reuven Lax  wrote:
>>>>
>>>>> Can the prefix still be generated programmatically at graph creation
>>>>> time?
>>>>>
>>>>
>>>> Yes. It's just a property of the transform passed by the user at
>>>> configuration time.
>>>>
>>>>
>>>>> On Wed, Mar 27, 2024 at 9:40 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:
>>>>>>
>>>>>>> This does seem like the best compromise, though I think there will
>>>>>>> still end up being performance issues. A common pattern I've seen is 
>>>>>>> that
>>>>>>> there is a long common prefix to the dynamic destination followed by the
>>>>>>> dynamic component, e.g. the destination might be
>>>>>>> long/common/path/to/destination/files/<dynamic component>. In this case, the
>>>>>>> prefix is often much larger than messages themselves and is what gets
>>>>>>> effectively encoded in the lambda.
>>>>>>>
>>>>>>
>>>>>> The idea here is that the destination would be given as a format
>>>>>> string, say, "long/common/path/to/destination/files/{dest_info.user}".
>>>>>> Another way to put this is that we support (only) "lambdas" that are
>>>>>> represented as string substitutions. (The fact that dest_info does not 
>>>>>> have
>>>>>> to be part of the record, and can be the output of an arbitrary map if 
>>>>>> need
>>>>>> be, makes this restriction not so bad.)
>>>>>>
>>>>>> As well as solving the performance issues, I think this is actually a
>>>>

Re: Design proposal for Beam YAML templates

2024-04-02 Thread Robert Bradshaw via dev
It looks like we're converging on an optional jinja preprocessing phase to
handle this. I'm in favor of this solution.
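As a rough illustration of what the optional preprocessing phase would mean in
practice, one can render the document with jinja2 before handing it to the
YAML parser. The pipeline contents and parameter names below are made up;
only the render-then-parse ordering reflects the proposal.

```python
import jinja2
import yaml

TEMPLATE = """
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: {{ input_path }}
    - type: WriteToJson
      config:
        path: {{ output_path }}
      input: ReadFromCsv
"""

def render_pipeline_spec(template_text, **params):
    # Optional jinja phase: substitute user-supplied parameters as plain text...
    rendered = jinja2.Template(template_text).render(**params)
    # ...then parse the result as an ordinary (static) Beam YAML document.
    return yaml.safe_load(rendered)

spec = render_pipeline_spec(
    TEMPLATE, input_path="gs://bucket/in.csv", output_path="gs://bucket/out")
print(spec["pipeline"]["transforms"][0]["config"]["path"])
```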

On Wed, Mar 20, 2024 at 9:23 AM Robert Bradshaw  wrote:

> Thanks. I think this will be a very powerful feature. Left some comments
> on the doc.
>
> On Tue, Mar 19, 2024 at 11:53 AM Jeff Kinard  wrote:
>
>> Hi all,
>>
>> I have another quick design doc discussing the syntax for Beam YAML
>> templates. This feature would allow a user to create template pipelines
>> similar to what is done in
>> https://github.com/GoogleCloudPlatform/DataflowTemplates
>> that are configurable and therefore more reusable than static YAML
>> pipelines.
>>
>> Design doc:
>> https://docs.google.com/document/d/1AUt4NEoQCBrOEVhadVozyTU2-CqC-pw7kuK6rzIA6Jc/edit?resourcekey=0-d-aVmsKZ89SzSiVwt8n5pw
>>
>> Please take a look and feel free to leave any comments.
>>
>> Thanks,
>> - Jeff
>>
>


Re: Patch release proposal

2024-03-27 Thread Robert Bradshaw via dev
Given the severity of the breakage, and the simplicity of the workaround,
I'm in favor of a patch release. I think we could do Python-only, which
would make the process even more lightweight.

On Wed, Mar 27, 2024 at 3:48 PM Jeff Kinard  wrote:

> Hi all,
>
> Beam 2.55 was released with a bug that causes WriteToJson on Beam YAML to
> fail when using the Java variant. This also affects any user attempting to
> use the Xlang JsonWriteTransformProvider -
> https://github.com/apache/beam/blob/master/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
>
> This is due to a change to
> https://github.com/apache/beam/blob/master/sdks/java/io/json/build.gradle
> that removed
> a dependency on everit which also removed it from being packaged into the
> expansion service JAR:
> beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar
>
> There is a temporary fix to disable the provider in Beam YAML:
> https://github.com/apache/beam/pull/30777
>
> I think with the total loss of function, and a trivial fix, it is worth
> creating a patch release of Beam 2.55 to include this fix.
>
> - Jeff
>
>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Robert Bradshaw via dev
On Wed, Mar 27, 2024 at 10:20 AM Reuven Lax  wrote:

> Can the prefix still be generated programmatically at graph creation time?
>

Yes. It's just a property of the transform passed by the user at
configuration time.


> On Wed, Mar 27, 2024 at 9:40 AM Robert Bradshaw 
> wrote:
>
>> On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:
>>
>>> This does seem like the best compromise, though I think there will still
>>> end up being performance issues. A common pattern I've seen is that there
>>> is a long common prefix to the dynamic destination followed by the dynamic
>>> component, e.g. the destination might be
>>> long/common/path/to/destination/files/<dynamic component>. In this case, the
>>> prefix is often much larger than messages themselves and is what gets
>>> effectively encoded in the lambda.
>>>
>>
>> The idea here is that the destination would be given as a format string,
>> say, "long/common/path/to/destination/files/{dest_info.user}". Another way
>> to put this is that we support (only) "lambdas" that are represented as
>> string substitutions. (The fact that dest_info does not have to be part of
>> the record, and can be the output of an arbitrary map if need be, makes
>> this restriction not so bad.)
>>
>> As well as solving the performance issues, I think this is actually a
>> pretty convenient and natural way for the user to name their destination
>> (for the common usecase, even easier than providing a lambda), and has the
>> benefit of being much more transparent than an arbitrary callable as well
>> for introspection (for both machine and human that may look at the
>> resulting pipeline).
>>
>>
>>> I'm not entirely sure how to address this in a portable context. We
>>> might simply have to accept the extra overhead when going cross language.
>>>
>>> Reuven
>>>
>>> On Wed, Mar 27, 2024 at 8:51 AM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Thanks for putting this together, it will be a really useful feature to
>>>> have.
>>>>
>>>> I am in favor of the string-pattern approaches. I think we need to
>>>> support both the {record=..., dest_info=...} and the elide-fields
>>>> approaches, as the former is nicer when one has a fixed representation for
>>>> the output record (e.g. a proto or avro schema) and the flattened form for
>>>> ease of use in more free-form contexts (e.g. when producing records from
>>>> YAML and SQL).
>>>>
>>>> Also left some comments on the doc.
>>>>
>>>>
>>>> On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> There have been some conversations lately about how best to enable
>>>>> dynamic destinations in a portable context. Usually, this comes up for
>>>>> cross-language transforms and more recently for Beam YAML.
>>>>>
>>>>> I've started a short doc outlining some routes we could take. The
>>>>> purpose is to establish a good standard for supporting dynamic 
>>>>> destinations
>>>>> with portability, one that can be applied to most use cases and IOs. 
>>>>> Please
>>>>> take a look and add any thoughts!
>>>>>
>>>>> https://s.apache.org/portable-dynamic-destinations
>>>>>
>>>>> Best,
>>>>> Ahmed
>>>>>
>>>>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Robert Bradshaw via dev
On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:

> This does seem like the best compromise, though I think there will still
> end up being performance issues. A common pattern I've seen is that there
> is a long common prefix to the dynamic destination followed by the dynamic
> component, e.g. the destination might be
> long/common/path/to/destination/files/<dynamic component>. In this case, the
> prefix is often much larger than messages themselves and is what gets
> effectively encoded in the lambda.
>

The idea here is that the destination would be given as a format string,
say, "long/common/path/to/destination/files/{dest_info.user}". Another way
to put this is that we support (only) "lambdas" that are represented as
string substitutions. (The fact that dest_info does not have to be part of
the record, and can be the output of an arbitrary map if need be, makes
this restriction not so bad.)

As well as solving the performance issues, I think this is actually a
pretty convenient and natural way for the user to name their destination
(for the common usecase, even easier than providing a lambda), and has the
benefit of being much more transparent than an arbitrary callable as well
for introspection (for both machine and human that may look at the
resulting pipeline).
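Put another way (purely as an illustrative sketch), a destination "lambda"
restricted to string substitution is just data, so it can be resolved,
displayed in display data, and validated without executing user code:

```python
from types import SimpleNamespace

# The user-facing "lambda" is a substitution pattern: plain data that tools
# and humans can inspect, unlike a pickled callable.
pattern = "long/common/path/to/destination/files/{dest_info.user}"

def resolve(pattern, dest_info):
    # dest_info is the small metadata row computed by an ordinary Map upstream.
    return pattern.format(dest_info=dest_info)

print(resolve(pattern, SimpleNamespace(user="alice")))
# -> long/common/path/to/destination/files/alice
```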


> I'm not entirely sure how to address this in a portable context. We might
> simply have to accept the extra overhead when going cross language.
>
> Reuven
>
> On Wed, Mar 27, 2024 at 8:51 AM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Thanks for putting this together, it will be a really useful feature to
>> have.
>>
>> I am in favor of the string-pattern approaches. I think we need to
>> support both the {record=..., dest_info=...} and the elide-fields
>> approaches, as the former is nicer when one has a fixed representation for
>> the output record (e.g. a proto or avro schema) and the flattened form for
>> ease of use in more free-form contexts (e.g. when producing records from
>> YAML and SQL).
>>
>> Also left some comments on the doc.
>>
>>
>> On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hey all,
>>>
>>> There have been some conversations lately about how best to enable
>>> dynamic destinations in a portable context. Usually, this comes up for
>>> cross-language transforms and more recently for Beam YAML.
>>>
>>> I've started a short doc outlining some routes we could take. The
>>> purpose is to establish a good standard for supporting dynamic destinations
>>> with portability, one that can be applied to most use cases and IOs. Please
>>> take a look and add any thoughts!
>>>
>>> https://s.apache.org/portable-dynamic-destinations
>>>
>>> Best,
>>> Ahmed
>>>
>>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Robert Bradshaw via dev
Thanks for putting this together, it will be a really useful feature to
have.

I am in favor of the string-pattern approaches. I think we need to support
both the {record=..., dest_info=...} and the elide-fields approaches, as
the former is nicer when one has a fixed representation for the
output record (e.g. a proto or avro schema) and the flattened form for ease
of use in more free-form contexts (e.g. when producing records from YAML
and SQL).
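For illustration only (not a settled schema), the two element shapes might
look roughly like this:

```python
import apache_beam as beam

# Nested form: the record keeps its fixed schema (proto/Avro/etc.) and the
# destination metadata travels alongside it.
nested = beam.Row(
    dest_info=beam.Row(table="events_us", user="alice"),
    record=beam.Row(id=1, payload="..."))

# Flattened ("elide fields") form: destination fields live directly on the
# element and are dropped before writing; handy for rows produced from
# YAML or SQL, where wrapping into a nested record is awkward.
flattened = beam.Row(table="events_us", user="alice", id=1, payload="...")

# Either way, the destination pattern only plugs fields in, e.g.
pattern = "projects/p/datasets/d/tables/{table}"
print(pattern.format(table=flattened.table))
```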

Also left some comments on the doc.


On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev 
wrote:

> Hey all,
>
> There have been some conversations lately about how best to enable dynamic
> destinations in a portable context. Usually, this comes up for
> cross-language transforms and more recently for Beam YAML.
>
> I've started a short doc outlining some routes we could take. The purpose
> is to establish a good standard for supporting dynamic destinations with
> portability, one that can be applied to most use cases and IOs. Please take
> a look and add any thoughts!
>
> https://s.apache.org/portable-dynamic-destinations
>
> Best,
> Ahmed
>


Re: Python API: FlatMap default -> lambda x:x?

2024-03-21 Thread Robert Bradshaw via dev
I would be more comfortable with a default for FlatMap than overloading
Flatten in this way. Distinguishing between

(pcoll,) | beam.Flatten()

and

(pcoll) | beam.Flatten()

seems a bit error prone.
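For anyone skimming the thread, here is a small sketch of the distinction
being discussed, using the current Python SDK as I understand it:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    words = p | "words" >> beam.Create([["a", "b"], ["c"]])
    more = p | "more" >> beam.Create([["d"]])

    # Flatten merges multiple PCollections into one; elements are untouched.
    merged = (words, more) | beam.Flatten()  # [["a", "b"], ["c"], ["d"]]

    # FlatMap with the identity function unpacks each iterable element.
    unpacked = merged | beam.FlatMap(lambda x: x)  # ["a", "b", "c", "d"]

    unpacked | beam.Map(print)
```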


On Thu, Mar 21, 2024 at 2:23 PM Joey Tran  wrote:

> Ah, I misunderstood your original suggestion then. That makes sense then.
> I have already seen someone get a little confused about the names and
> surprised that Flatten doesn't do what FlatMap does.
>
> On Thu, Mar 21, 2024 at 5:20 PM Valentyn Tymofieiev 
> wrote:
>
>> Beam throws an error at submission time in Python if you pass a single
>> PCollection  to Flatten. The scenario you describe concerns a one-element
>> list.
>>
>> On Thu, Mar 21, 2024, 13:43 Joey Tran  wrote:
>>
>>> I think it'd be quite surprising if beam.Flatten would become equivalent
>>> to FlatMap if passed only a single pcollection. One use case that would be
>>> broken from that is cases where someone might be flattening a variable
>>> number of pcollections, including possibly only one pcollection. In that
>>> case, that single pcollection suddenly gets FlatMapped.
>>>
>>>
>>>
>>> On Thu, Mar 21, 2024 at 4:36 PM Valentyn Tymofieiev via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> One possible alternative is to define beam.Flatten for a single
>>>> collection to be functionally equivalent to beam.FlatMap(lambda x: x), but
>>>> that would be a larger change and such behavior might need to be
>>>> consistent across SDKs and documented. Adding a default value is a simpler
>>>> change.
>>>>
>>>> I can also confirm that the usage
>>>>
>>>> |  'Flatten' >> beam.FlatMap(lambda x: x)
>>>>
>>>> is fairly common by inspecting uses of Beam internally.
>>>> On Thu, Mar 21, 2024 at 1:30 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> IIRC, Java has Flatten.iterables() and Flatten.collections(), the
>>>>> first of which does what you want.
>>>>>
>>>>> Giving FlatMap a default arg of lambda x: x is an interesting idea.
>>>>> The only downside I see is a less clear error if one forgets to provide
>>>>> this (now mandatory) parameter, but maybe that's low enough to be worth 
>>>>> the
>>>>> convenience?
>>>>>
>>>>> On Thu, Mar 21, 2024 at 12:02 PM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> That's not really the same thing, is it? `beam.Flatten` combines two
>>>>>> or more pcollections into a single pcollection while beam.FlatMap unpacks
>>>>>> iterables of elements (i.e. PCollection<Iterable<T>> -> PCollection<T>)
>>>>>>
>>>>>> On Thu, Mar 21, 2024 at 2:57 PM Valentyn Tymofieiev via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hi, you can use beam.Flatten() instead.
>>>>>>>
>>>>>>> On Thu, Mar 21, 2024 at 10:55 AM Joey Tran <
>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> Using an identity function for FlatMap comes up more often than
>>>>>>>> using FlatMap without an identity function. Would it make sense to use 
>>>>>>>> the
>>>>>>>> identity function as a default?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>


Re: Python API: FlatMap default -> lambda x:x?

2024-03-21 Thread Robert Bradshaw via dev
IIRC, Java has Flatten.iterables() and Flatten.collections(), the first of
which does what you want.

Giving FlatMap a default arg of lambda x: x is an interesting idea. The
only downside I see is a less clear error if one forgets to provide this
(now mandatory) parameter, but maybe that's low enough to be worth the
convenience?

On Thu, Mar 21, 2024 at 12:02 PM Joey Tran 
wrote:

> That's not really the same thing, is it? `beam.Flatten` combines two or
> more pcollections into a single pcollection while beam.FlatMap unpacks
> iterables of elements (i.e. PCollection<Iterable<T>> -> PCollection<T>)
>
> On Thu, Mar 21, 2024 at 2:57 PM Valentyn Tymofieiev via dev <
> dev@beam.apache.org> wrote:
>
>> Hi, you can use beam.Flatten() instead.
>>
>> On Thu, Mar 21, 2024 at 10:55 AM Joey Tran 
>> wrote:
>>
>>> Hey all,
>>>
>>> Using an identity function for FlatMap comes up more often than using
>>> FlatMap without an identity function. Would it make sense to use the
>>> identity function as a default?
>>>
>>>
>>>
>>>


Re: Design proposal for Beam YAML templates

2024-03-20 Thread Robert Bradshaw via dev
Thanks. I think this will be a very powerful feature. Left some comments on
the doc.

On Tue, Mar 19, 2024 at 11:53 AM Jeff Kinard  wrote:

> Hi all,
>
> I have another quick design doc discussing the syntax for Beam YAML
> templates. This feature would allow a user to create template pipelines
> similar to what is done in
> https://github.com/GoogleCloudPlatform/DataflowTemplates
> that are configurable and therefore more reusable than static YAML
> pipelines.
>
> Design doc:
> https://docs.google.com/document/d/1AUt4NEoQCBrOEVhadVozyTU2-CqC-pw7kuK6rzIA6Jc/edit?resourcekey=0-d-aVmsKZ89SzSiVwt8n5pw
>
> Please take a look and feel free to leave any comments.
>
> Thanks,
> - Jeff
>


Re: [GSoC] Build out Beam Yaml features

2024-03-07 Thread Robert Bradshaw via dev
This looks like an interesting proposal. Thanks! I left some comments
on the doc.

On Tue, Mar 5, 2024 at 8:39 AM Reeba Qureshi  wrote:
>
> Hi all!
>
> I am Reeba Qureshi, interested in the "Build out Beam Yaml features"  (link) 
> for GSoC 2024. I worked with Apache Beam during GSoC 2023 and built Beam ML 
> use cases (report). It was a great experience and I'm looking forward to 
> contributing more to the Beam project!
>
> Here is the initial draft of my proposal (link). I would appreciate any 
> feedback or suggestions on it!
>
> Thanks,
> Reeba Qureshi


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:
>
> On 2/27/24 19:22, Robert Bradshaw via dev wrote:
> > On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
> >> Pulling out focus points:
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>  wrote:
> >>> I can't act on something yet [...] but I expect to be able to [...] at 
> >>> some time in the processing-time future.
> >> I like this as a clear and internally-consistent feature description. It 
> >> describes ProcessContinuation and those timers which serve the same 
> >> purpose as ProcessContinuation.
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>  wrote:
> >>> I can't think of a batch or streaming scenario where it would be correct 
> >>> to not wait at least that long
> >> The main reason we created timers: to take action in the absence of data. 
> >> The archetypal use case for processing time timers was/is "flush data from 
> >> state if it has been sitting there too long". For this use case, the right 
> >> behavior for batch is to skip the timer. It is actually basically 
> >> incorrect to wait.
> > Good point calling out the distinction between "I need to wait in case
> > there's more data." and "I need to wait for something external." We
> > can't currently distinguish between the two, but a batch runner can
> > say something definitive about the first. Feels like we need a new
> > primitive (or at least new signaling information on our existing
> > primitive).
> Runners signal end of data to a DoFn via (input) watermark. Is there a
> need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.
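For reference, the "flush state that has been sitting too long" pattern looks
roughly like this in the Python SDK (a sketch only; the FLUSH_DELAY_SECS
constant and class name are mine, and error handling is omitted). Nothing in
the timer declaration says whether we are waiting for more data or for the
outside world, which is exactly the signal we're missing:

```python
import time
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

FLUSH_DELAY_SECS = 60  # illustrative threshold

class FlushIfIdle(beam.DoFn):
    """Buffers values per key and flushes them after a processing-time delay."""
    BUFFER = BagStateSpec('buffer', StrUtf8Coder())
    FLUSH = TimerSpec('flush', TimeDomain.REAL_TIME)

    def process(self,
                element,  # (key, value)
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        buffer.add(element[1])
        # Re-arm a processing-time timer; if no new data arrives for this key,
        # flush anyway once the delay elapses.
        flush.set(time.time() + FLUSH_DELAY_SECS)

    @on_timer(FLUSH)
    def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
        yield list(buffer.read())
        buffer.clear()
```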


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
restate the streaming processing time semantics in the limited 
>> batch case.
>>
>>
>> Kenn
>>
>> On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:
>>>
>>> I think that before we introduce a possibly somewhat duplicate new feature 
>>> we should be certain that it is really semantically different. I'll 
>>> rephrase the two cases:
>>>
>>>  a) need to wait and block data (delay) - the use case is the motivating 
>>> example of Throttle transform
>>>
>>>  b) act without data, not block
>>>
>>> Provided we align processing time with local machine clock (or better, 
>>> because of testing, make current processing time available via context to 
>>> @ProcessElement) it seems to possble to unify both cases under slightly 
>>> updated semantics of processing time timer in batch:
>>>
>>>  a) processing time timers fire with best-effort, i.e. trying to minimize 
>>> delay between firing timestamp and timer's timestamp
>>>  b) timer is valid only in the context of current key-window, once 
>>> watermark passes window GC time for the particular window that created the 
>>> timer, it is ignored
>>>  c) if timer has output timestamp, this timestamp holds watermark (but this 
>>> is currently probably noop, because runners currently do no propagate 
>>> (per-key) watermark in batch, I assume)
>>>
>>> In case b) there might be needed to distinguish cases when timer has output 
>>> timestamp, if so, it probably should be taken into account.
>>>
>>> Now, such semantics should be quite aligned with what we do in streaming 
>>> case and what users generally expect. The blocking part can be implemented 
>>> in @ProcessElement using buffer & timer, once there is need to wait, it can 
>>> be implemented in user code using plain sleep(). That is due to the 
>>> alignment between local time and definition of processing time. If we had 
>>> some reason to be able to run faster-than-wall-clock (as I'm still not in 
>>> favor of that), we could do that using ProcessContext.sleep(). Delaying 
>>> processing in the @ProcessElement should result in backpressuring and 
>>> backpropagation of this backpressure from the Throttle transform to the 
>>> sources as mentioned (of course this is only for the streaming case).
>>>
>>> Is there anything missing in such definition that would still require 
>>> splitting the timers into two distinct features?
>>>
>>>  Jan
>>>
>>> On 2/26/24 21:22, Kenneth Knowles wrote:
>>>
>>> Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.
>>>
>>> OutputTime is always an event time timestamp so it isn't even allowed to be 
>>> set outside the window (or you'd end up with an element assigned to a 
>>> window that it isn't within, since OutputTime essentially represents 
>>> reserving the right to output an element with that timestamp)
>>>
>>> Kenn
>>>
>>> On Mon, Feb 26, 2024 at 3:19 PM Robert Burke  wrote:
>>>>
>>>> Agreed that a retroactive behavior change would be bad, even if tied to a 
>>>> beam version change. I agree that it meshes well with the general theme of 
>>>> State & Timers exposing underlying primitives for implementing Windowing 
>>>> and similar. I'd say the distinction between the two might be additional 
>>>> complexity for users to grok, and would need to be documented well, as 
>>>> both operate in the ProcessingTime domain, but differently.
>>>>
>>>> What to call this new timer then? DelayTimer?
>>>>
>>>> "A DelayTimer sets an instant in ProcessingTime at which point 
>>>> computations can continue. Runners will prevent the EventTimer watermark 
>>>> from advancing past the set OutputTime until Processing Time has advanced 
>>>> to at least the provided instant to execute the timers callback. This can 
>>>> be used to allow the runner to constrain pipeline throughput with user 
>>>> guidance."
>>>>
>>>> I'd probably add that a timer with an output time outside of the window 
>>>> would not be guaranteed to fire, and that OnWindowExpiry is the correct 
>>>> way to ensure cleanup occurs.
>>>>
>>>> No solution to the Looping Timers on Drain problem here, but i think 
>>>> that's ultimately an orthogonal discussion, and will rest

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
>
> Pulling out focus points:
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
> > I can't act on something yet [...] but I expect to be able to [...] at some 
> > time in the processing-time future.
>
> I like this as a clear and internally-consistent feature description. It 
> describes ProcessContinuation and those timers which serve the same purpose 
> as ProcessContinuation.
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
> > I can't think of a batch or streaming scenario where it would be correct to 
> > not wait at least that long
>
> The main reason we created timers: to take action in the absence of data. The 
> archetypal use case for processing time timers was/is "flush data from state 
> if it has been sitting there too long". For this use case, the right behavior 
> for batch is to skip the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
> > It doesn't require a new primitive.
>
> IMO what's being proposed *is* a new primitive. I think it is a good 
> primitive. It is the underlying primitive to ProcessContinuation. It would be 
> user-friendly as a kind of timer. But if we made this the behavior of 
> processing time timers retroactively, it would break everyone using them to 
> flush data who is also reprocessing data.
>
> There's two very different use cases ("I need to wait, and block data" vs "I 
> want to act without data, aka NOT wait for data") and I think we should serve 
> both of them, but it doesn't have to be with the same low-level feature.
>
> Kenn
>
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
>>
>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>> >
>> > While I'm currently on the other side of the fence, I would not be against 
>> > changing/requiring the semantics of ProcessingTime constructs to be "must 
>> > wait and execute" as such a solution, and enables the Proposed "batch" 
>> > process continuation throttling mechanism to work as hypothesized for both 
>> > "batch" and "streaming" execution.
>> >
>> > There's a lot to like, as it leans Beam further into the unification of 
>> > Batch and Stream, with one fewer exception (eg. unifies timer experience 
>> > further). It doesn't require a new primitive. It probably matches more 
>> > with user expectations anyway.
>> >
>> > It does cause looping timer execution with processing time to be a problem 
>> > for Drains however.
>>
>> I think we have a problem with looping timers plus drain (a mostly
>> streaming idea anyway) regardless.
>>
>> > I'd argue though that in the case of a drain, we could update the 
>> > semantics as "move watermark to infinity"  "existing timers are executed, 
>> > but new timers are ignored",
>>
>> I don't like the idea of dropping timers for drain. I think correct
>> handling here requires user visibility into whether a pipeline is
>> draining or not.
>>
>> > and ensure/and update the requirements around OnWindowExpiration callbacks 
>> > to be a bit more insistent on being implemented for correct execution, 
>> > which is currently the only "hard" signal to the SDK side that the 
>> > window's work is guaranteed to be over, and remaining state needs to be 
>> > addressed by the transform or be garbage collected. This remains critical 
>> > for developing a good pattern for ProcessingTime timers within a Global 
>> > Window too.
>>
>> +1
>>
>> >
>> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
>> > > Thanks for bringing this up.
>> > >
>> > > My position is that both batch and streaming should wait for
>> > > processing time timers, according to local time (with the exception of
>> > > tests that can accelerate this via faked clocks).
>> > >
>> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
>> > >

Re: Throttle PTransform

2024-02-23 Thread Robert Bradshaw via dev
On Thu, Feb 22, 2024 at 10:16 AM Robert Bradshaw  wrote:
>
> On Thu, Feb 22, 2024 at 9:37 AM Reuven Lax via dev  
> wrote:
> >
> > On Thu, Feb 22, 2024 at 9:26 AM Kenneth Knowles  wrote:
> >>
> >> Wow I love your input Reuven. Of course "the source" that you are applying 
> >> backpressure to is often a runner's shuffle so it may be state anyhow, but 
> >> it is good to give the runner the choice of how to figure that out and 
> >> maybe chain backpressure further.
> >
> >
> > Sort of - however most (streaming) runners apply backpressure through 
> > shuffle as well. This means that while some amount of data will accumulate 
> > in shuffle, eventually the backpressure will push back to the source. 
> > Caveat of course is that this is mostly true for streaming runners, not 
> > batch runners.
>
> For batch it's still preferable to keep the data upstream in shuffle
> (which has fewer size limitations) than in state (which must reside in 
> worker memory, though only one key at a time).

And for drain (or even cancel), it's preferable to have as much as
possible upstream in the source than sitting in state.


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Robert Bradshaw via dev
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>
> While I'm currently on the other side of the fence, I would not be against 
> changing/requiring the semantics of ProcessingTime constructs to be "must 
> wait and execute" as such a solution, and enables the Proposed "batch" 
> process continuation throttling mechanism to work as hypothesized for both 
> "batch" and "streaming" execution.
>
> There's a lot to like, as it leans Beam further into the unification of Batch 
> and Stream, with one fewer exception (eg. unifies timer experience further). 
> It doesn't require a new primitive. It probably matches more with user 
> expectations anyway.
>
> It does cause looping timer execution with processing time to be a problem 
> for Drains however.

I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.

> I'd argue though that in the case of a drain, we could update the semantics 
> as "move watermark to infinity"  "existing timers are executed, but new 
> timers are ignored",

I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.

> and ensure/and update the requirements around OnWindowExpiration callbacks to 
> be a bit more insistent on being implemented for correct execution, which is 
> currently the only "hard" signal to the SDK side that the window's work is 
> guaranteed to be over, and remaining state needs to be addressed by the 
> transform or be garbage collected. This remains critical for developing a 
> good pattern for ProcessingTime timers within a Global Window too.

+1

>
> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should wait for
> > processing time timers, according to local time (with the exception of
> > tests that can accelerate this via faked clocks).
> >
> > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
> > isomorphic, and can be implemented in terms of each other (at least in
> > one direction, and likely the other). Both are an indication that I
> > can't act on something yet due to external constraints (e.g. not all
> > the data has been published, or I lack sufficient capacity/quota to
> > push things downstream) but I expect to be able to (or at least would
> > like to check again) at some time in the processing-time future. I
> > can't think of a batch or streaming scenario where it would be correct
> > to not wait at least that long (even in batch inputs, e.g. suppose I'm
> > tailing logs and was eagerly started before they were fully written,
> > or waiting for some kind of (non-data-dependent) quiescence or other
> > operation to finish).
> >
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
> > >
> > > For me it always helps to seek analogy in our physical reality. Stream
> > > processing actually has quite a good analogy for both event-time and
> > > processing-time - the simplest model for this being relativity theory.
> > > Event-time is the time at which events occur _at distant locations_. Due
> > > to finite and invariant speed of light (which is actually really
> > > involved in the explanation why any stream processing is inevitably
> > > unordered) these events are observed (processed) at different times
> > > (processing time, different for different observers). It is perfectly
> > > possible for an observer to observe events at a rate that is higher than
> > > one second per second. This also happens in reality for observers that
> > > travel at relativistic speeds (which might be an analogy for fast -
> > > batch - (re)processing). Besides the invariant speed, there is also
> > > another invariant - local clock (wall time) always ticks exactly at the
> > > rate of one second per second, no matter what. It is not possible to
> > > "move faster or slower" through (local) time.
> > >
> > > In my understanding the reason why we do not put any guarantees or
> > > bounds on the delay of firing processing time timers is purely technical
> > > - the processing is (per key) single-threaded, thus any timer has to
> > > wait before any element processing finishes. This is only consequence of
> > > a technical solution, not something fundamental.
> > >
> > > Having said that, my point is that according to the above analogy, it
> > > should be perfect

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Robert Bradshaw via dev
Thanks for bringing this up.

My position is that both batch and streaming should wait for
processing time timers, according to local time (with the exception of
tests that can accelerate this via faked clocks).

Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
isomorphic, and can be implemented in terms of each other (at least in
one direction, and likely the other). Both are an indication that I
can't act on something yet due to external constraints (e.g. not all
the data has been published, or I lack sufficient capacity/quota to
push things downstream) but I expect to be able to (or at least would
like to check again) at some time in the processing-time future. I
can't think of a batch or streaming scenario where it would be correct
to not wait at least that long (even in batch inputs, e.g. suppose I'm
tailing logs and was eagerly started before they were fully written,
or waiting for some kind of (non-data-dependent) quiescence or other
operation to finish).


On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
>
> For me it always helps to seek analogy in our physical reality. Stream
> processing actually has quite a good analogy for both event-time and
> processing-time - the simplest model for this being relativity theory.
> Event-time is the time at which events occur _at distant locations_. Due
> to finite and invariant speed of light (which is actually really
> involved in the explanation why any stream processing is inevitably
> unordered) these events are observed (processed) at different times
> (processing time, different for different observers). It is perfectly
> possible for an observer to observe events at a rate that is higher than
> one second per second. This also happens in reality for observers that
> travel at relativistic speeds (which might be an analogy for fast -
> batch - (re)processing). Besides the invariant speed, there is also
> another invariant - local clock (wall time) always ticks exactly at the
> rate of one second per second, no matter what. It is not possible to
> "move faster or slower" through (local) time.
>
> In my understanding the reason why we do not put any guarantees or
> bounds on the delay of firing processing time timers is purely technical
> - the processing is (per key) single-threaded, thus any timer has to
> wait before any element processing finishes. This is only consequence of
> a technical solution, not something fundamental.
>
> Having said that, my point is that according to the above analogy, it
> should be perfectly fine to fire processing time timers in batch based
> on (local wall) time only. There should be no way of manipulating this
> local time (excluding tests). Watermarks should be affected the same way
> as any buffering in a state that would happen in a stateful DoFn (i.e.
> set timer holds output watermark). We should probably pay attention to
> looping timers, but it seems possible to define a valid stopping
> condition (input watermark at infinity).
>
>   Jan
>
> On 2/22/24 19:50, Kenneth Knowles wrote:
> > Forking this thread.
> >
> > The state of processing time timers in this mode of processing is not
> > satisfactory and is discussed a lot but we should make everything
> > explicit.
> >
> > Currently, a state and timer DoFn has a number of logical watermarks:
> > (apologies for fixed width not coming through in email lists). Treat
> > timers as a back edge.
> >
> > input --(A)----+--(C)--> ParDo(DoFn) --(D)---> output
> >                ^              |
> >                |-----(B)------|
> >                    timers
> >
> > (A) Input Element watermark: this is the watermark that promises there
> > is no incoming element with a timestamp earlier than it. Each input
> > element's timestamp holds this watermark. Note that *event time timers
> > firing is according to this watermark*. But a runner commits changes
> > to this watermark *whenever it wants*, in a way that can be
> > consistent. So the runner can absolutely process *all* the elements
> > before advancing the watermark (A), and only afterwards start firing
> > timers.
> >
> > (B) Timer watermark: this is a watermark that promises no timer is set
> > with an output timestamp earlier than it. Each timer that has an
> > output timestamp holds this watermark. Note that timers can set new
> > timers, indefinitely, so this may never reach infinity even in a drain
> > scenario.
> >
> > (C) (derived) total input watermark: this is a watermark that is the
> > minimum of the two above, and ensures that all state for the DoFn for
> > expired windows can be GCd after calling @OnWindowExpiration.
> >
> > (D) output watermark: this is a promise that the DoFn will not output
> > earlier than the watermark. It is held by the total input watermark.
> >
> > So a any timer, processing or not, holds the total input watermark
> > which prevents window GC, hence the timer must be fired. You can set
> > timers without a timestamp and they will not hold (B) hence not 

Re: Throttle PTransform

2024-02-22 Thread Robert Bradshaw via dev
On Thu, Feb 22, 2024 at 9:37 AM Reuven Lax via dev  wrote:
>
> On Thu, Feb 22, 2024 at 9:26 AM Kenneth Knowles  wrote:
>>
>> Wow I love your input Reuven. Of course "the source" that you are applying 
>> backpressure to is often a runner's shuffle so it may be state anyhow, but 
>> it is good to give the runner the choice of how to figure that out and maybe 
>> chain backpressure further.
>
>
> Sort of - however most (streaming) runners apply backpressure through shuffle 
> as well. This means that while some amount of data will accumulate in 
> shuffle, eventually the backpressure will push back to the source. Caveat of 
> course is that this is mostly true for streaming runners, not batch runners.

For batch it's still preferable to keep the data upstream in shuffle
(which has fewer size limitations) than in state (which must reside in
worker memory, though only one key at a time).


Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

2024-02-22 Thread Robert Bradshaw via dev
On Wed, Feb 21, 2024 at 11:58 PM Jan Lukavský  wrote:
>
> Reasons we use Java serialization are not fundamental, probably only 
> historical. Thinking about it, yes, there is lucky coincidence that we 
> currently have to change the serialization because of Flink 1.17 support. 
> Flink 1.17 actually removes the legacy java serialization from Flink and 
> enforces custom serialization. Therefore, we need to introduce an upgrade 
> compatible change of serialization to support Flink 1.17. This is already 
> implemented in [1]. The PR can go further, though. We can replace Java 
> serialization of Coder in the TypeSerializerSnapshot and use the portable 
> representation of Coder (which will still use Java serialization in some 
> cases, but might avoid it at least for well-known coders, moreover Coders 
> should be more upgrade-stable classes).
>
> I'll try to restore the SerializablePipelineOptions (copy&paste) in 
> FlinkRunner only and rework the serialization in a more stable way (at least 
> avoid serializing the CoderTypeSerializer, which references the 
> SerializablePipelineOptions).

Thanks!

If other runners use this (SerializablePipelineOptions seems like it's
explicitly created for this purpose) we could consider putting the
copy into core rather than just the Flink packages.

> I created [2] and marked it as blocker for 2.55.0 release, because otherwise 
> we would break the upgrade.
>
> Thanks for the discussion, it helped a lot.
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/30197
>
> [2] https://github.com/apache/beam/issues/30385
>
> On 2/21/24 20:33, Kenneth Knowles wrote:
>
> Yea I think we should restore the necessary classes but also fix the 
> FlinkRunner. Java serialization is inherently self-update-incompatible.
>
> On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev  
> wrote:
>>
>> Is there a fundamental reason we serialize java classes into Flink 
>> savepoints?
>>
>> On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev 
>>  wrote:
>>>
>>> We could consider merging the gradle targets without renaming the
>>> classpaths as an intermediate step.
>>>
>>> Optimistically, perhaps there's a small number of classes that we need
>>> to preserve (e.g. SerializablePipelineOptions looks like it was
>>> something specifically intended to be serialized); maybe that and a
>>> handful of others (that implement Serializable) could be left in their
>>> original packages for backwards compatibility reasons?
>>>
>>> On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský  wrote:
>>> >
>>> > Hi,
>>> >
>>> > while implementing FlinkRunner for Flink 1.17 I tried to verify that a
>>> > running Pipeline is able to successfully upgrade from Flink 1.16 to
>>> > Flink 1.17. There is some change regarding serialization needed for
>>> > Flink 1.17, so this was a concern. Unfortunately recently we merged
>>> > core-construction-java into SDK, which resulted in some classes being
>>> > repackaged. Unfortunately, we serialize some classes into Flink's
>>> > check/savepoints. The renaming of the class therefore ends with the
>>> > following exception trying to restore from the savepoint:
>>> >
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.beam.runners.core.construction.SerializablePipelineOptions
>>> >  at 
>>> > java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>> >  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>> >  at
>>> > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>>> >  at
>>> > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> >  at
>>> > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>>> >  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>> >  at
>>> > org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>>> >  at java.base/java.lang.Class.forName0(Native Method)
>>> >  at java.base/java.lang.Class.forName(Class.java:398)
>>> >  at
>>> > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>> >  at
>>> > org.apache.flink.util.InstantiationUtil$Fail

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

2024-02-21 Thread Robert Bradshaw via dev
We could consider merging the gradle targets without renaming the
classpaths as an intermediate step.

Optimistically, perhaps there's a small number of classes that we need
to preserve (e.g. SerializablePipelineOptions looks like it was
something specifically intended to be serialized); maybe that and a
handful of others (that implement Serializable) could be left in their
original packages for backwards compatibility reasons?

On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský  wrote:
>
> Hi,
>
> while implementing FlinkRunner for Flink 1.17 I tried to verify that a
> running Pipeline is able to successfully upgrade from Flink 1.16 to
> Flink 1.17. There is some change regarding serialization needed for
> Flink 1.17, so this was a concern. Unfortunately recently we merged
> core-construction-java into SDK, which resulted in some classes being
> repackaged. Unfortunately, we serialize some classes into Flink's
> check/savepoints. The renaming of the class therefore ends with the
> following exception trying to restore from the savepoint:
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.beam.runners.core.construction.SerializablePipelineOptions
>  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>  at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>  at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>  at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>  at
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>  at java.base/java.lang.Class.forName0(Native Method)
>  at java.base/java.lang.Class.forName(Class.java:398)
>  at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>  at
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
>
>
> This means that no Pipeline will be able to successfully upgrade from
> version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be
> restarted from scratch). I wanted to know how the community would feel
> about that, this consequence probably was not clear when we merged the
> artifacts. The only option would be to revert the merge and then try to
> figure out how to avoid Java serialization in Flink's savepoints. That
> would definitely be costly in terms of implementation and even more to
> provide ways to transfer old savepoints to the new format (can be
> possible using state processor API). I'm aware that Beam provides no
> general guarantees about the upgrade compatibility, so it might be fine
> to just ignore this, I just wanted to shout this out loud so that we can
> make a deliberate decision.
>
> Best,
>
>   Jan
>


Re: Throttle PTransform

2024-02-21 Thread Robert Bradshaw via dev
I like the idea of pushing back to the source much better than
unboundedly buffering things in state. I was trying to think of how to
just slow things down and one problem is that while we can easily
control the number of keys, it's much harder to control (or even
detect) the number of parallel threads at any given point in time (for
which keys is simply an upper bound, especially in batch).
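To make the alternative concrete, here is a rough sketch (mine, not from the
doc) of the small-N bucketing approach Reuven describes below: hash onto N
shard keys and let each shard's single processing thread block to cap its own
output at rate/N. In streaming this would additionally need
windowing/triggering or a stateful DoFn ahead of the GroupByKey; the sketch
only illustrates the rate/N arithmetic and the blocking backpressure.

```python
import time
import apache_beam as beam

class ThrottleShardFn(beam.DoFn):
    """Emits one shard's values at no more than shard_rate elements/sec.

    Expects (shard_key, iterable_of_values), e.g. the output of a GroupByKey,
    so a single worker thread handles each shard.
    """

    def __init__(self, shard_rate):
        self.shard_rate = shard_rate

    def process(self, keyed):
        _, values = keyed
        interval = 1.0 / self.shard_rate
        next_emit = time.monotonic()
        for value in values:
            now = time.monotonic()
            if now < next_emit:
                # Blocking here is the backpressure: excess data stays upstream
                # (in shuffle or the source) instead of piling up in state.
                time.sleep(next_emit - now)
            next_emit = max(now, next_emit) + interval
            yield value

def throttle(pcoll, rate_per_sec, num_shards=100):
    # Keeping N small limits the per-shard rounding error in the total rate.
    return (pcoll
            | "Shard" >> beam.Map(lambda x: (hash(x) % num_shards, x))
            | "PerShard" >> beam.GroupByKey()
            | "Emit" >> beam.ParDo(ThrottleShardFn(rate_per_sec / num_shards)))
```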

On Wed, Feb 21, 2024 at 9:28 AM Reuven Lax  wrote:
>
> Agreed, that event-time throttling doesn't make sense here. In theory 
> processing-time timers have no SLA - i.e. their firing might be delayed - so 
> batch runners aren't violating the model by firing them all at the end; 
> however it does make processing time timers less useful in batch, as we see 
> here.
>
> Personally, I'm not sure I would use state and timers to implement this, and 
> I definitely wouldn't create this many keys. A couple of reasons for this:
>   1. If a pipeline is receiving input faster than the throttle rate, the 
> proposed technique would shift all those elements into the DoFn's state which 
> will keep growing indefinitely. Generally we would prefer to leave that 
> backlog in the source instead of copying it into DoFn state.
>   2. In my experience with throttling, having too much parallelism is 
> problematic. The issue is that there is some error involved whenever you 
> throttle, and this error can accumulate across many shards (and when I've 
> done this sort of thing before, I found that the error was often biased in 
> one direction). If targeting 100,000 records/sec, this  approach (if I 
> understand it correctly) would create 100,000 shards and throttle them each 
> to one element/sec. I doubt this will actually result in anything close to 
> desired throttling.
>   3. Very commonly, the request is to throttle based on bytes/sec, not 
> events/sec. Anything we build should be easily extensible to bytes/sec.
>
> What I would suggest (and what Beam users have often done in the past) would 
> be to bucket the PCollection into N buckets where N is generally smallish 
> (100 buckets, 1000 buckets, depending on the expected throughput); runners 
> that support autosharding (such as Dataflow) can automatically choose N. Each 
> shard then throttles its output to rate/N. Keeping N no larger than necessary 
> minimizes the error introduced into throttling.
>
> We also don't necessarily need state/timers here - each shard is processed on 
> a single thread, so those threads can simply throttle calls to 
> OutputReceiver.output. This way if the pipeline is exceeding the threshold, 
> backpressure will tend to simply leave excess data in the source. This also 
> is a simpler design than the proposed one.
>
> A more sophisticated design might combine elements of both - buffering a 
> bounded amount of data in state when the threshold is exceeded, but severely 
> limiting the state size. However I wouldn't start here - we would want to 
> build the simpler implementation first and see how it performs.
>
> On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev  
> wrote:
>>
>> On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
>> >
>> > Hi,
>> >
>> > I have left a note regarding the proposed splitting of batch and
>> > streaming expansion of this transform. In general, a need for such split
>> > triggers doubts in me. This signals that either
>> >
>> >   a) the transform does something it should not, or
>> >
>> >   b) Beam model is not complete in terms of being "unified"
>> >
>> > The problem that is described in the document is that in the batch case
>> > timers are not fired appropriately.
>>
>> +1. The underlying flaw is that processing time timers are not handled
>> correctly in batch, but should be (even if it means keeping workers
>> idle?). We should fix this.
>>
>> > This is actually one of the
>> > motivations that led to introduction of @RequiresTimeSortedInput
>> > annotation and, though mentioned years ago as a question, I do not
>> > remember what arguments were used against enforcing sorting inputs by
>> > timestamp in the batch stateful DoFn as a requirement in the model. That
>> > would enable the appropriate firing of timers while preserving the batch
>> > invariant which is there are no late data allowed. IIRC there are
>> > runners that do this sorting by default (at least the sorting, not sure
>> > about the timers, but once inputs are sorted, firing timers is simple).
>> >
>> > A different question is if this particular transform should maybe fire
>> > not by event t

Re: Throttle PTransform

2024-02-21 Thread Robert Bradshaw via dev
On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
>
> Hi,
>
> I have left a note regarding the proposed splitting of batch and
> streaming expansion of this transform. In general, a need for such split
> triggers doubts in me. This signals that either
>
>   a) the transform does something it should not, or
>
>   b) Beam model is not complete in terms of being "unified"
>
> The problem that is described in the document is that in the batch case
> timers are not fired appropriately.

+1. The underlying flaw is that processing time timers are not handled
correctly in batch, but should be (even if it means keeping workers
idle?). We should fix this.

> This is actually one of the
> motivations that led to introduction of @RequiresTimeSortedInput
> annotation and, though mentioned years ago as a question, I do not
> remember what arguments were used against enforcing sorting inputs by
> timestamp in the batch stateful DoFn as a requirement in the model. That
> would enable the appropriate firing of timers while preserving the batch
> invariant which is there are no late data allowed. IIRC there are
> runners that do this sorting by default (at least the sorting, not sure
> about the timers, but once inputs are sorted, firing timers is simple).
>
> A different question is if this particular transform should maybe fire
> not by event time, but rather processing time?

Yeah, I was reading all of these as processing time. Throttling by
event time doesn't make much sense.

> On 2/21/24 03:00, Robert Burke wrote:
> > Thanks for the design Damon! And thanks for collaborating with me on 
> > getting a high level textual description of the key implementation idea 
> > down in writing. I think the solution is pretty elegant.
> >
> > I do have concerns about how different Runners might handle 
> > ProcessContinuations for the Bounded Input case. I know Dataflow famously 
> > has two different execution modes under the hood, but I agree with the 
> > principle that ProcessContinuation.Resume should largely be in line with 
> > the expected delay, though it's by no means guaranteed AFAIK.
> >
> > We should also ensure this is linked from 
> > https://s.apache.org/beam-design-docs if not already.
> >
> > Robert Burke
> > Beam Go Busybody
> >
> > On 2024/02/20 14:00:00 Damon Douglas wrote:
> >> Hello Everyone,
> >>
> >> The following describes a Throttle PTransform that holds element throughput
> >> to minimize downstream API overusage. Thank you for reading and your
> >> valuable input.
> >>
> >> https://s.apache.org/beam-throttle-transform
> >>
> >> Best,
> >>
> >> Damon
> >>


Re: [API PROPOSAL] PTransform.getURN, toProto, etc, for Java

2024-02-15 Thread Robert Bradshaw via dev
On Wed, Feb 14, 2024 at 10:28 AM Kenneth Knowles  wrote:
>
> Hi all,
>
> TL;DR I want to add some API like PTransform.getURN, toProto and fromProto, 
> etc. to the Java SDK. I want to do this so that making a PTransform support 
> portability is a natural part of writing the transform and not a totally 
> separate thing with tons of boilerplate.
>
> What do you think?

Huge +1 to this direction.

IMHO one of the most fundamental things about Beam is its model.
Originally this was only expressed in a specific SDK (Java) and then
got ported to others, but now that we have portability it's expressed
in a language-independent way.

The fact that we keep these separate in Java is not buying us
anything, and causes a huge amount of boilerplate that'd be great to
remove, as well as making the essential model more front-and-center.
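As a shape check (a self-contained toy, not the real API of any Beam SDK),
the surface being proposed boils down to a URN registry plus per-transform
to/from-payload hooks:

```python
# Toy sketch only: each transform knows its URN and how to convert itself
# to/from a payload, and a registry maps URNs back to classes. None of these
# names are the real Beam API; they only illustrate the shape of the proposal.
import json

_REGISTRY = {}

def register_urn(urn):
    def wrapper(cls):
        cls.URN = urn
        _REGISTRY[urn] = cls
        return cls
    return wrapper

class PTransform:
    def get_urn(self):
        return self.URN

    def to_payload(self):            # analogous to toProto / getURN + payload
        raise NotImplementedError

    @classmethod
    def from_payload(cls, payload):  # analogous to fromProto
        raise NotImplementedError

@register_urn("beam:transform:example:fixed_window:v1")
class FixedWindows(PTransform):
    def __init__(self, size_secs):
        self.size_secs = size_secs

    def to_payload(self):
        return json.dumps({"size_secs": self.size_secs})

    @classmethod
    def from_payload(cls, payload):
        return cls(**json.loads(payload))

# Round trip: a URN plus payload is all a runner (or another SDK) needs.
t = FixedWindows(60)
urn, payload = t.get_urn(), t.to_payload()
print(urn, _REGISTRY[urn].from_payload(payload).size_secs)
```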

> I think a particular API can be sorted out most easily in code (which I will 
> prepare after gathering some feedback).
>
> We already have all the translation logic written, and porting a couple 
> transforms to it will ensure the API has everything we need. We can refer to 
> Python and Go for API ideas as well.
>
> Lots of context below, but you can skip it...
>
> -
>
> When we first created the portability framework, we wanted the SDKs to be 
> "standalone" and not depend on portability. We wanted portability to be an 
> optional plugin that users could opt in to. That is totally the opposite now. 
> We want portability to be the main place where Beam is defined, and then SDKs 
> make that available in language idiomatic ways.
>
> Also when we first created the framework, we were experimenting with 
> different serialization approaches and we wanted to be independent of 
> protobuf and gRPC if we could. But now we are pretty committed and it would 
> be a huge lift to use anything else.
>
> Finally, at the time we created the portability framework, we designed it to 
> allow composites to have URNs and well-defined specs, rather than just be 
> language-specific subgraphs, but we didn't really plan to make this easy.
>
> For all of the above, most users depend on portability and on proto. So 
> separating them is not useful and just creates LOTS of boilerplate and 
> friction for making new well-defined transforms.
>
> Kenn


Re: [VOTE] Release 2.54.0, release candidate #2

2024-02-14 Thread Robert Bradshaw via dev
+1 (binding)

We've done the validation we can for now, let's not hold up the
release any longer.

(For those curious, there may be a brief period of time where Dataflow
pipelines with 2.54 still default to Runner V1 in some regions as
things roll out, but we expect this to be fully resolved next week.)

On Fri, Feb 9, 2024 at 6:28 PM Robert Burke  wrote:
>
> I can agree to that Robert Bradshaw. Thank you for letting the community know.
>
> (Disclaimer: I am on the Dataflow team myself, but do try to keep my hats 
> separated when I'm release manager).
>
> It would be bad for Beam users who use Dataflow to try to use the release but 
> be unaware of the switch. I'm in favour of the path of least user issues, 
> wherever they're cropping up from.
>
> As a heads up, I'll likely not address this thread until Wednesday morning 
> (or if there's a sooner update) as a result of this request.
>
> Separately:
> + 1 (binding)
>  I've done a few of the quickstarts from the validation sheets and updated my 
> own Beam Go code. Other than a non-blocking update to the Go wordcount 
> quickstart, I didn't run into any issues.
>
> Robert Burke
> Beam 2.54.0 Release Manager
>
> On 2024/02/10 01:41:12 Robert Bradshaw via dev wrote:
> > I validated that the release artifacts are all correct, tested some simple
> > Python and Yaml pipelines. Everything is looking good so far.
> >
> > However, could I ask that you hold this vote open a little longer? We've
> > got some Dataflow service side changes that relate to 2.54 being the first
> > release where Runner v2 is the default for Java (big change on our side),
> > and could use a bit of additional time to verify we have everything lined
> > up correctly. We should be able to finish the validation early/mid next
> > week at the latest.
> >
> > - Robert
> >
> >
> > On Fri, Feb 9, 2024 at 2:57 PM Yi Hu via dev  wrote:
> >
> > > Also tested with GCP IO performance benchmark [1]. Passed other than
> > > SpannerIO where the benchmark failed due to issues in the test suite 
> > > itself
> > > [2], not related to Beam.
> > >
> > > +1 but I had voted for another validation suite before for this RC
> > >
> > > [1]
> > > https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/it/google-cloud-platform
> > > [2] https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/1326
> > >
> > > On Fri, Feb 9, 2024 at 9:43 AM Valentyn Tymofieiev via dev <
> > > dev@beam.apache.org> wrote:
> > >
> > >> +1.
> > >>
> > >> Checked postcommit test results for Python SDK, and exercised a couple of
> > Dataflow scenarios.
> > >>
> > >> On Thu, Feb 8, 2024, 14:07 Svetak Sundhar via dev 
> > >> wrote:
> > >>
> > >>> +1 (Non-Binding)
> > >>>
> > >>> Tested with Python SDK on DirectRunner and Dataflow Runner
> > >>>
> > >>>
> > >>> Svetak Sundhar
> > >>>
> > >>>   Data Engineer
> > >>> s vetaksund...@google.com
> > >>>
> > >>>
> > >>>
> > >>> On Thu, Feb 8, 2024 at 12:45 PM Chamikara Jayalath via dev <
> > >>> dev@beam.apache.org> wrote:
> > >>>
> > >>>> +1 (binding)
> > >>>>
> > >>>> Tried out Java/Python multi-lang jobs and upgrading BQ/Kafka transforms
> > >>>> from 2.53.0 to 2.54.0 using the Transform Service.
> > >>>>
> > >>>> Thanks,
> > >>>> Cham
> > >>>>
> > >>>> On Wed, Feb 7, 2024 at 5:52 PM XQ Hu via dev 
> > >>>> wrote:
> > >>>>
> > >>>>> +1 (non-binding)
> > >>>>>
> > >>>>> Validated with a simple RunInference Python pipeline:
> > >>>>> https://github.com/google/dataflow-ml-starter/actions/runs/7821639833/job/21339032997
> > >>>>>
> > >>>>> On Wed, Feb 7, 2024 at 7:10 PM Yi Hu via dev 
> > >>>>> wrote:
> > >>>>>
> > >>>>>> +1 (non-binding)
> > >>>>>>
> > >>>>>> Validated with Dataflow Template:
> > >>>>>> https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/1317
> > >>>>>>
> > >>>>>> Regards,
> > >>&

Re: [VOTE] Release 2.54.0, release candidate #2

2024-02-09 Thread Robert Bradshaw via dev
I validated that the release artifacts are all correct, tested some simple
Python and Yaml pipelines. Everything is looking good so far.

However, could I ask that you hold this vote open a little longer? We've
got some Dataflow service side changes that relate to 2.54 being the first
release where Runner v2 is the default for Java (big change on our side),
and could use a bit of additional time to verify we have everything lined
up correctly. We should be able to finish the validation early/mid next
week at the latest.

- Robert


On Fri, Feb 9, 2024 at 2:57 PM Yi Hu via dev  wrote:

> Also tested with GCP IO performance benchmark [1]. Passed other than
> SpannerIO where the benchmark failed due to issues in the test suite itself
> [2], not related to Beam.
>
> +1 but I had voted for another validation suite before for this RC
>
> [1]
> https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/it/google-cloud-platform
> [2] https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/1326
>
> On Fri, Feb 9, 2024 at 9:43 AM Valentyn Tymofieiev via dev <
> dev@beam.apache.org> wrote:
>
>> +1.
>>
>> Checked postcommit test results for Python SDK, and exercised a couple of
>> Dataflow scenarios.
>>
>> On Thu, Feb 8, 2024, 14:07 Svetak Sundhar via dev 
>> wrote:
>>
>>> +1 (Non-Binding)
>>>
>>> Tested with Python SDK on DirectRunner and Dataflow Runner
>>>
>>>
>>> Svetak Sundhar
>>>
>>>   Data Engineer
>>> s vetaksund...@google.com
>>>
>>>
>>>
>>> On Thu, Feb 8, 2024 at 12:45 PM Chamikara Jayalath via dev <
>>> dev@beam.apache.org> wrote:
>>>
 +1 (binding)

 Tried out Java/Python multi-lang jobs and upgrading BQ/Kafka transforms
 from 2.53.0 to 2.54.0 using the Transform Service.

 Thanks,
 Cham

 On Wed, Feb 7, 2024 at 5:52 PM XQ Hu via dev 
 wrote:

> +1 (non-binding)
>
> Validated with a simple RunInference Python pipeline:
> https://github.com/google/dataflow-ml-starter/actions/runs/7821639833/job/21339032997
>
> On Wed, Feb 7, 2024 at 7:10 PM Yi Hu via dev 
> wrote:
>
>> +1 (non-binding)
>>
>> Validated with Dataflow Template:
>> https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/1317
>>
>> Regards,
>>
>> On Wed, Feb 7, 2024 at 11:18 AM Ritesh Ghorse via dev <
>> dev@beam.apache.org> wrote:
>>
>>> +1 (non-binding)
>>>
>>> Ran a few batch and streaming examples for Python SDK on Dataflow
>>> Runner
>>>
>>> Thanks!
>>>
>>> On Wed, Feb 7, 2024 at 4:08 AM Jan Lukavský  wrote:
>>>
 +1 (binding)

 Validated Java SDK with Flink runner.

  Jan
 On 2/7/24 06:23, Robert Burke via dev wrote:

 Hi everyone,
 Please review and vote on the release candidate #2 for the version
 2.54.0,
 as follows:
 [ ] +1, Approve the release
 [ ] -1, Do not approve the release (please provide specific
 comments)


 Reviewers are encouraged to test their own use cases with the
 release
 candidate, and vote +1 if
 no issues are found. Only PMC member votes will count towards the
 final
 vote, but votes from all
 community members are encouraged and helpful for finding
 regressions; you
 can either test your own
 use cases [13] or use cases from the validation sheet [10].

 The complete staging area is available for your review, which
 includes:
 * GitHub Release notes [1],
 * the official Apache source release to be deployed to
 dist.apache.org [2],
 which is signed with the key with fingerprint D20316F712213422 [3],
 * all artifacts to be deployed to the Maven Central Repository [4],
 * source code tag "v2.54.0-RC2" [5],
 * website pull request listing the release [6], the blog post [6],
 and
 publishing the API reference manual [7].
 * Python artifacts are deployed along with the source release to the
 dist.apache.org [2] and PyPI[8].
 * Go artifacts and documentation are available at pkg.go.dev [9]
 * Validation sheet with a tab for 2.54.0 release to help with
 validation
 [10].
 * Docker images published to Docker Hub [11].
 * PR to run tests against release branch [12].

 The vote will be open for at least 72 hours. It is adopted by
 majority
 approval, with at least 3 PMC affirmative votes.

 For guidelines on how to try the release in your projects, check
 out our RC
 testing guide [13].

 Thanks,
 Robert Burke
 Beam 2.54.0 Release Manager

 [1] https://github.com/apache/beam/milestone/18?closed=1
 [2] https://dist.apache.org/repos/dist/dev/beam/2.54.0/
 [3] https://dist.apa

Re: [python] Why CombinePerKey(lambda vs: None)?

2024-01-26 Thread Robert Bradshaw via dev
On Fri, Jan 26, 2024 at 8:43 AM Joey Tran  wrote:
>
> Hmm, I think I might still be missing something. CombinePerKey is made up of 
> "GBK() | CombineValues". Pulling it out into the Distinct, Distinct looks 
> like:
>
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>   pcoll
>   | 'ToPairs' >> Map(lambda v: (v, None))
>   | 'Group' >> GroupByKey()
>   | 'CombineValues' >> CombineValues(lambda vs: None)
>   | 'Distinct' >> Keys())
>
> Does the combiner lifting somehow make the GroupByKey operation more 
> efficient despite coming after it? My intuition would suggest that we could 
> just remove the `CombineValues` altogether

The key property of CombineFns is that they are commutative and
associative, which permits an optimization called combiner lifting.
Specifically, the operation

GroupByKey() | CombineValues(C)

is rewritten into

PartialCombineUsingLocalBufferMap(C) | GroupByKey() | FinalCombine(C)

a rewrite that pretty much every runner supports (going back to the days
of the original MapReduce), which is what can make this so much more
efficient.

https://github.com/apache/beam/blob/release-2.21.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L669

I am, unfortunately, coming up short in finding good documentation on
this (Apache Beam specific or otherwise).
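To make this concrete, here is a minimal sketch (assuming only a local Python
environment with apache_beam installed) contrasting the two formulations; only
the CombinePerKey form exposes the structure a runner needs to lift part of
the combine ahead of the shuffle:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    words = p | beam.Create(["a", "a", "b", "b", "b", "c"])

    # Explicit form: every (word, None) pair flows through the shuffle unreduced.
    distinct_gbk = (
        words
        | "ToPairs1" >> beam.Map(lambda v: (v, None))
        | "Group1" >> beam.GroupByKey()
        | "Combine1" >> beam.CombineValues(lambda vs: None)
        | "Keys1" >> beam.Keys())

    # CombinePerKey form: runners may rewrite this into a pre-shuffle partial
    # combine plus a post-shuffle final combine (combiner lifting), so each
    # worker ships at most one pair per distinct key.
    distinct_cpk = (
        words
        | "ToPairs2" >> beam.Map(lambda v: (v, None))
        | "Group2" >> beam.CombinePerKey(lambda vs: None)
        | "Keys2" >> beam.Keys())
```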


> On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev 
>  wrote:
>>
>> This is because it allows us to do some of the deduplication before
>> shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
>> worker and [B, B, B, B, C, C] on another. Rather than passing all that
>> data through the GroupByKey (which involves (relatively) expensive
>> materialization and cross-machine traffic, with this form the first
>> worker will only emit [A, B] and the second [B, C] and only the B
>> needs to be deduplicated post-shuffle.
>>
>> Wouldn't hurt to have a comment to that effect there.
>>
>> https://beam.apache.org/documentation/programming-guide/#combine
>>
>> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran  wrote:
>> >
>> > Hey all,
>> >
>> > I was poking around and looking at `Distinct` and was confused about why 
>> > it was implemented the way it was.
>> >
>> > Reproduced here:
>> > @ptransform_fn
>> > @typehints.with_input_types(T)
>> > @typehints.with_output_types(T)
>> > def Distinct(pcoll):  # pylint: disable=invalid-name
>> >   """Produces a PCollection containing distinct elements of a 
>> > PCollection."""
>> >   return (
>> >   pcoll
>> >   | 'ToPairs' >> Map(lambda v: (v, None))
>> >   | 'Group' >> CombinePerKey(lambda vs: None)
>> >   | 'Distinct' >> Keys())
>> >
>> > Could anyone clarify why we'd use a `CombinePerKey` instead of just using 
>> > `GroupByKey`?
>> >
>> > Cheers,
>> > Joey


Re: [python] Why CombinePerKey(lambda vs: None)?

2024-01-26 Thread Robert Bradshaw via dev
This is because it allows us to do some of the deduplication before
shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
worker and [B, B, B, B, C, C] on another. Rather than passing all that
data through the GroupByKey (which involves (relatively) expensive
materialization and cross-machine traffic, with this form the first
worker will only emit [A, B] and the second [B, C] and only the B
needs to be deduplicated post-shuffle.

Wouldn't hurt to have a comment to that effect there.

https://beam.apache.org/documentation/programming-guide/#combine

On Fri, Jan 26, 2024 at 8:22 AM Joey Tran  wrote:
>
> Hey all,
>
> I was poking around and looking at `Distinct` and was confused about why it 
> was implemented the way it was.
>
> Reproduced here:
> @ptransform_fn
> @typehints.with_input_types(T)
> @typehints.with_output_types(T)
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>   pcoll
>   | 'ToPairs' >> Map(lambda v: (v, None))
>   | 'Group' >> CombinePerKey(lambda vs: None)
>   | 'Distinct' >> Keys())
>
> Could anyone clarify why we'd use a `CombinePerKey` instead of just using 
> `GroupByKey`?
>
> Cheers,
> Joey


Re: [VOTE] Vendored Dependencies Release

2024-01-19 Thread Robert Bradshaw via dev
Thanks.

+1


On Fri, Jan 19, 2024 at 1:24 PM Yi Hu  wrote:

> The process I have been following is [1]. I have also suggested edits to
> the voting email template to include the self-link. However, can anyone
> edit this doc so the change can be made? Otherwise we might be better off
> migrating this doc to
> https://github.com/apache/beam/tree/master/contributor-docs
>
> [1] https://s.apache.org/beam-release-vendored-artifacts
>
> On Thu, Jan 18, 2024 at 2:56 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Could you explain the process you used to produce these artifacts?
>>
>> On Thu, Jan 18, 2024 at 11:23 AM Kenneth Knowles  wrote:
>>
>>> +1
>>>
>>> On Wed, Jan 17, 2024 at 6:03 PM Yi Hu via dev 
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>>
>>>> Please review the release of the following artifacts that we vendor:
>>>>
>>>>  * beam-vendor-grpc-1_60_1
>>>>
>>>>
>>>> Please review and vote on the release candidate #1 for the version 0.1,
>>>> as follows:
>>>>
>>>> [ ] +1, Approve the release
>>>>
>>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>>
>>>>
>>>> The complete staging area is available for your review, which includes:
>>>>
>>>> * the official Apache source release to be deployed to dist.apache.org
>>>> [1], which is signed with the key with fingerprint 8935B943A188DE65 [2],
>>>>
>>>> * all artifacts to be deployed to the Maven Central Repository [3],
>>>>
>>>> * commit hash "52b4a9cb58e486745ded7d53a5b6e2d2312e9551" [4],
>>>>
>>>> The vote will be open for at least 72 hours. It is adopted by majority
>>>> approval, with at least 3 PMC affirmative votes.
>>>>
>>>> Thanks,
>>>>
>>>> Release Manager
>>>>
>>>> [1] https://dist.apache.org/repos/dist/dev/beam/vendor/
>>>>
>>>> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
>>>>
>>>> [3]
>>>> https://repository.apache.org/content/repositories/orgapachebeam-1366/
>>>>
>>>> [4]
>>>> https://github.com/apache/beam/commits/52b4a9cb58e486745ded7d53a5b6e2d2312e9551/
>>>>
>>>>
>>>> --
>>>>
>>>> Yi Hu, (he/him/his)
>>>>
>>>> Software Engineer
>>>>
>>>>
>>>>


Re: @RequiresTimeSortedInput adoption by runners

2024-01-19 Thread Robert Bradshaw via dev
I think this standard design could still be made to work.
Specifically, the graph would contain a DoFn that has the
RequiresTimeSortedInput bit set, with a single "subtransform" that
has a different DoFn in its spec that does not require this bit to be
set and whose implementation enforces this ordering (say, via state)
before invoking the user's DoFn. This would work fine in streaming for
any runner, and would work OK in batch as long as the value set for
any key fits into memory (or the batch state implementation spilled to
disk, though that could get really slow). Runners that wanted to do
better (e.g. provide timestamp sorting as part of their batch
grouping, or even internally sort timestamps more efficiently than
could be done via the SDK over the state API) could do so.

For Java, such a wrapper might be a bit messy, but could probably be
hard coded above the ByteBuddy wrappers layer.

TBD how much of our infrastructure assumes ParDo transforms do not
contain subtransforms. (We could also provide a different URN for
RequiresTimeSortedInput DoFns whose payload would be the DoFn payload,
rather than setting a bit on the payload itself.) Rather than
introducing nesting, we could implement the AnyOf PTransform that
would present the two implementations as siblings (which could be
useful elsewhere). This can be made backward compatible by providing
one of the alternatives as the composite structure. The primary
hesitation I have here is that it prevents much
introspection/manipulation of the pipeline before the runner
capabilities are known.

What we really want is a way to annotate a DoFn as
RequestsTimeSortedInput, together with a way for the runner to
communicate to the SDK whether or not it was able to honor this
request. That may be a more invasive change to the protocol (e.g.
annotating PCollections with ordering properties, which is where it
belongs[1]). I suppose we could let a runner that supports this
capability strip the RequestsTimeSortedInput bit (or set a new bit),
and SDKs that get unmutated transforms would know they have to do the
sorting themselves.

- Robert

[1] Ordering is an under-defined concept in Beam, but if we're going
to add it my take would be that to do it properly one would want

(1) Annotations on PCollections indicating whether they're unordered
or ordered (by a certain ordering criteria, in this case
timestamp-within-key), which could be largely inferred by
(2) Annotations on PTransforms indicating whether they're
order-creating, order-preserving, or order-requiring (with the default
being unspecified=order-destroying), again parameterized by an
ordering criteria of some kind, which criteria could form a hierarchy.
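Returning to the idea above of enforcing the ordering via state, a rough
Python sketch (an illustration only, not the proposal itself, omitting
windowing and allowed-lateness concerns) of what an SDK-side fallback wrapper
could look like:

```python
import apache_beam as beam
from apache_beam.coders import FloatCoder, PickleCoder, TupleCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer


class TimeSortedWrapper(beam.DoFn):
    """Buffers (timestamp, element) pairs per key and re-emits them sorted."""
    BUFFER = BagStateSpec("buffer", TupleCoder((FloatCoder(), PickleCoder())))
    FLUSH = TimerSpec("flush", TimeDomain.WATERMARK)

    def __init__(self, user_fn):
        # user_fn is the wrapped DoFn; its process() is assumed to return an iterable.
        self._user_fn = user_fn

    def process(self,
                element,  # must be a keyed (KV) element for state to apply
                timestamp=beam.DoFn.TimestampParam,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        buffer.add((float(timestamp), element))
        # Simplification: a real wrapper would set this to the end of the window.
        flush.set(timestamp)

    @on_timer(FLUSH)
    def on_flush(self, buffer=beam.DoFn.StateParam(BUFFER)):
        for _, element in sorted(buffer.read()):
            yield from self._user_fn.process(element)
        buffer.clear()
```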


On Fri, Jan 19, 2024 at 10:40 AM Kenneth Knowles  wrote:
>
> In this design space, what we have done in the past is:
>
> 1) ensure that runners all reject pipelines they cannot run correctly
> 2) if there is a default/workaround/slower implementation, provide it as an 
> override
>
> This is largely ignoring portability but I think/hope it will still work. At 
> one time I put some effort into ensuring Java Pipeline objects and proto 
> representations could roundtrip with all the necessary information for 
> pre-portability runners to still work, which is the same prereqs as 
> pre-portable "Override" implementations to still work.
>
> TBH I'm 50/50 on the idea. If something is going to be implemented more 
> slowly or less scalably as a fallback, I think it may be best to simply be 
> upfront about being unable to really run it. It would depend on the 
> situation. For requiring time sorted input, the manual implementation is 
> probably similar to what a streaming runner might do, so it might make sense.
>
> Kenn
>
> On Fri, Jan 19, 2024 at 11:05 AM Robert Burke  wrote:
>>
>> I certainly don't have the deeper java insight here. So one more portable 
>> based reply and then I'll step back on the Java specifics.
>>
>> Portable runners only really have the "unknown Composite" fallback option, 
>> where if the Composite's URN isn't known to the runner, it should use the 
>> subgraph that is being wrapped.
>>
>> I suppose the protocol could be expanded : If a composite transform with a 
>> ParDo payload, and urn has features the runner can't handle, then it could 
>> use the fallback graph as well.
>>
>> The SDK would have then still needed to have construct the fallback graph 
>> into the Pipeline proto. This doesn't sound incompatible with what you've 
>> suggested the Java SDK could do, but it avoids the runner needing to be 
>> aware of a specific implementation requirement around a feature it doesn't 
>> support.  If it has to do something specific to support an SDK specific 
>> mechanism, that's still supporting the feature, but I fear it's not a great 
>> road to tread on for runners to add SDK specific implementation details.
>>
>> If a (portable) runner is going to spend work on doing something to handle 
>> RequiresTimeSortedInput, it's probably easi

Re: [VOTE] Vendored Dependencies Release

2024-01-18 Thread Robert Bradshaw via dev
Could you explain the process you used to produce these artifacts?

On Thu, Jan 18, 2024 at 11:23 AM Kenneth Knowles  wrote:

> +1
>
> On Wed, Jan 17, 2024 at 6:03 PM Yi Hu via dev  wrote:
>
>> Hi everyone,
>>
>>
>> Please review the release of the following artifacts that we vendor:
>>
>>  * beam-vendor-grpc-1_60_1
>>
>>
>> Please review and vote on the release candidate #1 for the version 0.1,
>> as follows:
>>
>> [ ] +1, Approve the release
>>
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>>
>> The complete staging area is available for your review, which includes:
>>
>> * the official Apache source release to be deployed to dist.apache.org
>> [1], which is signed with the key with fingerprint 8935B943A188DE65 [2],
>>
>> * all artifacts to be deployed to the Maven Central Repository [3],
>>
>> * commit hash "52b4a9cb58e486745ded7d53a5b6e2d2312e9551" [4],
>>
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>>
>> Release Manager
>>
>> [1] https://dist.apache.org/repos/dist/dev/beam/vendor/
>>
>> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
>>
>> [3]
>> https://repository.apache.org/content/repositories/orgapachebeam-1366/
>>
>> [4]
>> https://github.com/apache/beam/commits/52b4a9cb58e486745ded7d53a5b6e2d2312e9551/
>>
>>
>> --
>>
>> Yi Hu, (he/him/his)
>>
>> Software Engineer
>>
>>
>>


Re: [YAML] add timestamp to a bounded PCollection

2024-01-09 Thread Robert Bradshaw via dev
Just created https://github.com/apache/beam/pull/29969

On Mon, Jan 8, 2024 at 2:49 PM Robert Bradshaw  wrote:
>
> This does appear to be a significant missing feature. I'll try to make
> sure something easier gets in by the next release. See also below.
>
> On Mon, Jan 8, 2024 at 11:30 AM Ferran Fernández Garrido
>  wrote:
> >
> > Hi Yarden,
> >
> > Since it's a bounded source you could try with Sql transformation
> > grouping by the timestamp column. Here are some examples of grouping:
> >
> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
> >
> > However, if you want to add a timestamp column in addition to the
> > original CSV records then, there are multiple ways to achieve that.
> >
> > 1) MapToFields:
> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
> > [Your timestamp column could be a callable to get the current
> > timestamp on each record]
> >
> > 2) If you need an extra layer of transformation complexity I would
> > recommend creating a custom transformation:
> >
> > # - type: MyCustomTransform
> > #   name: AddDateTimeColumn
> > #   config:
> > #     prefix: 'whatever'
> >
> > providers:
> >   - type: 'javaJar'
> >     config:
> >       jar: 'gs://path/of/the/java.jar'
> >     transforms:
> >       MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'
>
> Alternatively you can use PyTransform, if you're more comfortable with
> that by invoking it via its fully qualified name.
>
> pipeline:
>   transforms:
>     ...
>     - type: MyAssignTimestamps
>       config:
>         kwarg1: ...
>         kwarg2: ...
>
> providers:
>   - type: python
>     config:
>       packages: ['py_py_package_identifier']
>     transforms:
>       MyAssignTimestamps: fully_qualified_package.module.AssignTimestampsPTransform
>
>
>
> > Best,
> > Ferran
> >
> > On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe () wrote:
> > >
> > > Hi all,
> > > I'm quite new to using Beam YAML. I am working with a CSV file and want to
> > > implement some windowing logic to it.
> > > Was wondering what is the right way to add timestamps to each element,
> > > assuming I have a column including a timestamp.
> > >
> > > I am aware of the Beam Programming Guide (apache.org) part, but not sure how
> > > this can be implemented and used from a YAML perspective.
> > >
> > > Thanks
> > > Yarden


Re: [YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Robert Bradshaw via dev
This does appear to be a significant missing feature. I'll try to make
sure something easier gets in by the next release. See also below.

On Mon, Jan 8, 2024 at 11:30 AM Ferran Fernández Garrido
 wrote:
>
> Hi Yarden,
>
> Since it's a bounded source you could try with Sql transformation
> grouping by the timestamp column. Here are some examples of grouping:
>
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
>
> However, if you want to add a timestamp column in addition to the
> original CSV records then, there are multiple ways to achieve that.
>
> 1) MapToFields:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
> [Your timestamp column could be a callable to get the current
> timestamp on each record]
>
> 2) If you need an extra layer of transformation complexity I would
> recommend creating a custom transformation:
>
> # - type: MyCustomTransform
> #   name: AddDateTimeColumn
> #   config:
> #     prefix: 'whatever'
>
> providers:
>   - type: 'javaJar'
>     config:
>       jar: 'gs://path/of/the/java.jar'
>     transforms:
>       MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Alternatively you can use PyTransform, if you're more comfortable with
that by invoking it via its fully qualified name.

pipeline:
  transforms:
    ...
    - type: MyAssignTimestamps
      config:
        kwarg1: ...
        kwarg2: ...

providers:
  - type: python
    config:
      packages: ['py_py_package_identifier']
    transforms:
      MyAssignTimestamps: fully_qualified_package.module.AssignTimestampsPTransform
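For concreteness, a minimal sketch of the kind of fully qualified Python
transform the provider above could point at (the class name comes from the
example; the field name and the assumption that it holds seconds since the
epoch are illustrative only):

```python
import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue


class AssignTimestampsPTransform(beam.PTransform):
    """Stamps each element with the value of one of its fields."""

    def __init__(self, timestamp_field="event_time"):
        self._timestamp_field = timestamp_field

    def expand(self, pcoll):
        field = self._timestamp_field
        # Assumes the named field already holds seconds since the epoch.
        return pcoll | beam.Map(
            lambda row: TimestampedValue(row, getattr(row, field)))
```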



> Best,
> Ferran
>
> On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe () wrote:
> >
> > Hi all,
> > I'm quite new to using Beam YAML. I am working with a CSV file and want to
> > implement some windowing logic to it.
> > Was wondering what is the right way to add timestamps to each element,
> > assuming I have a column including a timestamp.
> >
> > I am aware of the Beam Programming Guide (apache.org) part, but not sure how
> > this can be implemented and used from a YAML perspective.
> >
> > Thanks
> > Yarden


Re: (python SDK) "Any" coder bypasses registry coders

2024-01-05 Thread Robert Bradshaw via dev
On Fri, Jan 5, 2024 at 9:42 AM Joey Tran  wrote:
>
>
> I think my original message made it sound like what I thought was confusing 
> was how `Any` works. The scenario that I actually think is confusing is *if a 
> user registers a coder for a data type, this preference will get ignored in 
> non-obvious situations and can (and in my scenario, has) result in 
> non-obvious downstream issues.*


I agree this can be confusing. Essentially, Coders are attached to
PCollections (which are assumed to be of homogeneous type) at compile
time.

>
> On Fri, Jan 5, 2024 at 12:05 PM Robert Bradshaw via dev  
> wrote:
>>
>> On Fri, Jan 5, 2024 at 7:38 AM Joey Tran  wrote:
>>>
>>> I've been working with a few data types that are in practice unpicklable 
>>> and I've run into a couple issues stemming from the `Any` type hint, which 
>>> when used, will result in the PickleCoder getting used even if there's a 
>>> coder in the coder registry that matches the data element.
>>
>>
>> This is likely because we don't know the data type at the time we choose the 
>> coder.
>>
>>>
>>> This was pretty unexpected to me and can result in pretty cryptic 
>>> downstream issues. In the best case, you get an error at pickling time [1], 
>>> and in the worst case, the pickling "succeeds" (since many objects can get
>>> (de)pickled without obvious error) but then results in downstream issues
>>> (e.g. some data doesn't survive depickling).
>>
>>
>> It shouldn't be the case that an object depickles successfully but 
>> incorrectly; sounds like a bug in some custom pickling code.
>
> You don't need custom pickling code for this to happen. For a contrived 
> example, you could imagine some class that caches some state specific to a 
> local system and saves it to a private local variable. If you pickle one of 
> these and then unpickle it on a different system, it would've been unpickled 
> successfully but would be in a bad state.
>
> Rather than mucking around with custom pickling, someone might want to just 
> implement a coder for their special class instead.


It's the same work in both cases, though I can see a coder being
preferable if one does not own the class. (Though copyreg should work
just as well.) In that case, I'd consider explicitly making the class
throw an exception on pickling (though I agree it's hard to see how
one could know to do this by default).

>>>
>>> One example case of the latter is if you flatten a few pcollections 
>>> including a pcollection generated by `beam.Create([])` (the inferred output 
>>> type an empty create becomes Any)
>>
>>
>> Can you add a type hint to the Create?
>
> Yeah this fixes the issue, it's just not obvious (or at least to me) that (1) 
> beam.Create([]) will have an output type of Any (often times the parameter to 
> beam.Create will be some local variable which makes it less obvious) and that


We could update Beam to let the type hint be the empty union, which
would correspond to a coder that can't encode/decode anything, but
when unioned with others (e.g. in a Flatten) does not "taint" the
rest. This doesn't solve unioning two other disjoint types resolving
to the Any coder though.

>
> (2) in this particular case, _how_ downstream pcollections get decoded will 
> be slightly different. In the worse case, the issue won't even result in an 
> error at decoding time (as mentioned before), so then you have to backtrack 
> from some possibly unrelated sounding traceback.
>
>>>
>>> Would it make sense to introduce a new fallback coder that takes precedence 
>>> over the `PickleCoder` that encodes both the data type (by just pickling 
>>> it) and the data encoded using the registry-found coder?
>>
>>
>> This is essentially re-implementing pickle :)
>
> Pickle doesn't use coders from the coder registry which I think is the key 
> distinction here

Pickle is just a dynamic dispatch of type -> encoder.

>>>
>>> This would have some space ramifications for storing the data type for 
>>> every element. Of course this coder would only kick in _if_ the data type 
>>> was found in the registry, otherwise we'd proceed to the picklecoder like 
>>> we do currently
>>
>>
>> I do not think we'd want to introduce this as the default--that'd likely 
>> make common cases much more expensive. IIRC you can manually override the 
>> fallback coder with one of your own choosing. Alternatively, you could look 
>> at using copyreg for your problema

Re: (python SDK) "Any" coder bypasses registry coders

2024-01-05 Thread Robert Bradshaw via dev
On Fri, Jan 5, 2024 at 7:38 AM Joey Tran  wrote:

> I've been working with a few data types that are in practice
> unpicklable and I've run into a couple issues stemming from the `Any` type
> hint, which when used, will result in the PickleCoder getting used even if
> there's a coder in the coder registry that matches the data element.
>

This is likely because we don't know the data type at the time we choose
the coder.


> This was pretty unexpected to me and can result in pretty cryptic
> downstream issues. In the best case, you get an error at pickling time [1],
> and in the worst case, the pickling "succeeds" (since many objects can get
> (de)pickled without obvious error) but then results in downstream issues
> (e.g. some data doesn't survive depickling).
>

It shouldn't be the case that an object depickles successfully but
incorrectly; sounds like a bug in some custom pickling code.

One example case of the latter is if you flatten a few pcollections
> including a pcollection generated by `beam.Create([])` (the inferred output
> type an empty create becomes Any)
>

Can you add a type hint to the Create?


> Would it make sense to introduce a new fallback coder that takes
> precedence over the `PickleCoder` that encodes both the data type (by just
> pickling it) and the data encoded using the registry-found coder?
>

This is essentially re-implementing pickle :)


> This would have some space ramifications for storing the data type for
> every element. Of course this coder would only kick in _if_ the data type
> was found in the registry, otherwise we'd proceed to the picklecoder like
> we do currently
>

I do not think we'd want to introduce this as the default--that'd likely
make common cases much more expensive. IIRC you can manually override the
fallback coder with one of your own choosing. Alternatively, you could look
at using copyreg for your problematic types.
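To illustrate both points (registering a coder and hinting the otherwise-Any
Create), a small sketch; LicenseKey and its coder are made-up stand-ins for
the practically-unpicklable types discussed above:

```python
import apache_beam as beam
from apache_beam import coders


class LicenseKey:
    """Stand-in for a type that should never go through the pickle coder."""
    def __init__(self, value):
        self.value = value


class LicenseKeyCoder(coders.Coder):
    def encode(self, key):
        return key.value.encode("utf-8")

    def decode(self, encoded):
        return LicenseKey(encoded.decode("utf-8"))

    def is_deterministic(self):
        return True

    def to_type_hint(self):
        return LicenseKey


# The registry is only consulted when a PCollection's element type is known...
coders.registry.register_coder(LicenseKey, LicenseKeyCoder)

with beam.Pipeline() as p:
    keys = p | "Keys" >> beam.Create([LicenseKey("a"), LicenseKey("b")])
    # ...so an un-hinted empty Create would be typed as Any and push the
    # flatten below onto the fallback pickle coder; the hint avoids that.
    empty = p | "Empty" >> beam.Create([]).with_output_types(LicenseKey)
    merged = (keys, empty) | beam.Flatten()
```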


> [1] https://github.com/apache/beam/issues/29908 (Issue arises from
> ReshuffleFromKey using `Any` as a pcollection type
>


Re: [VOTE] Release 2.53.0, release candidate #2

2024-01-04 Thread Robert Bradshaw via dev
+1 (binding)

Validated the artifacts and some simple Python pipelines in a fresh
install.

On Wed, Jan 3, 2024 at 5:46 PM Robert Burke  wrote:

> +1 (binding)
>
> Validated the Go SDK against my own pipleines.
>
> Robert Burke
>
> On Wed, Jan 3, 2024, 7:52 AM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
>> +1 (binding)
>>
>> Validated Java/Python x-lang jobs.
>>
>> - Cham
>>
>> On Tue, Jan 2, 2024 at 7:35 AM Jack McCluskey via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Happy New Year, everyone!
>>>
>>> Now that we're through the holidays I just wanted to bump the voting
>>> thread so we can keep the RC moving.
>>>
>>> Thanks,
>>>
>>> Jack McCluskey
>>>
>>> On Fri, Dec 29, 2023 at 11:58 AM Johanna Öjeling via dev <
>>> dev@beam.apache.org> wrote:
>>>
 +1 (non-binding).

 Tested Go SDK with Dataflow on own use cases.

 On Fri, Dec 29, 2023 at 2:57 AM Yi Hu via dev 
 wrote:

> +1 (non-binding)
>
> Tested with Beam GCP IOs benchmarking (
> https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/it/google-cloud-platform
> )
>
> On Thu, Dec 28, 2023 at 11:36 AM Svetak Sundhar via dev <
> dev@beam.apache.org> wrote:
>
>> +1 (non binding)
>>
>> Tested with Healthcare notebooks.
>>
>>
>> Svetak Sundhar
>>
>>   Data Engineer
>> s vetaksund...@google.com
>>
>>
>>
>> On Thu, Dec 28, 2023 at 3:52 AM Jan Lukavský  wrote:
>>
>>> +1 (binding)
>>>
>>> Tested Java SDK with Flink Runner.
>>>
>>>  Jan
>>> On 12/27/23 14:13, Danny McCormick via dev wrote:
>>>
>>> +1 (non-binding)
>>>
>>> Tested with some example ML notebooks.
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Tue, Dec 26, 2023 at 6:41 PM XQ Hu via dev 
>>> wrote:
>>>
 +1 (non-binding)

 Tested with the simple RunInference pipeline:
 https://github.com/google/dataflow-ml-starter/actions/runs/7332832875/job/19967521369

 On Tue, Dec 26, 2023 at 3:29 PM Jack McCluskey via dev <
 dev@beam.apache.org> wrote:

> Happy holidays everyone,
>
> Please review and vote on the release candidate #2 for the version
> 2.53.0, as follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific
> comments)
>
> Reviewers are encouraged to test their own use cases with the
> release candidate, and vote +1 if no issues are found. Only PMC member
> votes will count towards the final vote, but votes from all community
> members are encouraged and helpful for finding regressions; you can 
> either
> test your own use cases [13] or use cases from the validation sheet 
> [10].
>
> The complete staging area is available for your review, which
> includes:
> * GitHub Release notes [1],
> * the official Apache source release to be deployed to
> dist.apache.org [2], which is signed with the key with
> fingerprint DF3CBA4F3F4199F4 (D20316F712213422 if automated) [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v1.2.3-RC3" [5],
> * website pull request listing the release [6], the blog post [6],
> and publishing the API reference manual [7].
> * Python artifacts are deployed along with the source release to
> the dist.apache.org [2] and PyPI[8].
> * Go artifacts and documentation are available at pkg.go.dev [9]
> * Validation sheet with a tab for 2.53.0 release to help with
> validation [10].
> * Docker images published to Docker Hub [11].
> * PR to run tests against release branch [12].
>
> The vote will be open for at least 72 hours. It is adopted by
> majority approval, with at least 3 PMC affirmative votes.
>
> For guidelines on how to try the release in your projects, check
> out our RC testing guide [13].
>
> Thanks,
>
> Jack McCluskey
>
> [1] https://github.com/apache/beam/milestone/17
> [2] https://dist.apache.org/repos/dist/dev/beam/2.53.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapachebeam-1365/
> [5] https://github.com/apache/beam/tree/v2.53.0-RC2
> [6] https://github.com/apache/beam/pull/29856
> [7] https://github.com/apache/beam-site/pull/657
> [8] https://pypi.org/project/apache-beam/2.53.0rc2/
> [9]
> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.53.0-RC2/go/pkg/beam
> [10]
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1290249774
> [11

Re: Constant for beam:runner:executable_stage:v1 ?

2023-12-20 Thread Robert Bradshaw via dev
Not at all. (We may want to lift it to a proto definition that can be
shared for all languages, and similarly for "beam:runner:source:v1" and
"beam:runner:sink:v1", but moving it to a common spot in Python at least is
an improvement.)

On Wed, Dec 20, 2023 at 1:19 PM Joey Tran  wrote:

> Would it feel too wrong to put it in commo_urns? [1]
>
> [1]
> https://github.com/apache/beam/blob/8de029a412ab3e87ec92caf29818b51dab4ab02d/sdks/python/apache_beam/portability/common_urns.py
>
> On Wed, Dec 20, 2023 at 4:06 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Technically it's not really part of the model so much as an
>> implementation detail, but it likely does make sense to put somewhere
>> common.
>>
>> On Wed, Dec 20, 2023 at 12:55 PM Joey Tran 
>> wrote:
>>
>>> Hey all,
>>>
>>> Is there a particular reason we hard code
>>> "beam:runner:executable_stage:v1" everywhere in the python SDK instead of
>>> putting it in common_urns?
>>>
>>>
>>>


Re: Constant for beam:runner:executable_stage:v1 ?

2023-12-20 Thread Robert Bradshaw via dev
Technically it's not really part of the model so much as an implementation
detail, but it likely does make sense to put somewhere common.

On Wed, Dec 20, 2023 at 12:55 PM Joey Tran 
wrote:

> Hey all,
>
> Is there a particular reason we hard code
> "beam:runner:executable_stage:v1" everywhere in the python SDK instead of
> putting it in common_urns?
>
>
>


Re: Python SDK logical types with argument

2023-12-20 Thread Robert Bradshaw via dev
On Wed, Dec 20, 2023 at 8:41 AM Ben San Nicolas via dev 
wrote:

> Hi,
>
> I'm looking to make use of https://github.com/apache/beam/issues/23373 so
> I can use a java avro schema with enums xlang from python.
>
> Are there existing ideas on how to implement this?
>
> I tried taking a look and the Python SDK has a very simple map from
> concrete python type to logical type, which doesn't seem sufficient to
> encode additional type info from an argument, since a map from class to
> logical type and the associated _from_typing(class) conversion doesn't
> maintain any arguments.
>
> I tried comparing this with the Java SDK, but it looks like it has a full
> abstraction around a schema model, which directly encodes the logical arg
> type/arg from the protobuf and then handles the conversions implicitly
> somehow/elsewhere. As far as I could tell, this abstraction is missing from
> python, in the sense that schemas_test.py roundtrips proto classes through
> concrete python types like typing.Mapping[typing.Optional[numpy.int64],
> bytes] rather than an SDK-defined schema class.
>
> The simplest solution might be to just update language_type() to return a
> tuple of class, arg, and register that in the logical type mapping. Does
> this make sense?
>

Yes, this'd be a great step forward. Not being able to specify the enum
logical type (that seems widely used in Java) is a major pain point for
Python.


Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Robert Bradshaw via dev
There is definitely a body of future work in intelligently merging
compatible-but-not-equal environments. (Dataflow does this for example.)
Defining/detecting compatibility is not always easy, but sometimes is, and
we should at least cover those cases and grow them over time.

On Fri, Dec 15, 2023 at 5:57 AM Joey Tran  wrote:

> Yeah I can confirm for the python runners (based on my reading of the
> translations.py [1]) that only identical environments are merged together.
>
> The funny thing is that we _originally_ implemented this hint as an
> annotation but then changed it to hint because it semantically felt more
> correct. I think we might go back to that since the environment merging
> logic isn't too flexible / easy to customize. Our type of hint is a bit
> unlike other hints anyways. Unlike resources like MinRam, these resources
> are additive (e.g. you can merge an environment that requires license A and
> an environment that requires license B into an environment that requires
> both A and B)
>
> [1]
> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>
> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke  wrote:
>
>> That would do it. We got so tunnel visioned on side inputs we missed that!
>>
>> IIRC the python local runner and Prism both only fuse transforms in
>> identical environments together. So any environmental diffs will prevent
>> fusion.
>>
>> Runners as a rule are usually free to ignore/manage hints as they like.
>> Transform annotations might be an alternative, but how those are managed
>> would be more SDK specific.
>>
>> On Fri, Dec 15, 2023, 5:21 AM Joey Tran 
>> wrote:
>>
>>> I figured out my issue. I thought side inputs were breaking up my
>>> pipeline but after experimenting with my transforms I now realize what was
>>> actually breaking it up was different transform environments that weren't
>>> considered compatible.
>>>
>>> We have a custom resource hint (for specifying whether a transform needs
>>> access to some software license) that we use with our transforms and that's
>>> what was preventing the fusion I was expecting. I'm I'm looking into how to
>>> make these hints mergeable now.
>>>
>>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke  wrote:
>>>
>>>> Building on what Robert Bradshaw has said, basically, if these fusion
>>>> breaks don't exist, the pipeline can live lock, because the side input is
>>>> unable to finish computing for a given input element's window.
>>>>
>>>> I have recently added fusion to the Go Prism runner based on the python
>>>> side input semantics, and I was surprised that there are basically two
>>>> rules for fusion. The side input one, and for handling Stateful processing.
>>>>
>>>>
>>>> This code here is the greedy fusion algorithm that Python uses, but a
>>>> less set based, so it might be easier to follow:
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>>
>>>> From the linked code comment:
>>>>
>>>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>>>  be fused with the transform P that produces that PCollection. Further,
>>>> no transform S+ descended from S, can be fused with transform P.
>>>>
>>>> Ideally I'll add visual representations of the graphs in the test suite
>>>> here, that validates the side input dependency logic:
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>>
>>>> (Note, that test doesn't validate expected fusion results, Prism is a
>>>> work in progress).
>>>>
>>>>
>>>> As for the Stateful rule, this is largely an implementation convenience
>>>> for runners to ensure correct execution.
>>>> If your pipeline also uses Stateful transforms, or SplittableDoFns,
>>>> those are usually relegated to the root of a fused stage, and avoids
>>>> fusions with each other. That can also cause additional stages.
>>>>
>>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>>> multiple stateful transforms could be fused in the same stage. But that's a
>>>> very different discussion.
>>>>
>>>> On Thu, D

Re: How do side inputs relate to stage fusion?

2023-12-14 Thread Robert Bradshaw via dev
That is correct. Side inputs give a view of the "whole" PCollection and
hence introduce a fusion barrier. For example, suppose one has a
DoFn that produces two outputs, mainPColl and sidePColl, that are consumed
(as the main and side input respectively) by DoFnB.

                 mainPColl ----> DoFnB
                /                  ^
  inPColl -- DoFnA                 | (side input)
                \                  |
                 sidePColl --------+


Now DoFnB may iterate over the entirety of sidePColl for every element of
mainPColl. This means that DoFnA and DoFnB cannot be fused: fusion
would require DoFnB to consume the elements as they are produced by
DoFnA, but we need DoFnA to run to completion before we know the contents
of sidePColl.

Similar constraints apply in larger graphs (e.g. there may be many
intermediate DoFns and PCollections), but they principally boil down to
shapes that look like this.

Though this does not introduce a global barrier in streaming, there is
still the analogous per window/watermark barrier that prevents fusion for
the same reasons.




On Thu, Dec 14, 2023 at 3:02 PM Joey Tran  wrote:

> Hey all,
>
> We have a pretty big pipeline and while I was inspecting the stages, I
> noticed there is less fusion than I expected. I suspect it has to do with
> the heavy use of side inputs in our workflow. In the python sdk, I see that
> side inputs are considered when determining whether two stages are fusible.
> I have a hard time getting a clear understanding of the logic though. Could
> someone clarify / summarize the rules around this?
>
> Thanks!
> Joey
>


Re: State-Aware Element Batching in Python

2023-12-08 Thread Robert Bradshaw via dev
Thanks for the nice doc. So it looks like this new code is taken
iff max_batch_duration_secs is set?
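For anyone following along, a minimal usage sketch (the sizes are arbitrary;
per this thread, setting max_batch_duration_secs is what opts into the new
stateful, cross-bundle path):

```python
import apache_beam as beam
from apache_beam.transforms.util import BatchElements

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(range(100))
        # Without max_batch_duration_secs this is the classic per-bundle
        # batching; with it, batches may span bundles via state and timers.
        | BatchElements(min_batch_size=8,
                        max_batch_size=32,
                        max_batch_duration_secs=1)
        | beam.Map(print))
```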

On Thu, Dec 7, 2023 at 12:28 PM Jack McCluskey via dev 
wrote:

> Hey everyone,
>
> This is long-overdue, but I wanted to share a doc (
> https://docs.google.com/document/d/1Rin_5Vm3qT1Mkb5PcHgTDrjXc3j0Admzi3fEGEHB2-4/edit?usp=sharing)
> that outlines a new implementation of BatchElements in the Python SDK that
> is state-aware, and therefore allows for batching elements across bundles
> with dynamic batch sizing. The transform is actually already in the Beam
> repo, as a prototype PR rapidly turned into the bulk of the work being
> finished. This doc does, at the very least, outline some interesting
> mechanics and nuances involved in putting this together.
>
> The DoFn is largely targeted for use in the RunInference framework;
> however it will be accessible as of 2.53.0 for general use through
> BatchElements as well. The code itself is at
> https://github.com/apache/beam/blob/5becfb8ed430fe9a317cd2ffded576fe2ab8e980/sdks/python/apache_beam/transforms/util.py#L651
>
> Thanks,
>
> Jack McCluskey
>
> --
>
>
> Jack McCluskey
> SWE - DataPLS PLAT/ Dataflow ML
> RDU
> jrmcclus...@google.com
>
>
>


Re: Build python Beam from source

2023-12-05 Thread Robert Bradshaw via dev
To use cross language capabilities from a non-release branch you'll
have to build the cross-language bits yourself as well. This can be
done by

(1) Making sure Java (for java dependencies) is installed.
(2) In the top level of the repository, running ./gradlew
sdks:java:io:expansion-service:shadowJar

For released versions of Beam, it will automatically fetch the
pre-built, released artifacts for you from maven. You can manually
request those of a previous release by passing something like

--beam_services='{"sdks:java:extensions:sql:expansion-service:shadowJar":
"https://repository.apache.org/content/repositories/orgapachebeam-1361/org/apache/beam/beam-sdks-java-extensions-sql-expansion-service/2.52.0/beam-sdks-java-extensions-sql-expansion-service-2.52.0.jar"}'

which basically says "when looking for this target, use that jar"
though as this is using an out-of-date copy of the libraries this may
not always work.
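As a sketch, the same override can also be passed programmatically via
pipeline options (the jar path below is an assumption modeled on the error
message quoted further down):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Point the expansion-service target at a locally built jar instead of the
# released artifact; replace the path with the jar produced by ./gradlew above.
options = PipelineOptions([
    '--beam_services='
    '{"sdks:java:io:expansion-service:shadowJar": '
    '"/path/to/beam/sdks/java/io/expansion-service/build/libs/'
    'beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar"}',
])
```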


On Tue, Dec 5, 2023 at 6:14 AM Поротиков Станислав Вячеславович via
dev  wrote:
>
> Hello!
> How to properly install/build apache-beam python package from source?
>
> I've tried running:
>
> pip install .
>
> from skds/python directory
>
> It's installed successfully, but when I try to run python beam pipeline, it 
> complains:
> RuntimeError: 
> /lib/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar
>  not found. Please build the server with
>
>  cd /lib; ./gradlew 
> sdks:java:io:expansion-service:shadowJar
>
>
>
> Glad to any help!
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>


Re: Implementing tuple type support in for ClickHouse connector

2023-12-04 Thread Robert Bradshaw via dev
Note that logical types are not as portable (e.g. for cross-language use).

On Mon, Dec 4, 2023 at 9:18 AM Alexey Romanenko
 wrote:
>
> Did you take a look by chance on 
> org.apache.beam.sdk.schemas.Schema.LogicalType? Can it be helpful for your 
> case?
>
> On 4 Dec 2023, at 12:02, Mark Zitnik  wrote:
>
> Yes I know it is done in  org.apache.beam.sdk.io.clickhouse.TableSchema (Did 
> it for several other types), but since Tuple is a nested type that can hold 
> any number of other ClickHouse types I was wondering what is the best type 
> from the Apache Beam side in order to implement it.
>
> Mark
>
> On Mon, Dec 4, 2023 at 12:24 PM Alexey Romanenko  
> wrote:
>>
>> Hi Mark,
>>
>> What do you mean by “support” in this case? To map this ClickHouse data type 
>> to a Beam Schema data type as it’s done in 
>> org.apache.beam.sdk.io.clickhouse.TableSchema for other types or something 
>> else?
>>
>> —
>> Alexey
>>
>> On 3 Dec 2023, at 10:35, Mark Zitnik  wrote:
>>
>> Hi Team,
>>
>> I am one of the committers of the ClickHouse integration team.
>> I need to add support for Tuple in the ClickHouse connector for Apache Beam. 
> What is the best approach for implementing that?
>> Tuple(https://clickhouse.com/docs/en/sql-reference/data-types/tuple) in a 
>> nested data 
>> type(https://clickhouse.com/docs/en/sql-reference/data-types#data_types).
>> If you can point me to a reference on other connectors
>>
>> Thanks
>> -MZ
>>
>>
>>
>>
>


Re: Row compatible generated coders for custom classes

2023-12-01 Thread Robert Bradshaw via dev
On Fri, Dec 1, 2023 at 9:13 AM Steven van Rossum via dev
 wrote:
>
> Hi all,
>
> I was benchmarking the fastjson2 serialization library a few weeks back for a 
> Java pipeline I was working on and was asked by a colleague to benchmark 
> binary JSON serialization against Rows for fun. We didn't do any extensive 
> analysis across different shapes and sizes, but the finding on this workload 
> was that serialization to binary JSON (tuple representation)

Meaning BSON I presume? What do you mean by "tuple representation"?
(One downside of JSON is that the field names are redundantly stored
in each record, so even if you save on CPU it may hurt on the network
due to the greater data sizes).

> outperformed the SchemaCoder on throughput by ~11x on serialization and ~5x 
> on deserialization. Additionally, RowCoder outperformed SchemaCoder on 
> throughput by ~1.3x on serialization and ~1.7x on deserialization. Note that 
> all benchmarks measured in the millions of ops/sec for this quick test, so 
> this is already excellent performance obviously.

Sounds like there's a lot of room for improvement! One downside of
Rows is that they can't (IIRC) store (and encode/decode) unboxed
representations of their primitive field types. This alone would be
good to solve, but as mentioned you could probably also skip a Row
intermediate altogether for encoding/decoding.

> I'm sure there's stuff to learn from other serialization libraries, but I'd 
> table that for now. The low hanging fruit improvement would be to skip that 
> intermediate hop to/from Row and instead generate custom SchemaCoders to 
> serialize directly into or deserialize from the Row format.
> I'd be happy to pick this up at some point in the new year, but would just 
> like to get some thoughts from this group.

+1, this'd be a great addition. I think there was some investigation
into using bytebuddy to auto-generate this kind of thing, but I don't
know how extensive it is.


Re: [VOTE] Release 2.52.0, release candidate #5

2023-11-16 Thread Robert Bradshaw via dev
+1 (binding)

The artifacts all look good, as does Python installation into a fresh
environment.


On Thu, Nov 16, 2023 at 2:41 PM Svetak Sundhar via dev 
wrote:

> +1 (non binding)
>
> validated on python use cases.
>
>
> Svetak Sundhar
>
>   Data Engineer
> s vetaksund...@google.com
>
>
>
> On Wed, Nov 15, 2023 at 8:52 AM Jan Lukavský  wrote:
>
>> +1 (binding)
>>
>> Validated Java SDK with Flink runner on own use cases.
>>
>>   Jan
>>
>> On 11/15/23 11:35, Jean-Baptiste Onofré wrote:
>> > +1 (binding)
>> >
>> > Quickly tested Java SDK and checked the legal part (hash, signatures,
>> headers).
>> >
>> > Regards
>> > JB
>> >
>> > On Tue, Nov 14, 2023 at 12:06 AM Danny McCormick via dev
>> >  wrote:
>> >> Hi everyone,
>> >> Please review and vote on the release candidate #5 for the version
>> 2.52.0, as follows:
>> >> [ ] +1, Approve the release
>> >> [ ] -1, Do not approve the release (please provide specific comments)
>> >>
>> >>
>> >> Reviewers are encouraged to test their own use cases with the release
>> candidate, and vote +1 if no issues are found. Only PMC member votes will
>> count towards the final vote, but votes from all community members are
>> encouraged and helpful for finding regressions; you can either test your
>> own use cases or use cases from the validation sheet [10].
>> >>
>> >> The complete staging area is available for your review, which includes:
>> >>
>> >> GitHub Release notes [1]
>> >> the official Apache source release to be deployed to dist.apache.org
>> [2], which is signed with the key with fingerprint D20316F712213422 [3]
>> >> all artifacts to be deployed to the Maven Central Repository [4]
>> >> source code tag "v2.52.0-RC5" [5]
>> >> website pull request listing the release [6], the blog post [6], and
>> publishing the API reference manual [7]
>> >> Python artifacts are deployed along with the source release to the
>> dist.apache.org [2] and PyPI[8].
>> >> Go artifacts and documentation are available at pkg.go.dev [9]
>> >> Validation sheet with a tab for 2.52.0 release to help with validation
>> [10]
>> >> Docker images published to Docker Hub [11]
>> >> PR to run tests against release branch [12]
>> >>
>> >>
>> >> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>> >>
>> >> For guidelines on how to try the release in your projects, check out
>> our blog post at https://beam.apache.org/blog/validate-beam-release/.
>> >>
>> >> Thanks,
>> >> Danny
>> >>
>> >> [1] https://github.com/apache/beam/milestone/16
>> >> [2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
>> >> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> >> [4]
>> https://repository.apache.org/content/repositories/orgapachebeam-1363/
>> >> [5] https://github.com/apache/beam/tree/v2.52.0-RC5
>> >> [6] https://github.com/apache/beam/pull/29331
>> >> [7] https://github.com/apache/beam-site/pull/655
>> >> [8] https://pypi.org/project/apache-beam/2.52.0rc5/
>> >> [9]
>> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC5/go/pkg/beam
>> >> [10]
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
>> >> [11] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>> >> [12] https://github.com/apache/beam/pull/29418
>>
>


Re: Hiding logging for beam playground examples

2023-11-14 Thread Robert Bradshaw via dev
+1 to at least setting the log level to higher than info. Some runner
logging (e.g. job started/done) may be useful.

On Tue, Nov 14, 2023 at 9:37 AM Joey Tran  wrote:
>
> Hi all,
>
> I just had a workshop to demo beam for people at my company and there was a 
> bit of confusion about whether the beam python playground examples were even 
> working and it turned out they just got confused by all the runner logging 
> that is output.
>
> Is this worth keeping? It seems like it'd be a common source of confusion for 
> new users
>
> Cheers,
> Joey


Re: The Current State of Beam Python Type Hinting

2023-11-14 Thread Robert Bradshaw via dev
Thanks for writing this up! Added some comments to the doc itself.

On Mon, Nov 13, 2023 at 11:01 PM Johanna Öjeling via dev <
dev@beam.apache.org> wrote:

> Thanks - well written! Interesting with the Any type, I learned something
> new. Added a comment.
>
> Johanna
>
> On Mon, Nov 13, 2023 at 6:02 PM Jack McCluskey via dev <
> dev@beam.apache.org> wrote:
>
>> Hey everyone,
>>
>> I put together a small doc explaining how Beam Python type hinting
>> works + where the module needs to go in the future with changes to Python
>> itself. This is over at
>> https://s.apache.org/beam-python-type-hinting-overview and I'll be
>> putting it into a few places for discoverability as well.
>>
>> Thanks,
>>
>> Jack McCluskey
>>
>> --
>>
>>
>> Jack McCluskey
>> SWE - DataPLS PLAT/ Dataflow ML
>> RDU
>> jrmcclus...@google.com
>>
>>
>>


Re: Adding Dead Letter Queues to Beam IOs

2023-11-10 Thread Robert Bradshaw via dev
Thanks. I added some comments to the doc and open PR.

On Wed, Nov 8, 2023 at 12:44 PM John Casey via dev  wrote:
>
> Hi All,
>
> I've written up a design for adding DLQs to existing Beam IOs. It's been 
> through a round of reviews with some Dataflow folks at Google, but I'd 
> appreciate any comments the rest of the Beam community has on how to refine the design.
>
> TL;DR: Make it easy for a user to configure IOs to route bad data to an 
> alternate sink instead of crashing the pipeline or having the record be 
> retried indefinitely.
>
> https://docs.google.com/document/d/1NGeCk6tOqF-TiGEAV7ixd_vhIiWz9sHPlCa1P_77Ajs/edit?usp=sharing
>
> Thanks!
>
> John


Re: [VOTE] Release 2.52.0, release candidate #3

2023-11-10 Thread Robert Bradshaw via dev
+1 (binding)

Artifacts and signatures look good, validated one of the Python wheels
in a fresh install.

On Fri, Nov 10, 2023 at 7:23 AM Alexey Romanenko
 wrote:
>
> +1 (binding)
>
> Java SDK with Spark runner
>
> —
> Alexey
>
> On 9 Nov 2023, at 16:44, Ritesh Ghorse via dev  wrote:
>
> +1 (non-binding)
>
> Validated Python SDK quickstart batch and streaming.
>
> Thanks!
>
> On Thu, Nov 9, 2023 at 9:25 AM Jan Lukavský  wrote:
>>
>> +1 (binding)
>>
>> Validated Java SDK with Flink runner on own use cases.
>>
>>  Jan
>>
>> On 11/9/23 03:31, Danny McCormick via dev wrote:
>>
>> Hi everyone,
>> Please review and vote on the release candidate #3 for the version 2.52.0, 
>> as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>>
>> Reviewers are encouraged to test their own use cases with the release 
>> candidate, and vote +1 if no issues are found. Only PMC member votes will 
>> count towards the final vote, but votes from all community members are 
>> encouraged and helpful for finding regressions; you can either test your own 
>> use cases or use cases from the validation sheet [10].
>>
>> The complete staging area is available for your review, which includes:
>>
>> GitHub Release notes [1]
>> the official Apache source release to be deployed to dist.apache.org [2], 
>> which is signed with the key with fingerprint D20316F712213422 [3]
>> all artifacts to be deployed to the Maven Central Repository [4]
>> source code tag "v2.52.0-RC3" [5]
>> website pull request listing the release [6], the blog post [6], and 
>> publishing the API reference manual [7]
>> Python artifacts are deployed along with the source release to the 
>> dist.apache.org [2] and PyPI[8].
>> Go artifacts and documentation are available at pkg.go.dev [9]
>> Validation sheet with a tab for 2.52.0 release to help with validation [10]
>> Docker images published to Docker Hub [11]
>> PR to run tests against release branch [12]
>>
>>
>> The vote will be open for at least 72 hours. It is adopted by majority 
>> approval, with at least 3 PMC affirmative votes.
>>
>> For guidelines on how to try the release in your projects, check out our 
>> blog post at https://beam.apache.org/blog/validate-beam-release/.
>>
>> Thanks,
>> Danny
>>
>> [1] https://github.com/apache/beam/milestone/16
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4] https://repository.apache.org/content/repositories/orgapachebeam-1361/
>> [5] https://github.com/apache/beam/tree/v2.52.0-RC3
>> [6] https://github.com/apache/beam/pull/29331
>> [7] https://github.com/apache/beam-site/pull/653
>> [8] https://pypi.org/project/apache-beam/2.52.0rc2/
>> [9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC3/go/pkg/beam
>> [10] 
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
>> [11] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>> [12] https://github.com/apache/beam/pull/29319
>
>


Re: [External Sender] Re: [Question] Error handling for IO Write Functions

2023-11-09 Thread Robert Bradshaw via dev
+1

Specifically, p.run().waitUntilFinish() would throw an exception if there
were errors during pipeline execution.
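For illustration, a minimal Python sketch of the same idea (the code in this thread is Java, but the behavior is analogous); the failing step here is made up:

```python
import apache_beam as beam

p = beam.Pipeline()  # construction only; nothing executes here
(p | beam.Create([1, 2, 3])
   | beam.Map(lambda x: x // 0))  # hypothetical step that fails at execution time

try:
    p.run().wait_until_finish()  # execution errors surface here, not during construction
except Exception as e:
    print("Pipeline failed:", e)
```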

On Wed, Nov 8, 2023 at 8:05 AM John Casey via dev 
wrote:

> Yep, that's a common misunderstanding with Beam.
>
> The code that is actually executed in the try block is just for pipeline
> construction, and no data is processed at this point in time.
>
> Once the pipeline is constructed, the various pardos are serialized, and
> sent to the runners, where they are actually executed.
>
> In this case, if there was an exception in the pardo that converts rows to
> avro, you would see the "Exception when converting Beam Row to Avro Record"
> log in whatever logs your runner provides you, and the exception would
> propagate up to your runner.
>
> In this case, your log line log.info("Finished writing Parquet file to path
> {}", writePath); is misleading: it will log when the pipeline is
> constructed, not when the Parquet write completes.
>
> On Wed, Nov 8, 2023 at 10:51 AM Ramya Prasad via dev 
> wrote:
>
>> Hey John,
>>
>> Yes that's how my code is set up, I have the FileIO.write() in its own
>> try-catch block. I took a second look at where exactly the code is failing,
>> and it's actually in a ParDo function which is happening before I call
>> FileIO.write(). But even within that, I've tried adding a try-catch but the
>> error isn't stopping the actual application run in a Spark cluster. In the
>> cluster, I see that the exception is being thrown from my ParDo, but then
>> immediately after that, I see the line "INFO ApplicationMaster: Final
>> app status: SUCCEEDED, exitCode: 0". This is roughly what my code setup
>> looks like:
>>
>> @Slf4j
>> public class ParquetWriteActionStrategy {
>>
>> public void executeWriteAction(Pipeline p) throws Exception {
>>
>> try {
>>
>> // transform PCollection from type Row to GenericRecords
>> PCollection<GenericRecord> records = p.apply("transform 
>> PCollection from type Row to GenericRecords",
>> ParDo.of(new DoFn<Row, GenericRecord>() {
>> @ProcessElement
>> public void processElement(@Element Row row, 
>> OutputReceiver<GenericRecord> out) {
>> try {
>> 
>> } catch (Exception e) {
>> log.error("Exception when converting Beam 
>> Row to Avro Record: {}", e.getMessage());
>> throw e;
>> }
>>
>> }
>> })).setCoder(AvroCoder.of(avroSchema));
>> records.apply("Writing Parquet Output File", 
>> FileIO.<GenericRecord>
>> write()
>> .via()
>> .to(writePath)
>> .withSuffix(".parquet"));
>>
>> log.info("Finished writing Parquet file to path {}", writePath);
>> } catch (Exception e) {
>> log.error("Error in Parquet Write Action. {}", e.getMessage());
>> throw e;
>> }
>>
>> }
>>
>>
>> On Wed, Nov 8, 2023 at 9:16 AM John Casey via dev 
>> wrote:
>>
>>> There are 2 execution times when using Beam. The first execution is
>>> local, when a pipeline is constructed, and the second is remote on the
>>> runner, processing data.
>>>
>>> Based on what you said, it sounds like you are wrapping pipeline
>>> construction in a try-catch, and constructing FileIO isn't failing.
>>>
>>> e.g.
>>>
>>> try {
>>>
>>> FileIO.write().someOtherconfigs()
>>>
>>> } catch ...
>>>
>>> this will catch any exceptions in constructing fileio, but the running
>>> pipeline won't propagate exceptions through this exception block.
>>>
>>> On Tue, Nov 7, 2023 at 5:21 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> File write failures should be throwing exceptions that will
>>>> terminate the pipeline on failure. (Generally a distributed runner will
>>>> make multiple attempts before abandoning the entire pipeline of course.)
>>>>
>>>> Are you seeing files failing to be written but no exceptions being
>>>> thrown? If so, this is definitely a bug that we want to resolve.
>>>>
>>>>
>>>>

Re: [Question] Error handling for IO Write Functions

2023-11-07 Thread Robert Bradshaw via dev
File write failures should be throwing exceptions that will terminate the
pipeline on failure. (Generally a distributed runner will make multiple
attempts before abandoning the entire pipeline of course.)

Are you seeing files failing to be written but no exceptions being thrown?
If so, this is definitely a bug that we want to resolve.


On Tue, Nov 7, 2023 at 11:17 AM Ramya Prasad via dev 
wrote:

> Hello,
>
> I am a developer using Apache Beam in my Java application, and I need some
> help on how to handle exceptions when writing a file to S3. I have tried
> wrapping my code within a try-catch block, but no exception is being thrown
> within the try block. I'm assuming that FileIO doesn't throw any exceptions
> upon failure. Is there a way in which I can either terminate the program on
> failure or at least be made aware of if any of my write operations fail?
>
> Thanks and sincerely,
> Ramya


Re: Processing time watermarks in KinesisIO

2023-10-31 Thread Robert Bradshaw via dev
On Tue, Oct 31, 2023 at 10:28 AM Jan Lukavský  wrote:
>
> On 10/31/23 17:44, Robert Bradshaw via dev wrote:
> > There are really two cases that make sense:
> >
> > (1) We read the event timestamps from the kafka records themselves and
> > have some external knowledge that guarantees (or at least provides a
> > very good heuristic) about what the timestamps of unread messages
> > could be in the future to set the watermark. This could possibly
> > involve knowing that the timestamps in a partition are monotonically
> > increasing, or somehow have bounded skew.
> +1
> >
> > (2) We use processing time as both the watermark and for setting the
> > event timestamp on produced messages. From this point on we can safely
> > reason about the event time.
> This is where I have some doubts. We can reason about event time, but it
> is not stable upon Pipeline restarts (if there is any downstream
> processing that depends on event time and is not shielded by
> @RequiresStableInput, it might give different results on restarts).

That is a fair point, but I don't think we can guarantee that we have
a timestamp embedded in the record. (Or is there some stable Kafka
metadata we could use here? I'm not that familiar with what Kafka
guarantees.) We could require it to be opt-in given the caveats.

> Is
> there any specific reason not to use option 1)? Do we have to provide the
> alternative 2), given that users can implement it themselves (we would
> need to allow users to specify a custom timestamp function, but that
> should be done in all cases)?

The tricky bit is how the user specifies the watermark, unless they
can guarantee the custom timestamps are monotonically ordered (at
least within a partition).

> > The current state seems a bit broken if I understand correctly.
> +1
> >
> > On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský  wrote:
> >> I think that instead of deprecating and creating new version, we could 
> >> leverage the proposed update compatibility flag for this [1]. I still have 
> >> some doubts if the processing-time watermarking (and event-time 
> >> assignment) makes sense. Do we have a valid use-case for that? This is 
> >> actually the removed SYNCHRONIZED_PROCESSING_TIME time domain, which is 
> >> problematic - restarts of Pipelines cause timestamps to change and hence 
> >> make *every* DoFn potentially non-deterministic, which would be an 
> >> unexpected side-effect. This makes me wonder if we should remove this 
> >> policy altogether (deprecate or use the update compatibility flag, so that 
> >> the policy throws exception in new version).
> >>
> >> The crucial point would be to find a use-case where it is actually helpful 
> >> to use such policy.
> >> Any ideas?
> >>
> >>   Jan
> >>
> >> [1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2
> >>
> >> On 10/27/23 18:33, Alexey Romanenko wrote:
> >>
> >> Ahh, ok, I see.
> >>
> >> Yes, it looks like a bug. So, I'd propose to deprecate the old "processing 
> >> time” watermark policy, which we can remove later, and create a new fixed 
> >> one.
> >>
> >> PS: It’s recommended to use 
> >> "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO” instead of deprecated 
> >> “org.apache.beam.sdk.io.kinesis.KinesisIO” one.
> >>
> >> —
> >> Alexey
> >>
> >> On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:
> >>
> >> No, I'm referring to this [1] policy which has unexpected (and hardly 
> >> avoidable on the user-code side) data loss issues. The problem is that 
> >> assigning timestamps to elements and watermarks is completely decoupled 
> >> and unrelated, which I'd say is a bug.
> >>
> >>   Jan
> >>
> >> [1] 
> >> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--
> >>
> >> On 10/27/23 16:51, Alexey Romanenko wrote:
> >>
> >> Why not just to create a custom watermark policy for that? Or you mean to 
> >> make it as a default policy?
> >>
> >> —
> >> Alexey
> >>
> >> On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:
> >>
> >>
> >> Hi,
> >>
> >> when discussing about [1] we found out, that the issue is actually caused 
> >> by processing time watermarks in KinesisIO. Enabling this watermark 
> >> outputs watermarks based on current processing time, _but event t

Re: Processing time watermarks in KinesisIO

2023-10-31 Thread Robert Bradshaw via dev
There are really two cases that make sense:

(1) We read the event timestamps from the kafka records themselves and
have some external knowledge that guarantees (or at least provides a
very good heuristic) about what the timestamps of unread messages
could be in the future to set the watermark. This could possibly
involve knowing that the timestamps in a partition are monotonically
increasing, or somehow have bounded skew.

(2) We use processing time as both the watermark and for setting the
event timestamp on produced messages. From this point on we can safely
reason about the event time.

The current state seems a bit broken if I understand correctly.

On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský  wrote:
>
> I think that instead of deprecating and creating new version, we could 
> leverage the proposed update compatibility flag for this [1]. I still have 
> some doubts if the processing-time watermarking (and event-time assignment) 
> makes sense. Do we have a valid use-case for that? This is actually the 
> removed SYNCHRONIZED_PROCESSING_TIME time domain, which is problematic - 
> restarts of Pipelines causes timestamps to change and hence makes *every* 
> DoFn potentially non-deterministic, which would be unexpected side-effect. 
> This makes me wonder if we should remove this policy altogether (deprecate or 
> use the update compatibility flag, so that the policy throws exception in new 
> version).
>
> The crucial point would be to find a use-case where it is actually helpful to 
> use such policy.
> Any ideas?
>
>  Jan
>
> [1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2
>
> On 10/27/23 18:33, Alexey Romanenko wrote:
>
> Ahh, ok, I see.
>
> Yes, it looks like a bug. So, I'd propose to deprecate the old "processing 
> time” watermark policy, which we can remove later, and create a new fixed one.
>
> PS: It’s recommended to use "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO” 
> instead of deprecated “org.apache.beam.sdk.io.kinesis.KinesisIO” one.
>
> —
> Alexey
>
> On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:
>
> No, I'm referring to this [1] policy which has unexpected (and hardly 
> avoidable on the user-code side) data loss issues. The problem is that 
> assigning timestamps to elements and watermarks is completely decoupled and 
> unrelated, which I'd say is a bug.
>
>  Jan
>
> [1] 
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--
>
> On 10/27/23 16:51, Alexey Romanenko wrote:
>
> Why not just to create a custom watermark policy for that? Or you mean to 
> make it as a default policy?
>
> —
> Alexey
>
> On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:
>
>
> Hi,
>
> when discussing about [1] we found out, that the issue is actually caused by 
> processing time watermarks in KinesisIO. Enabling this watermark outputs 
> watermarks based on current processing time, _but event timestamps are 
> derived from ingestion timestamp_. This can cause unbounded lateness when 
> processing backlog. I think this setup is error-prone and will likely cause 
> data loss due to dropped elements. This can be solved in two ways:
>
>  a) deprecate processing time watermarks, or
>
>  b) modify KinesisIO's watermark policy so that is assigns event timestamps 
> as well (the processing-time watermark policy would have to derive event 
> timestamps from processing-time).
>
> I'd prefer option b), but it might be a breaking change; moreover I'm not 
> sure if I understand the purpose of processing-time watermark policy, it 
> might be essentially ill defined from the beginning, thus it might really be 
> better to remove it completely. There is also a related issue [2].
>
> Any thoughts on this?
>
>  Jan
>
> [1] https://github.com/apache/beam/issues/25975
>
> [2] https://github.com/apache/beam/issues/28760
>
>
>


Re: Streaming update compatibility

2023-10-27 Thread Robert Bradshaw via dev
On Fri, Oct 27, 2023 at 7:50 AM Kellen Dye via dev  wrote:
>
> > Auto is hard, because it would involve
> > querying the runner before pipeline construction, and we may not even
> > know what the runner is at this point
>
> At the point where pipeline construction will start, you should have access 
> to the pipeline arguments and be able to determine the runner. What seems to 
> be missing is a place to query the runner pre-construction. If that query 
> could return metadata about the currently running version of the job, then 
> that could be incorporated into graph construction as necessary.

While this is the common case, it is not true in general. For example
it's possible to cache the pipeline proto and submit it to a separate
choice of runner later. We have Jobs API implementations that
forward/proxy the job to other runners, and the Python interactive
runner is another example where the runner is late-binding (e.g. one
tries a sample locally, and if all looks good can execute remotely,
and also in this case the graph that's submitted is often mutated
before running).

Also, in the spirit of the portability story, the pipeline definition
itself should be runner-independent.

> That same hook could be a place to for example return the currently-running 
> job graph for pre-submission compatibility checks.

I suppose we could add something to the Jobs API to make "looking up a
previous version of this pipeline" runner-agnostic, though that
assumes it's available at construction time. And +1: as Kellen says, we
should define (and be able to check) what pipeline compatibility means
via a graph-to-graph comparison at the Beam level. I'll defer both
of these as future work as part of the "make update a portable Beam
concept" project.


Re: Streaming update compatibility

2023-10-26 Thread Robert Bradshaw via dev
On Thu, Oct 26, 2023 at 3:59 AM Johanna Öjeling  wrote:
>
> Hi,
>
> I like this idea of making it easier to push out improvements, and had a look 
> at the PR.
>
> One question to better understand how it works today:
>
> The upgrades that the runners do, such as those not visible to the user: can 
> they be initiated at any time, or do they only happen when the 
> user updates the running pipeline, e.g. with new user code?

Correct. We're talking about user-initiated changes to their pipeline here.

> And, assuming the former, some reflections that came to mind when reviewing 
> the changes:
>
> Will the update_compatibility_version option be effective both when creating 
> and updating a pipeline? It is grouped with the update options in the Python 
> SDK, but users may want to configure the compatibility already when launching 
> the pipeline.

It will be effective for both, though generally there's little
motivation to not always use the "latest" version when creating a new
pipeline.

> Would it be possible to revert setting a fixed prior version, i.e. 
> (re-)enable upgrades?

The contract would be IF you start with version X (which logically
defaults to the current SDK), THEN all updates also setting this to
version X (even on SDKs > X) should work.

> If yes: in practice, would this motivate another option, or passing a value 
> like "auto" or "latest" to update_compatibility_version?

Unset is interpreted as latest. Auto is hard, because it would involve
querying the runner before pipeline construction, and we may not even
know what the runner is at this point. (Eventually we could do things
like embed both alternatives into the graph and let the runner choose,
but this is more speculative and may not be as scalable.)
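
To make the intended usage concrete, a sketch of pinning this behavior at launch or update time; the flag name comes from the PR under discussion, and the other options are placeholders:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical invocation: ask the SDK to construct the graph the way SDK
# 2.50.0 would have, so that updating a pipeline originally launched on
# 2.50.0 stays compatible even when re-launching from a newer SDK.
# The runner and --update flags are placeholders for whatever the job uses.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--update',
    '--update_compatibility_version=2.50.0',
])
```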

> The option is being introduced to the Java and Python SDKs. Should this also 
> be applicable to the Go SDK?

Yes, allowing this value to be set should be done for Go (and
typescript, and future SDKs) too. As Robert Burke mentioned, we need
to respect the value in those SDKs that have expansion service
implementations first.

> On Thu, Oct 26, 2023 at 2:25 AM Robert Bradshaw via dev  
> wrote:
>>
>> Dataflow (among other runners) has the ability to "upgrade" running
>> pipelines with new code (e.g. capturing bug fixes, dependency updates,
>> and limited topology changes). Unfortunately some improvements (e.g.
>> new and improved ways of writing to BigQuery, optimized use of side
>> inputs, a change in algorithm, sometimes completely internally and not
>> visible to the user) are not sufficiently backwards compatible which
>> causes us, with the motivation to not break users, to either not make
>> these changes or guard them as a parallel opt-in mode which is a
>> significant drain on both developer productivity and causes new
>> pipelines to run in obsolete modes by default.
>>
>> I created https://github.com/apache/beam/pull/29140 which adds a new
>> pipeline option, update_compatibility_version, that allows the SDK to
>> move forward while letting users with pipelines launched previously to
>> manually request the "old" way of doing things to preserve update
>> compatibility. (We should still attempt backwards compatibility when
>> it makes sense, and the old way would remain in code until such a time
>> it's actually deprecated and removed, but this means we won't be
>> constrained by it, especially when it comes to default settings.)
>>
>> Any objections or other thoughts on this approach?
>>
>> - Robert
>>
>> P.S. Separately I think it'd be valuable to elevate the vague notion
>> of update compatibility to a first-class Beam concept and put it on
>> firm footing, but that's a larger conversation outside the thread of
>> this smaller (and I think still useful in such a future world) change.


Streaming update compatibility

2023-10-25 Thread Robert Bradshaw via dev
Dataflow (among other runners) has the ability to "upgrade" running
pipelines with new code (e.g. capturing bug fixes, dependency updates,
and limited topology changes). Unfortunately some improvements (e.g.
new and improved ways of writing to BigQuery, optimized use of side
inputs, a change in algorithm, sometimes completely internally and not
visible to the user) are not sufficiently backwards compatible which
causes us, with the motivation to not break users, to either not make
these changes or guard them as a parallel opt-in mode which is a
significant drain on both developer productivity and causes new
pipelines to run in obsolete modes by default.

I created https://github.com/apache/beam/pull/29140 which adds a new
pipeline option, update_compatibility_version, that allows the SDK to
move forward while letting users with pipelines launched previously
manually request the "old" way of doing things to preserve update
compatibility. (We should still attempt backwards compatibility when
it makes sense, and the old way would remain in code until such a time
it's actually deprecated and removed, but this means we won't be
constrained by it, especially when it comes to default settings.)

Any objections or other thoughts on this approach?

- Robert

P.S. Separately I think it'd be valuable to elevate the vague notion
of update compatibility to a first-class Beam concept and put it on
firm footing, but that's a larger conversation outside the thread of
this smaller (and I think still useful in such a future world) change.


Re: [Discuss] Idea to increase RC voting participation

2023-10-24 Thread Robert Bradshaw via dev
On Tue, Oct 24, 2023 at 10:35 AM Kenneth Knowles  wrote:

> Tangentially related:
>
> Long ago, attaching an issue to a release was a mandatory step as part of
> closing. Now I think it is not. Is it automatically happening? It looks
> like we have 820 with no milestone
> https://github.com/apache/beam/issues?q=is%3Aissue+no%3Amilestone+is%3Aclosed
>

This could (should) be automatically discoverable. A (closed) issue is
associated with commits which are associated with a release.


> On Tue, Oct 24, 2023 at 1:25 PM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
>> +1 for going by the commits since this is what matters at the end of the
>> day. Also, many issues may not get tagged correctly for a given release due
>> to either the contributor not tagging the issue or due to commits for the
>> issue spanning multiple Beam releases.
>>
>> For example,
>>
>> For all commits in a given release RC:
>>   * If we find a Github issue for the commit: add a notice to the Github
>> issue
>>   * Else: add the notice to a generic issue for the release including
>> tags for the commit ID, PR author, and the committer who merged the PR.
>>
>> Thanks,
>> Cham
>>
>>
>>
>>
>> On Mon, Oct 23, 2023 at 11:49 AM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>>
>>> I'd probably vote to include both the issue filer and the contributor.
>>> It is pretty equally straightforward - one way to do this would be using
>>> all issues related to that release's milestone and extracting the issue
>>> author and the issue closer.
>>>
>>> This does leave out the (unfortunately sizable) set of contributions
>>> that don't have an associated issue; if we're worried about that, we could
>>> always fall back to anyone with a commit in the last release who doesn't
>>> have an associated issue (aka what I thought we were initially proposing
>>> and what I think Airflow does today).
>>>
>>> I'm pretty much +1 on any sort of automation here, and it certainly can
>>> come in stages :)
>>>
>>> On Mon, Oct 23, 2023 at 1:50 PM Johanna Öjeling via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Yes that's a good point to include also those who created the issue.
>>>>
>>>> On Mon, Oct 23, 2023, 19:18 Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Mon, Oct 23, 2023 at 7:26 AM Danny McCormick via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> So to summarize, I think there's broad consensus (or at least lazy
>>>>>> consensus) around the following:
>>>>>>
>>>>>> - (1) Updating our release email/guidelines to be more specific about
>>>>>> what we mean by release validation/how to be helpful during this process.
>>>>>> This includes both encouraging validation within each user's own code 
>>>>>> base
>>>>>> and encouraging people to document/share their process of validation and
>>>>>> link it in the release spreadsheet.
>>>>>> - (2) Doing something like what Airflow does (#29424
>>>>>> <https://github.com/apache/airflow/issues/29424>) and creating an
>>>>>> issue asking people who have contributed to the current release to help
>>>>>> validate their changes.
>>>>>>
>>>>>> I'm also +1 on doing both of these. The first bit (updating our
>>>>>> guidelines) is relatively easy - it should just require updating
>>>>>> https://github.com/apache/beam/blob/master/contributor-docs/release-guide.md#vote-and-validate-the-release-candidate
>>>>>> .
>>>>>>
>>>>>> I took a look at the second piece (copying what Airflow does) to see
>>>>>> if we could just copy their automation, but it looks like it's tied
>>>>>> to airflow breeze
>>>>>> <https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/provider_issue_TEMPLATE.md.jinja2>
>>>>>> (their repo-specific automation tooling), so we'd probably need to build
>>>>>> the automation ourselves. It shouldn't be terrible, basically we'd want a
>>>>>> GitHub Action that compares the current release tag with the last release
>>>>>> tag, grabs all the commits in

Re: [Discuss] Idea to increase RC voting participation

2023-10-23 Thread Robert Bradshaw via dev
On Mon, Oct 23, 2023 at 7:26 AM Danny McCormick via dev 
wrote:

> So to summarize, I think there's broad consensus (or at least lazy
> consensus) around the following:
>
> - (1) Updating our release email/guidelines to be more specific about what
> we mean by release validation/how to be helpful during this process. This
> includes both encouraging validation within each user's own code base and
> encouraging people to document/share their process of validation and link
> it in the release spreadsheet.
> - (2) Doing something like what Airflow does (#29424
> <https://github.com/apache/airflow/issues/29424>) and creating an issue
> asking people who have contributed to the current release to help validate
> their changes.
>
> I'm also +1 on doing both of these. The first bit (updating our
> guidelines) is relatively easy - it should just require updating
> https://github.com/apache/beam/blob/master/contributor-docs/release-guide.md#vote-and-validate-the-release-candidate
> .
>
> I took a look at the second piece (copying what Airflow does) to see if we
> could just copy their automation, but it looks like it's tied to airflow
> breeze
> <https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/provider_issue_TEMPLATE.md.jinja2>
> (their repo-specific automation tooling), so we'd probably need to build
> the automation ourselves. It shouldn't be terrible, basically we'd want a
> GitHub Action that compares the current release tag with the last release
> tag, grabs all the commits in between, parses them to get the author, and
> creates an issue with that data, but it does represent more effort than
> just updating a markdown file. There might even be an existing Action that
> can help with this, I haven't looked too hard.
>

I was thinking along the lines of a script that would scrape the issues
resolved in a given release and add a comment to them noting that the
change is in release N and explaining (with clear instructions) how this
can be validated. Creating a "validate this release" issue with all
"contributing" participants could be an interesting way to do this as well.
(I think it'd be valuable to get those who filed the issue, not just those
who fixed it, to validate.)
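
A rough sketch of what such a script might look like, using the standard GitHub REST API; the milestone number, token, and notice text are placeholders, and pagination is omitted:

```python
import requests

REPO = "apache/beam"
MILESTONE = 16            # placeholder: the milestone tracking the release
HEADERS = {"Authorization": "Bearer <github-token>"}  # token is a placeholder

NOTICE = ("This change is included in release candidate N. "
          "Please help validate it; see "
          "https://beam.apache.org/blog/validate-beam-release/ for instructions.")

# Closed issues attached to the release milestone (first page only).
issues = requests.get(
    f"https://api.github.com/repos/{REPO}/issues",
    params={"milestone": MILESTONE, "state": "closed", "per_page": 100},
    headers=HEADERS,
).json()

for issue in issues:
    if "pull_request" in issue:  # the issues endpoint also returns PRs; skip them
        continue
    # Leave a validation request on each resolved issue.
    requests.post(
        f"https://api.github.com/repos/{REPO}/issues/{issue['number']}/comments",
        json={"body": NOTICE},
        headers=HEADERS,
    )
```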


> As our next release manager, I'm happy to review PRs for either of these
> if anyone wants to volunteer to help out. If not, I'm happy to update the
> guidelines, but I probably won't have time to add the commit inspection
> tooling (I'm planning on throwing any extra time towards continuing to
> automate release candidate creation which is currently a more impactful
> problem IMO). I would very much like it if both of these things happened
> though :)
>
> Thanks,
> Danny
>
> On Mon, Oct 23, 2023 at 10:05 AM XQ Hu  wrote:
>
>> +1. This is a great idea to try. @Danny McCormick
>>  FYI as our next release manager.
>>
>> On Wed, Oct 18, 2023 at 2:30 PM Johanna Öjeling via dev <
>> dev@beam.apache.org> wrote:
>>
>>> When I have contributed to Apache Airflow, they have tagged all
>>> contributors concerned in a GitHub issue when the RC is available and asked
>>> us to validate it. Example: #29424
>>> <https://github.com/apache/airflow/issues/29424>.
>>>
>>> I found that to be an effective way to notify contributors of the RC and
>>> nudge them to help out. In the issue description there is a reference to
>>> the guidelines on how to test the RC and a note that people are encouraged
>>> to vote on the mailing list (which could admittedly be more highlighted
>>> because I did not pay attention to it until now and was unaware that
>>> contributors had a vote).
>>>
>>> It might be an idea to consider something similar here to increase the
>>> participation?
>>>
>>> On Tue, Oct 17, 2023 at 7:01 PM Jack McCluskey via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> I'm +1 on helping explain what we mean by "validate the RC" since we're
>>>> really just asking users to see if their existing use cases work along with
>>>> our typical slate of tests. I don't know if offloading that work to our
>>>> active validators is the right approach though; documentation/screen share
>>>> of their specific workflow is definitely less useful than having a more
>>>> general outline of how to install the RC and things to look out for when
>>>> testing.
>>>>
>>>> On Tue, Oct 17, 2023 at 12:55 PM Austin Bennett 
>>>> wrote:
>>>>
>>>>> Great

Re: [QUESTION] Why no auto labels?

2023-10-20 Thread Robert Bradshaw via dev
I'll take another look at the PR. My inclination is still to use uuids
to uniquify. I think that's worth the readability hit (I'm
OK reducing this down to 6-8 hex digits which will still give very low
chances of collisions, though it doesn't solve the first one). If
someone cares about names more than this they can set them manually.
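
For reference, the uniquification being discussed could look roughly like this sketch (not the SDK's actual code):

```python
import uuid

def uniquify_label(label: str, existing_labels: set) -> str:
    """Return the label unchanged if unused, else append a short random suffix."""
    if label not in existing_labels:
        return label
    # 6-8 hex digits keep collisions unlikely while staying readable, at the
    # cost that the label is no longer stable across pipeline constructions.
    return f"{label}_{uuid.uuid4().hex[:8]}"

existing = {"Map(parse)"}
print(uniquify_label("Map(parse)", existing))  # e.g. Map(parse)_3f9a1c2e
```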

On Fri, Oct 20, 2023 at 9:30 AM Joey Tran  wrote:
>
> Just want to bump this discussion again. I'm introducing Beam to other 
> developers at Schrodinger now and the first (of hopefully many!) developer 
> has started migrating our internal workflows to Beam. As I suspected though, 
> he's complained about the iteration cycles lost to using the same 
> transform without specifying a label and to using multiple assert_that's 
> without unique labels.
>
> From the java code[1], it looks like the same naming scheme is used (I think, 
> it's been a decade since I've read java so apologies if I'm misreading) as 
> the PR I posted
>
> [1] 
> https://github.com/apache/beam/blob/e7a6405800a83dd16437b8b1b372e020e010a042/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java#L630
>
> On Fri, Oct 13, 2023 at 1:32 PM Joey Tran  wrote:
>>
>>
>>
>> On Fri, Oct 13, 2023 at 1:18 PM Robert Bradshaw  wrote:
>>>
>>> On Fri, Oct 13, 2023 at 10:08 AM Joey Tran  
>>> wrote:
>>>>
>>>> Are there places on the SDK side that expect unique labels? Or in 
>>>> non-updateable runners?
>>>
>>>
>>> That's a good question. The label eventually ends up here: 
>>> https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L142
>>>  which is technically a violation of the spec if these are not unique 
>>> (though I guess Java already does this?). More importantly, though, the id 
>>> of the transform (the proto contains a map of strings -> transforms) must 
>>> be unique.
>>>
>>
>> Doesn't that suggest not raising an exception won't be a sufficient 
>> solution? But I suppose if the Java SDK does this already it's fine..?
>>
>>>
>>> Another option is to make the suffix a uuid rather than a single counter. 
>>> (This would still have issues with the first application possibly getting 
>>> mixed up with a "different" first application unless it was always 
>>> appended.)
>>>
>>
>> Is the marginal benefit (only the first application may be confused instead 
>> of possibly other applications) valuable enough to justify the marginal cost 
>> of the readability hit of adding uuids to the labels?
>>
>>
>>
>>
>>>> On Fri, Oct 13, 2023 at 12:52 PM Robert Bradshaw  
>>>> wrote:
>>>>>
>>>>>
>>>>> Thanks for the PR.
>>>>>
>>>>> I think we should follow Java and allow non-unique labels, but not 
>>>>> provide automatic uniquification. In particular, the danger of using a 
>>>>> counter is that one can get accidental (and potentially hard to check) 
>>>>> off-by-one collisions. As a concrete example, imagine one partitions a 
>>>>> dataset into two collections, each followed by a similarly-named 
>>>>> transform.
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A
>>>>>  \
>>>>>--> B
>>>>>
>>>>> Uniquification would give something like
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A
>>>>>  \
>>>>>--> B_2
>>>>>
>>>>> Suppose one then realizes there's a third case to handle, giving
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A --> B
>>>>>  \
>>>>>--> B
>>>>>
>>>>> But this would be uniquified to
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A --> B_2
>>>>>  \
>>>>>--> B_3
>>>>>
>>>>> where the old B_2 got renamed to B_3 and a new B_2 got put in its place. 
>>>>> This is bad because an updating runner would then attribute old B_2's 
>>>>> state to the new B_2 (and also possibly mis-direct any inflight 
>>>>> messages). At least with the old, intersecting names we can detect this 
>>

Re: [YAML] Aggregations

2023-10-20 Thread Robert Bradshaw via dev
On Fri, Oct 20, 2023 at 11:35 AM Kenneth Knowles  wrote:
>
> A couple other bits on having an expression language:
>
>  - You already have Python lambdas in places, right? So that's quite a lot 
> more complex than SQL project/aggregate expressions
>  - It really does save a lot of pain for users (at the cost of implementation 
> complexity) when you need to "SUM(col1*col2)" where otherwise you have to Map 
> first. This could be viewed as desirable as well, of course.
>
> Anyhow I'm pretty much in agreement with all your reasoning as to why *not* 
> to use SQL-like expressions in strings. But it does seem odd when juxtaposed 
> with Python snippets.

Well, we say "here's a Python expression" when we're using a Python
string. But "SUM(col1*col2)" isn't as transparent. (Agree about the
niceties of being able to provide an expression rather than a column.)

> On Thu, Oct 19, 2023 at 4:00 PM Robert Bradshaw via dev  
> wrote:
>>
>> On Thu, Oct 19, 2023 at 12:53 PM Reuven Lax  wrote:
>> >
>> > Is the schema Group transform (in Java) something along these lines?
>>
>> Yes, for sure it is. It (and Python's and Typescript's equivalents) are
>> linked in the original post. The open question is how to best express
>> this in YAML.
>>
>> > On Wed, Oct 18, 2023 at 1:11 PM Robert Bradshaw via dev 
>> >  wrote:
>> >>
>> >> Beam Yaml has good support for IOs and mappings, but one key missing
>> >> feature for even writing a WordCount is the ability to do Aggregations
>> >> [1]. While the traditional Beam primitive is GroupByKey (and
>> >> CombineValues), we're eschewing KVs in favor of more schema'd
>> >> data (which has some precedent in our other languages, see the links
>> >> below). The key components the user needs to specify are (1) the key
>> >> fields on which the grouping will take place, (2) the fields
>> >> (expressions?) involved in the aggregation, and (3) what aggregating
>> >> fn to use.
>> >>
>> >> A straw-man example could be something like
>> >>
>> >> type: Aggregating
>> >> config:
>> >>   key: [field1, field2]
>> >>   aggregating:
>> >> total_cost:
>> >>   fn: sum
>> >>   value: cost
>> >> max_cost:
>> >>   fn: max
>> >>   value: cost
>> >>
>> >> This would basically correspond to the SQL expression
>> >>
>> >> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> >> from table GROUP BY field1, field2"
>> >>
>> >> (though I'm not requiring that we use this as an implementation
>> >> strategy). I do not think we need a separate (non aggregating)
>> >> Grouping operation, this can be accomplished by having a concat-style
>> >> combiner.
>> >>
>> >> There are still some open questions here, notably around how to
>> >> specify the aggregation fns themselves. We could of course provide a
>> >> number of built-ins (like SQL does). This gets into the question of
>> >> how and where to document this complete set, but some basics should
>> >> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> >> quantiles); where do we put the parameters? We could go with something
>> >> like
>> >>
>> >> fn:
>> >>   type: ApproximateQuantiles
>> >>   config:
>> >> n: 10
>> >>
>> >> but others are even configured by functions themselves (e.g. LargestN
>> >> that wants a comparator Fn). Maybe we decide not to support these
>> >> (yet?)
>> >>
>> >> One thing I think we should support, however, is referencing custom
>> >> CombineFns. We have some precedent for this with our Fns from
>> >> MapToFields, where we accept things like inline lambdas and external
>> >> references. Again the topic of how to configure them comes up, as
>> >> these custom Fns are more likely to be parameterized than Map Fns
>> >> (though, to be clear, perhaps it'd be good to allow parameterization of
>> >> MapFns as well). Maybe we allow
>> >>
>> >> language: python. # like MapToFields (and here it'd be harder to mix
>> >> and match per Fn)
>> >> fn:
>> >>   type: ???
>> >>   # should these be nested as config?
>> >>   name: fully.qualified.name
>> >>   path: /path/to/defining/file
>> >>   args: [...]
>> >>   kwargs: {...}
>> >>
>> >> which would invoke the constructor.
>> >>
>> >> I'm also open to other ways of naming/structuring these essential
>> >> parameters if it makes things more clear.
>> >>
>> >> - Robert
>> >>
>> >>
>> >> Java: 
>> >> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
>> >> Python: 
>> >> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
>> >> Typescript: 
>> >> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>> >>
>> >> [1] One can of course use SqlTransform for this, but I'm leaning
>> >> towards offering something more native.


Re: [Discuss] Idea to increase RC voting participation

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 12:18 PM Kenneth Knowles  wrote:

> +1 to more helpful guide on "how to usefully participate in RC validation"
> but also big +1 to Robert, Jack, Johanna.
>
> TL;DR the RC validation is an opportunity for downstream testing.
>
> Robert alluded to the origin of the spreadsheet: I created it long ago to
> validate that the human language on our web page actually works. Maybe
> someone should automate that with an LLM now.
>
> Robert also alluded to clean environment: our gradle scripts and GHA
> scripts and CI environment are heavily enough engineered that they don't
> represent what a user will experience. We could potentially use our starter
> repos for an adequate smoke test here.
>
> Those are both ways that *we* can pretend to be users. But actual users
> checking the RC to make sure they'll have a smooth upgrade is by far the
> most impactful validation.
>
> This thread honestly makes me want to delete the spreadsheet but maybe
> come up with a guide for downstream projects to validate against an RC.
> Maybe that's an extreme reaction...
>

I would very much be in favor of that.

On Wed, Oct 18, 2023 at 2:32 PM Robert Bradshaw via dev 
> wrote:
>
>> +1 That's a great idea. They have incentive to make sure the issue was
>> resolved for them, plus we get to ensure there were no other regressions.
>>
>> On Wed, Oct 18, 2023 at 11:30 AM Johanna Öjeling via dev <
>> dev@beam.apache.org> wrote:
>>
>>> When I have contributed to Apache Airflow, they have tagged all
>>> contributors concerned in a GitHub issue when the RC is available and asked
>>> us to validate it. Example: #29424
>>> <https://github.com/apache/airflow/issues/29424>.
>>>
>>> I found that to be an effective way to notify contributors of the RC and
>>> nudge them to help out. In the issue description there is a reference to
>>> the guidelines on how to test the RC and a note that people are encouraged
>>> to vote on the mailing list (which could admittedly be more highlighted
>>> because I did not pay attention to it until now and was unaware that
>>> contributors had a vote).
>>>
>>> It might be an idea to consider something similar here to increase the
>>> participation?
>>>
>>> On Tue, Oct 17, 2023 at 7:01 PM Jack McCluskey via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> I'm +1 on helping explain what we mean by "validate the RC" since we're
>>>> really just asking users to see if their existing use cases work along with
>>>> our typical slate of tests. I don't know if offloading that work to our
>>>> active validators is the right approach though; documentation/screen share
>>>> of their specific workflow is definitely less useful than having a more
>>>> general outline of how to install the RC and things to look out for when
>>>> testing.
>>>>
>>>> On Tue, Oct 17, 2023 at 12:55 PM Austin Bennett 
>>>> wrote:
>>>>
>>>>> Great effort.  I'm also interested in streamlining releases -- so if
>>>>> there are alot of manual tests that could be automated, would be great
>>>>> to discover and then look to address.
>>>>>
>>>>> On Tue, Oct 17, 2023 at 8:47 AM Robert Bradshaw via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> +1
>>>>>>
>>>>>> I would also strongly suggest that people try out the release against
>>>>>> their own codebases. This has the benefit of ensuring the release won't
>>>>>> break your own code when they go out, and stress-tests the new code 
>>>>>> against
>>>>>> real-world pipelines. (Ideally our own tests are all passing, and this
>>>>>> validation is automated as much as possible (though ensuring it matches 
>>>>>> our
>>>>>> documentation and works in a clean environment still has value), but
>>>>>> there's a lot of code and uses out there that we don't have access to
>>>>>> during normal Beam development.)
>>>>>>
>>>>>> On Tue, Oct 17, 2023 at 8:21 AM Svetak Sundhar via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I’ve participated in RC testing for a few releases and have observed
>>>>>>> 

Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 12:53 PM Reuven Lax  wrote:
>
> Is the schema Group transform (in Java) something along these lines?

Yes, for sure it is. It (and Python's and Typescript's equivalents) are
linked in the original post. The open question is how to best express
this in YAML.

> On Wed, Oct 18, 2023 at 1:11 PM Robert Bradshaw via dev  
> wrote:
>>
>> Beam Yaml has good support for IOs and mappings, but one key missing
>> feature for even writing a WordCount is the ability to do Aggregations
>> [1]. While the traditional Beam primitive is GroupByKey (and
>> CombineValues), we're eschewing KVs in favor of more schema'd
>> data (which has some precedent in our other languages, see the links
>> below). The key components the user needs to specify are (1) the key
>> fields on which the grouping will take place, (2) the fields
>> (expressions?) involved in the aggregation, and (3) what aggregating
>> fn to use.
>>
>> A straw-man example could be something like
>>
>> type: Aggregating
>> config:
>>   key: [field1, field2]
>>   aggregating:
>> total_cost:
>>   fn: sum
>>   value: cost
>> max_cost:
>>   fn: max
>>   value: cost
>>
>> This would basically correspond to the SQL expression
>>
>> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> from table GROUP BY field1, field2"
>>
>> (though I'm not requiring that we use this as an implementation
>> strategy). I do not think we need a separate (non aggregating)
>> Grouping operation, this can be accomplished by having a concat-style
>> combiner.
>>
>> There are still some open questions here, notably around how to
>> specify the aggregation fns themselves. We could of course provide a
>> number of built-ins (like SQL does). This gets into the question of
>> how and where to document this complete set, but some basics should
>> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> quantiles); where do we put the parameters? We could go with something
>> like
>>
>> fn:
>>   type: ApproximateQuantiles
>>   config:
>> n: 10
>>
>> but others are even configured by functions themselves (e.g. LargestN
>> that wants a comparator Fn). Maybe we decide not to support these
>> (yet?)
>>
>> One thing I think we should support, however, is referencing custom
>> CombineFns. We have some precedent for this with our Fns from
>> MapToFields, where we accept things like inline lambdas and external
>> references. Again the topic of how to configure them comes up, as
>> these custom Fns are more likely to be parameterized than Map Fns
>> (though, to be clear, perhaps it'd be good to allow parameterization of
>> MapFns as well). Maybe we allow
>>
>> language: python. # like MapToFields (and here it'd be harder to mix
>> and match per Fn)
>> fn:
>>   type: ???
>>   # should these be nested as config?
>>   name: fully.qualified.name
>>   path: /path/to/defining/file
>>   args: [...]
>>   kwargs: {...}
>>
>> which would invoke the constructor.
>>
>> I'm also open to other ways of naming/structuring these essential
>> parameters if it makes things more clear.
>>
>> - Robert
>>
>>
>> Java: 
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
>> Python: 
>> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
>> Typescript: 
>> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>>
>> [1] One can of course use SqlTransform for this, but I'm leaning
>> towards offering something more native.


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 11:12 AM Kenneth Knowles  wrote:
>
> Using SQL expressions in strings is maybe OK given we are all
> relational all the time. Either way you have to define what the
> universe of `fn` is. Here's a compact possibility:
>
> type: Combine
> config:
>   group_by: [field1, field2]
>   aggregates:
> max_cost: "MAX(cost)"
> total_cost: "SUM(cost)"
>
> Just a thought to get it closer to SQL concision.

So I'm a bit wary of having to parse these strings (unless a language
parameter is passed, in which case we defer to that language's syntax).
It's also messy the other way around: if a tool generates the YAML, I'd
rather it not have to generate strings like this (i.e. string literals
should either be identifiers or opaque blobs).

Pandas achieves conciseness by allowing one to just specify, say,
'sum' and implicitly sum over all (numeric) fields, while also allowing
more verbose, precise specification.

> I also used the word
> "Combine" just to connect it to other Beam writings and whatnot.

+1

> On Thu, Oct 19, 2023 at 1:41 PM Robert Bradshaw via dev
>  wrote:
> >
> > On Thu, Oct 19, 2023 at 10:25 AM Jan Lukavský  wrote:
> > >
> > > On 10/19/23 18:28, Robert Bradshaw via dev wrote:
> > > > On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  
> > > > wrote:
> > > >> Rill is definitely SQL-oriented but I think that's going to be the 
> > > >> most common. Dataframes are explicitly modeled on the relational 
> > > >> approach so that's going to look a lot like SQL,
> > > > I think pretty much any approach that fits here is going to be
> > > > relational, meaning you choose a set of columns to group on, a set of
> > > > columns to aggregate, and how to aggregate. The big open question is
> > > > what syntax to use for the "how."
> > > This might be already answered, if so, pardon my ignorance, but what is
> > > the goal this declarative approach is trying to solve? Is it meant to be
> > > more expressive or equally expressive than SQL? And if more, how much 
> > > more?
> >
> > I'm not sure if you're asking about YAML in general, or the particular
> > case of aggregation, but I can answer both.
> >
> > For the larger Beam YAML project, it's trying to solve the problem
> > that SQL is (and I'll admit this is somewhat subjective here) good at
> > expressing the T part of ETL, but not the other parts. For example,
> > the simple data movement use case of (say) reading from PubSub and
> > dumping into BigQuery is not well expressed in terms of SQL. SQL is
> > also fairly awkward when it comes to defining UDFs and TDFs and
> > non-linear pipelines (especially those with fanout). There are of
> > course other tools in this space (dbt comes to mind, and there's been
> > some investigation on how to make dbt play well with Beam). The other
> > niche it is trying to solve is that installing and learning a full SDK
> > is heavyweight and overkill for creating pipelines that are simply
> > wiring together pre-defined transforms.
> >
> > As for the more narrow case of aggregations, I think being similarly
> > expressive as SQL is fine, though it'd be good to make custom UADFs
> > more natural. Originally I was thinking that just having SqlTransform
> > might be sufficient, but it feels like a big hammer to reach for every
> > time I just want to sum over one or two columns.


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 11:42 AM Jan Lukavský  wrote:
>
> On 10/19/23 19:41, Robert Bradshaw via dev wrote:
> > On Thu, Oct 19, 2023 at 10:25 AM Jan Lukavský  wrote:
> >> On 10/19/23 18:28, Robert Bradshaw via dev wrote:
> >>> On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:
> >>>> Rill is definitely SQL-oriented but I think that's going to be the most 
> >>>> common. Dataframes are explicitly modeled on the relational approach so 
> >>>> that's going to look a lot like SQL,
> >>> I think pretty much any approach that fits here is going to be
> >>> relational, meaning you choose a set of columns to group on, a set of
> >>> columns to aggregate, and how to aggregate. The big open question is
> >>> what syntax to use for the "how."
> >> This might be already answered, if so, pardon my ignorance, but what is
> >> the goal this declarative approach is trying to solve? Is it meant to be
> >> more expressive or equally expressive than SQL? And if more, how much more?
> > I'm not sure if you're asking about YAML in general, or the particular
> > case of aggregation, but I can answer both.
> >
> > For the larger Beam YAML project, it's trying to solve the problem
> > that SQL is (and I'll admit this is somewhat subjective here) good at
> > expressing the T part of ETL, but not the other parts. For example,
> > the simple data movement use case of (say) reading from PubSub and
> > dumping into BigQuery is not well expressed in terms of SQL. SQL is
> > also fairly awkward when it comes to defining UDFs and TDFs and
> > non-linear pipelines (especially those with fanout). There are of
> > course other tools in this space (dbt comes to mind, and there's been
> > some investigation on how to make dbt play well with Beam). The other
> > niche it is trying to solve is that installing and learning a full SDK
> > is heavyweight and overkill for creating pipelines that are simply
> > wiring together pre-defined transforms.
>
> I think FlinkSQL solves the problem of E and L in SQL via CREATE TABLE
> and INSERT statements. I agree with the fanout part, though
> CREATE (TEMPORARY) TABLE AS SELECT ... could possibly solve that
> as well.

Yeah, Beam uses the CREATE TABLE for referring to external data too.
(I don't remember about INSERT statements). This is where (IMHO of
course) SQL starts to get very messy (and non-standard).

> > As for the more narrow case of aggregations, I think being similarly
> > expressive as SQL is fine, though it'd be good to make custom UADFs
> > more natural. Originally I was thinking that just having SqlTransform
> > might be sufficient, but it feels like a big hammer to reach for every
> > time I just want to sum over one or two columns.
>
> Yes, defining UDFs and UDAFs is painful, that was the motivation of my
> question. It also defines how the syntax for such UDAF would need to
> look like. It would require to break UDAFs down to several primitive
> UDFs and then use a functional style to declare them. Most of the time
> it would be probably sufficient to use simplified CombineFn semantics
> with accumulator being limited to a primitive type (long, double,
> string, maybe array?). I suppose declaring a full-blown stateful DoFn
> (timers, generic state, ...) is out of scope.

Other than the possible totally-commutative-associative V* -> V case,
I'm probably fine with referencing existing CombineFns (including ones
defined in files like we do with mapping fns) rather than providing a
full YAML syntax for defining them. But we do need to be able to
parameterize them.
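
For concreteness, a parameterized CombineFn of the kind that might be referenced from YAML could look like the sketch below (Python; the class and its parameter are made up for illustration):

```python
import apache_beam as beam

class TopNCombineFn(beam.CombineFn):
    """Keeps the n largest values seen; n is the constructor parameter the
    YAML config would need to pass through (e.g. as args/kwargs)."""

    def __init__(self, n):
        self._n = n

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        accumulator.append(element)
        accumulator.sort(reverse=True)
        return accumulator[:self._n]

    def merge_accumulators(self, accumulators):
        merged = [x for acc in accumulators for x in acc]
        merged.sort(reverse=True)
        return merged[:self._n]

    def extract_output(self, accumulator):
        return accumulator

# Usage in a regular Python pipeline, for comparison:
#   pcoll | beam.CombinePerKey(TopNCombineFn(3))
```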


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 10:25 AM Jan Lukavský  wrote:
>
> On 10/19/23 18:28, Robert Bradshaw via dev wrote:
> > On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:
> >> Rill is definitely SQL-oriented but I think that's going to be the most 
> >> common. Dataframes are explicitly modeled on the relational approach so 
> >> that's going to look a lot like SQL,
> > I think pretty much any approach that fits here is going to be
> > relational, meaning you choose a set of columns to group on, a set of
> > columns to aggregate, and how to aggregate. The big open question is
> > what syntax to use for the "how."
> This might be already answered, if so, pardon my ignorance, but what is
> the goal this declarative approach is trying to solve? Is it meant to be
> more expressive or equally expressive than SQL? And if more, how much more?

I'm not sure if you're asking about YAML in general, or the particular
case of aggregation, but I can answer both.

For the larger Beam YAML project, it's trying to solve the problem
that SQL is (and I'll admit this is somewhat subjective here) good at
expressing the T part of ETL, but not the other parts. For example,
the simple data movement use case of (say) reading from PubSub and
dumping into BigQuery is not well expressed in terms of SQL. SQL is
also fairly awkward when it comes to defining UDFs and TDFs and
non-linear pipelines (especially those with fanout). There are of
course other tools in this space (dbt comes to mind, and there's been
some investigation on how to make dbt play well with Beam). The other
niche it is trying to solve is that installing and learning a full SDK
is heavyweight and overkill for creating pipelines that are simply
wiring together pre-defined transforms.

As for the more narrow case of aggregations, I think being similarly
expressive as SQL is fine, though it'd be good to make custom UDAFs
more natural. Originally I was thinking that just having SqlTransform
might be sufficient, but it feels like a big hammer to reach for every
time I just want to sum over one or two columns.


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:
>
> Rill is definitely SQL-oriented but I think that's going to be the most 
> common. Dataframes are explicitly modeled on the relational approach so 
> that's going to look a lot like SQL,

I think pretty much any approach that fits here is going to be
relational, meaning you choose a set of columns to group on, a set of
columns to aggregate, and how to aggregate. The big open question is
what syntax to use for the "how."

Dataframe aggregation is probably a good example to look at. Here we
have pandas and R in particular as concrete instances. It should also
be easy to support different aggregations over different (or the same)
columns. Pandas can take a list of (or mapping to) functions in its
groupby().agg(). R doesn't seem to make this very easy...
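
For concreteness, the pandas form being referred to looks roughly like this
(the column names are invented for the example):

import pandas as pd

df = pd.DataFrame({'field1': ['a', 'a', 'b'], 'cost': [1.0, 3.0, 2.0]})
# Named aggregation: output column -> (input column, aggregating function).
print(df.groupby('field1').agg(total_cost=('cost', 'sum'),
                               max_cost=('cost', 'max')))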

> which leaves us with S-style formulas (which I like but are pretty niche)

I'm curious, what are these?

>  and I guess pivot tables coming from the spreadsheet world. Does make me 
> wonder what Rails' ORM looks like these days (I last used v4), it had some 
> aggregation support and was pretty declarative...
>
> On Wed, Oct 18, 2023 at 6:06 PM Robert Bradshaw  wrote:
>>
>> On Wed, Oct 18, 2023 at 5:06 PM Byron Ellis  wrote:
>> >
>> > Is it worth taking a look at similar prior art in the space?
>>
>> +1. Pointers welcome.
>>
>> > The first one that comes to mind is Transform, but with the dbt labs 
>> > acquisition that spec is a lot harder to find. Rill is pretty similar 
>> > though.
>>
>> Rill seems to be very SQL-based.
>>
>> > On Wed, Oct 18, 2023 at 1:12 PM Robert Bradshaw via dev 
>> >  wrote:
>> >>
>> >> Beam Yaml has good support for IOs and mappings, but one key missing
>> >> feature for even writing a WordCount is the ability to do Aggregations
>> >> [1]. While the traditional Beam primitive is GroupByKey (and
>> >> CombineValues), we're eschewing KVs in favor of more schema'd
>> >> data (which has some precedent in our other languages, see the links
>> >> below). The key components the user needs to specify are (1) the key
>> >> fields on which the grouping will take place, (2) the fields
>> >> (expressions?) involved in the aggregation, and (3) what aggregating
>> >> fn to use.
>> >>
>> >> A straw-man example could be something like
>> >>
>> >> type: Aggregating
>> >> config:
>> >>   key: [field1, field2]
>> >>   aggregating:
>> >>     total_cost:
>> >>       fn: sum
>> >>       value: cost
>> >>     max_cost:
>> >>       fn: max
>> >>       value: cost
>> >>
>> >> This would basically correspond to the SQL expression
>> >>
>> >> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> >> from table GROUP BY field1, field2"
>> >>
>> >> (though I'm not requiring that we use this as an implementation
>> >> strategy). I do not think we need a separate (non aggregating)
>> >> Grouping operation, this can be accomplished by having a concat-style
>> >> combiner.
>> >>
>> >> There are still some open questions here, notably around how to
>> >> specify the aggregation fns themselves. We could of course provide a
>> >> number of built-ins (like SQL does). This gets into the question of
>> >> how and where to document this complete set, but some basics should
>> >> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> >> quantiles); where do we put the parameters? We could go with something
>> >> like
>> >>
>> >> fn:
>> >>   type: ApproximateQuantiles
>> >>   config:
>> >>     n: 10
>> >>
>> >> but others are even configured by functions themselves (e.g. LargestN
>> >> that wants a comparator Fn). Maybe we decide not to support these
>> >> (yet?)
>> >>
>> >> One thing I think we should support, however, is referencing custom
>> >> CombineFns. We have some precedent for this with our Fns from
>> >> MapToFields, where we accept things like inline lambdas and external
>> >> references. Again the topic of how to configure them comes up, as
>> >> these custom Fns are more likely to be parameterized than Map Fns
>> >> (though, to be clear, perhaps it'd be good to allow parameterizatin o

Re: [YAML] Aggregations

2023-10-18 Thread Robert Bradshaw via dev
On Wed, Oct 18, 2023 at 5:06 PM Byron Ellis  wrote:
>
> Is it worth taking a look at similar prior art in the space?

+1. Pointers welcome.

> The first one that comes to mind is Transform, but with the dbt labs 
> acquisition that spec is a lot harder to find. Rill is pretty similar though.

Rill seems to be very SQL-based.

> On Wed, Oct 18, 2023 at 1:12 PM Robert Bradshaw via dev  
> wrote:
>>
>> Beam Yaml has good support for IOs and mappings, but one key missing
>> feature for even writing a WordCount is the ability to do Aggregations
>> [1]. While the traditional Beam primitive is GroupByKey (and
>> CombineValues), we're eschewing KVs in favor of more schema'd
>> data (which has some precedent in our other languages, see the links
>> below). The key components the user needs to specify are (1) the key
>> fields on which the grouping will take place, (2) the fields
>> (expressions?) involved in the aggregation, and (3) what aggregating
>> fn to use.
>>
>> A straw-man example could be something like
>>
>> type: Aggregating
>> config:
>>   key: [field1, field2]
>>   aggregating:
>>     total_cost:
>>       fn: sum
>>       value: cost
>>     max_cost:
>>       fn: max
>>       value: cost
>>
>> This would basically correspond to the SQL expression
>>
>> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> from table GROUP BY field1, field2"
>>
>> (though I'm not requiring that we use this as an implementation
>> strategy). I do not think we need a separate (non aggregating)
>> Grouping operation, this can be accomplished by having a concat-style
>> combiner.
>>
>> There are still some open questions here, notably around how to
>> specify the aggregation fns themselves. We could of course provide a
>> number of built-ins (like SQL does). This gets into the question of
>> how and where to document this complete set, but some basics should
>> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> quantiles); where do we put the parameters? We could go with something
>> like
>>
>> fn:
>>   type: ApproximateQuantiles
>>   config:
>>     n: 10
>>
>> but others are even configured by functions themselves (e.g. LargestN
>> that wants a comparator Fn). Maybe we decide not to support these
>> (yet?)
>>
>> One thing I think we should support, however, is referencing custom
>> CombineFns. We have some precedent for this with our Fns from
>> MapToFields, where we accept things like inline lambdas and external
>> references. Again the topic of how to configure them comes up, as
>> these custom Fns are more likely to be parameterized than Map Fns
>> (though, to be clear, perhaps it'd be good to allow parameterization of
>> MapFns as well). Maybe we allow
>>
>> language: python  # like MapToFields (and here it'd be harder to mix
>> and match per Fn)
>> fn:
>>   type: ???
>>   # should these be nested as config?
>>   name: fully.qualified.name
>>   path: /path/to/defining/file
>>   args: [...]
>>   kwargs: {...}
>>
>> which would invoke the constructor.
>>
>> I'm also open to other ways of naming/structuring these essential
>> parameters if it makes things more clear.
>>
>> - Robert
>>
>>
>> Java: 
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
>> Python: 
>> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
>> Typescript: 
>> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>>
>> [1] One can of course use SqlTransform for this, but I'm leaning
>> towards offering something more native.


[YAML] Aggregations

2023-10-18 Thread Robert Bradshaw via dev
Beam Yaml has good support for IOs and mappings, but one key missing
feature for even writing a WordCount is the ability to do Aggregations
[1]. While the traditional Beam primitive is GroupByKey (and
CombineValues), we're eschewing KVs in favor of more schema'd
data (which has some precedent in our other languages, see the links
below). The key components the user needs to specify are (1) the key
fields on which the grouping will take place, (2) the fields
(expressions?) involved in the aggregation, and (3) what aggregating
fn to use.

A straw-man example could be something like

type: Aggregating
config:
  key: [field1, field2]
  aggregating:
    total_cost:
      fn: sum
      value: cost
    max_cost:
      fn: max
      value: cost

This would basically correspond to the SQL expression

"SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
from table GROUP BY field1, field2"

(though I'm not requiring that we use this as an implementation
strategy). I do not think we need a separate (non aggregating)
Grouping operation, this can be accomplished by having a concat-style
combiner.

There are still some open questions here, notably around how to
specify the aggregation fns themselves. We could of course provide a
number of built-ins (like SQL does). This gets into the question of
how and where to document this complete set, but some basics should
take us pretty far. Many aggregators, however, are parameterized (e.g.
quantiles); where do we put the parameters? We could go with something
like

fn:
  type: ApproximateQuantiles
  config:
    n: 10

but others are even configured by functions themselves (e.g. LargestN
that wants a comparator Fn). Maybe we decide not to support these
(yet?)

One thing I think we should support, however, is referencing custom
CombineFns. We have some precedent for this with our Fns from
MapToFields, where we accept things like inline lambdas and external
references. Again the topic of how to configure them comes up, as
these custom Fns are more likely to be parameterized than Map Fns
(though, to be clear, perhaps it'd be good to allow parameterization of
MapFns as well). Maybe we allow

language: python  # like MapToFields (and here it'd be harder to mix
and match per Fn)
fn:
  type: ???
  # should these be nested as config?
  name: fully.qualified.name
  path: /path/to/defining/file
  args: [...]
  kwargs: {...}

which would invoke the constructor.

I'm also open to other ways of naming/structuring these essential
parameters if it makes things more clear.

- Robert


Java: 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
Python: 
https://beam.apache.org/documentation/transforms/python/aggregation/groupby
Typescript: 
https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html

[1] One can of course use SqlTransform for this, but I'm leaning
towards offering something more native.
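
For comparison, the straw-man above corresponds roughly to the following with
the existing Python GroupBy transform linked above (shown only as a reference
point, not as a proposal for the YAML syntax itself):

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (p
       | beam.Create([
           beam.Row(field1='a', field2='x', cost=1.0),
           beam.Row(field1='a', field2='x', cost=3.0),
           beam.Row(field1='b', field2='y', cost=2.0)])
       | beam.GroupBy('field1', 'field2')
           .aggregate_field('cost', sum, 'total_cost')
           .aggregate_field('cost', max, 'max_cost')
       | beam.Map(print))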


Re: [Discuss] Idea to increase RC voting participation

2023-10-18 Thread Robert Bradshaw via dev
+1 That's a great idea. They have incentive to make sure the issue was
resolved for them, plus we get to ensure there were no other regressions.

On Wed, Oct 18, 2023 at 11:30 AM Johanna Öjeling via dev <
dev@beam.apache.org> wrote:

> When I have contributed to Apache Airflow, they have tagged all
> contributors concerned in a GitHub issue when the RC is available and asked
> us to validate it. Example: #29424
> <https://github.com/apache/airflow/issues/29424>.
>
> I found that to be an effective way to notify contributors of the RC and
> nudge them to help out. In the issue description there is a reference to
> the guidelines on how to test the RC and a note that people are encouraged
> to vote on the mailing list (which could admittedly be more highlighted
> because I did not pay attention to it until now and was unaware that
> contributors had a vote).
>
> It might be an idea to consider something similar here to increase the
> participation?
>
> On Tue, Oct 17, 2023 at 7:01 PM Jack McCluskey via dev <
> dev@beam.apache.org> wrote:
>
>> I'm +1 on helping explain what we mean by "validate the RC" since we're
>> really just asking users to see if their existing use cases work along with
>> our typical slate of tests. I don't know if offloading that work to our
>> active validators is the right approach though, documentation/screen share
>> of their specific workflow is definitely less useful than having a more
>> general outline of how to install the RC and things to look out for when
>> testing.
>>
>> On Tue, Oct 17, 2023 at 12:55 PM Austin Bennett 
>> wrote:
>>
>>> Great effort.  I'm also interested in streamlining releases -- so if
>>> there are a lot of manual tests that could be automated, it would be great
>>> to discover and then look to address.
>>>
>>> On Tue, Oct 17, 2023 at 8:47 AM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> +1
>>>>
>>>> I would also strongly suggest that people try out the release against
>>>> their own codebases. This has the benefit of ensuring the release won't
>>>> break your own code when they go out, and stress-tests the new code against
>>>> real-world pipelines. (Ideally our own tests are all passing, and this
>>>> validation is automated as much as possible (though ensuring it matches our
>>>> documentation and works in a clean environment still has value), but
>>>> there's a lot of code and uses out there that we don't have access to
>>>> during normal Beam development.)
>>>>
>>>> On Tue, Oct 17, 2023 at 8:21 AM Svetak Sundhar via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I’ve participated in RC testing for a few releases and have observed a
>>>>> bit of a knowledge gap in how releases can be tested. Given that Beam
>>>>> encourages contributors to vote on RC’s regardless of tenure, and that
>>>>> voting on an RC is a relatively low-effort, high leverage way to influence
>>>>> the release of the library, I propose the following:
>>>>>
> During the vote for the next release, voters can document the process
> they followed in a separate document, and add the link in column G
> here
> <https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=437054928>.
> One step further could be a screencast of running the test, with a link
> to that attached.
>>>>>
>>>>> We can keep repeating this through releases until we have
>>>>> documentation for many of the different tests. We can then add these docs
>>>>> into the repo.
>>>>>
>>>>> I’m proposing this because I’ve gathered the following feedback from
>>>>> colleagues that are tangentially involved with Beam: They are interested 
>>>>> in
>>>>> participating in release validation, but don’t know how to get started.
>>>>> Happy to hear other suggestions too, if there are any to address the
>>>>> above.
>>>>>
>>>>> Thanks,
>>>>>
>>>>>
>>>>> Svetak Sundhar
>>>>>
>>>>>   Data Engineer
>>>>> svetaksund...@google.com
>>>>>
>>>>>


Re: [Discuss] Idea to increase RC voting participation

2023-10-17 Thread Robert Bradshaw via dev
+1

I would also strongly suggest that people try out the release against their
own codebases. This has the benefit of ensuring the release won't break
your own code when they go out, and stress-tests the new code against
real-world pipelines. (Ideally our own tests are all passing, and this
validation is automated as much as possible (though ensuring it matches our
documentation and works in a clean environment still has value), but
there's a lot of code and uses out there that we don't have access to
during normal Beam development.)

On Tue, Oct 17, 2023 at 8:21 AM Svetak Sundhar via dev 
wrote:

> Hi all,
>
> I’ve participated in RC testing for a few releases and have observed a bit
> of a knowledge gap in how releases can be tested. Given that Beam
> encourages contributors to vote on RC’s regardless of tenure, and that
> voting on an RC is a relatively low-effort, high leverage way to influence
> the release of the library, I propose the following:
>
> During the vote for the next release, voters can document the process they
> followed in a separate document, and add the link in column G here
> .
> One step further could be a screencast of running the test, with a link
> to that attached.
>
> We can keep repeating this through releases until we have documentation
> for many of the different tests. We can then add these docs into the repo.
>
> I’m proposing this because I’ve gathered the following feedback from
> colleagues that are tangentially involved with Beam: They are interested in
> participating in release validation, but don’t know how to get started.
> Happy to hear other suggestions too, if there are any to address the
> above.
>
> Thanks,
>
>
> Svetak Sundhar
>
>   Data Engineer
> svetaksund...@google.com
>
>


Re: Enable state cache in Python SDK

2023-10-16 Thread Robert Bradshaw via dev
+1, we should definitely be enabling at least some caching by default
here. Added some comments to the doc.

On Mon, Oct 16, 2023 at 9:27 AM Anand Inguva via dev
 wrote:
>
> Hello,
>
> In the Python SDK, the user state and side input caching is disabled by default
> for all the runners except the FnAPI direct runner (intended for testing
> purposes). I would like to propose that we enable the state cache for the 
> Python SDK similar to other SDKs. I created a doc[1] on why we need to do it, 
> advantages and disadvantages along with future improvements.
>
> Please take a look and let me know what you think.
>
> Thanks,
> Anand
>
> [1] 
> https://docs.google.com/document/d/1gllYsIFqKt4TWAxQmXU_-sw7SLnur2Q69d__N0XBMdE/edit
>
>


Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via dev
On Fri, Oct 13, 2023 at 10:08 AM Joey Tran 
wrote:

> That makes sense. Would you suggest the new option simply suppress the
> RuntimeError and use the non-unique label?
>

Yes. (Or, rather, not raise it.)


> Are there places on the SDK side that expect unique labels? Or in
> non-updateable runners?
>

That's a good question. The label eventually ends up here:
https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L142
which is technically a violation of the spec if these are not unique
(though I guess Java already does this?). More importantly, though, the id
of the transform (the proto contains a map of strings -> transforms) must
be unique.
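
(Illustration of the two identifiers in the Python SDK, for anyone following
along -- here `p` is assumed to be an already-constructed beam.Pipeline; the
map keys are the required-unique ids, and unique_name is the label discussed
here:)

proto = p.to_runner_api()
for transform_id, transform in proto.components.transforms.items():
  print(transform_id, '->', transform.unique_name)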


> Would making `--auto_unique_labels` a mutually exclusive option with
> streaming be a reasonable option? Not sure if stream/batch-specific options
> are a good idea or not... but if there's precedent, then it'd be an easy
> option
>

It's not always possible to even tell at pipeline construction whether the
pipeline will be run in streaming mode. (I suppose one could check later?)
It's generally something we try to avoid though.

Another option is to make the suffix a uuid rather than a single counter.
(This would still have issues with the first application possibly getting
mixed up with a "different" first application unless it was always
appended.)
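
(A toy sketch of the uuid idea, just to make it concrete -- the helper below is
hypothetical and is not how the SDK currently assigns labels; note it only
appends a suffix on collision, which is exactly the "unless it was always
appended" caveat above:)

import uuid

def uniquify(label, seen_labels):
  if label in seen_labels:
    label = '%s_%s' % (label, uuid.uuid4().hex[:8])
  seen_labels.add(label)
  return label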


> On Fri, Oct 13, 2023 at 12:52 PM Robert Bradshaw 
> wrote:
>
>>
>> Thanks for the PR.
>>
>> I think we should follow Java and allow non-unique labels, but not
>> provide automatic uniquification. In particular, the danger of using a
>> counter is that one can get accidental (and potentially hard to check)
>> off-by-one collisions. As a concrete example, imagine one partitions a
>> dataset into two collections, each followed by a similarly-named transform.
>>
>> --> B
>>   /
>> A
>>  \
>>--> B
>>
>> Uniquification would give something like
>>
>> --> B
>>   /
>> A
>>  \
>>--> B_2
>>
>> Suppose one then realizes there's a third case to handle, giving
>>
>> --> B
>>   /
>> A --> B
>>  \
>>--> B
>>
>> But this would be uniquified to
>>
>> --> B
>>   /
>> A --> B_2
>>  \
>>--> B_3
>>
>> where the old B_2 got renamed to B_3 and a new B_2 got put in its place.
>> This is bad because an updating runner would then attribute old B_2's state
>> to the new B_2 (and also possibly mis-direct any inflight messages). At
>> least with the old, intersecting names we can detect this problem
>> rather than silently give corrupt data.
>>
>>
>> On Fri, Oct 13, 2023 at 7:15 AM Joey Tran 
>> wrote:
>>
>>> For posterity: https://github.com/apache/beam/pull/28984
>>>
>>> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> I would definitely support a PR making this an option. Changing the
>>>> default would be a rather big change that would require more thought.
>>>>
>>>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran 
>>>> wrote:
>>>>
>>>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
>>>>> Apache Beam at my company and I'm trying to foresee parts of the API they
>>>>> might find inconvenient.
>>>>>
>>>>> If there's a conclusion to make the behavior similar to java, I'm
>>>>> happy to put up a PR
>>>>>
>>>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd
>>>>>> be a very sticky toggle since it'd be easy for PTransforms to 
>>>>>> accidentally
>>>>>> rely on it.
>>>>>>
>>>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>>>>>> wrote:
>>>>>>
>>>>>>> Huh. This used to be a hard error in Java, but I guess it's
>>>>>>> togglable with an option now. We should probably add the option to 
>>>>>>> toggle
>>>>>>> Python too. (Unclear what the default should be, but this probably ties
>>>>>>> into re-thinking how pipeline update should work.)
>>>>>>>
>>>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>>

Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via dev
Thanks for the PR.

I think we should follow Java and allow non-unique labels, but not provide
automatic uniquification. In particular, the danger of using a counter is
that one can get accidental (and potentially hard to check) off-by-one
collisions. As a concrete example, imagine one partitions a dataset into
two collections, each followed by a similarly-named transform.

--> B
  /
A
 \
   --> B

Uniquification would give something like

--> B
  /
A
 \
   --> B_2

Suppose one then realizes there's a third case to handle, giving

--> B
  /
A --> B
 \
   --> B

But this would be uniquified to

--> B
  /
A --> B_2
 \
   --> B_3

where the old B_2 got renamed to B_3 and a new B_2 got put in its place.
This is bad because an updating runner would then attribute old B_2's state
to the new B_2 (and also possibly mis-direct any inflight messages). At
least with the old, intersecting names we can detect this problem
rather than silently give corrupt data.


On Fri, Oct 13, 2023 at 7:15 AM Joey Tran  wrote:

> For posterity: https://github.com/apache/beam/pull/28984
>
> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw 
> wrote:
>
>> I would definitely support a PR making this an option. Changing the
>> default would be a rather big change that would require more thought.
>>
>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran 
>> wrote:
>>
>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
>>> Apache Beam at my company and I'm trying to foresee parts of the API they
>>> might find inconvenient.
>>>
>>> If there's a conclusion to make the behavior similar to java, I'm happy
>>> to put up a PR
>>>
>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran 
>>> wrote:
>>>
>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd
>>>> be a very sticky toggle since it'd be easy for PTransforms to accidentally
>>>> rely on it.
>>>>
>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Huh. This used to be a hard error in Java, but I guess it's togglable
>>>>> with an option now. We should probably add the option to toggle Python 
>>>>> too.
>>>>> (Unclear what the default should be, but this probably ties into
>>>>> re-thinking how pipeline update should work.)
>>>>>
>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Makes sense that the requirement is the same, but is the label
>>>>>> auto-generation behavior the same? I modified the BeamJava
>>>>>> wordcount example[1] to do the regex filter twice in a row, and unlike 
>>>>>> the
>>>>>> BeamPython example I posted before, it just warns instead of throwing an
>>>>>> exception.
>>>>>>
>>>>>> Tangentially, is it expected that the Beam playground examples don't
>>>>>> have a way to see the outputs of a run example? I have a vague memory 
>>>>>> that
>>>>>> there used to be a way to navigate to an output file after it's generated
>>>>>> but not sure if I just dreamt that up. Playing with the examples, I 
>>>>>> wasn't
>>>>>> positive if my runs were actually succeeding or not based on the stdout
>>>>>> alone.
>>>>>>
>>>>>> [1] https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2
>>>>>> [2] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
>>>>>>
>>>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>>>> u...@beam.apache.org> wrote:
>>>>>>
>>>>>>> BeamJava and BeamPython have the exact same behavior:
>>>>>>> transform names within must be distinct [1]. This is because we do not
>>>>>>> necessarily know at pipeline construction time if the pipeline will be
>>>>>>> streaming or batch, or if it will be updated in the future, so the 
>>>>>>> decision
>>>>>>> was made to impose this restriction up front. Both will auto-generate a
>>>>>>> name for you if one is not given, but will do so deterministically (not
>>>>>>> depending on some global context

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-12 Thread Robert Bradshaw via dev
OK, so how about this for a concrete proposal:

sink:
  type: WriteToParquet
  config:
    path: "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}"
    suffix: ".parquet"

The eventual path would be . The suffix
would be optional, and there could be a default for the specific file
format. A file format could inspect a provided suffix like ".csv.gz" to
infer compression as well.

Note that this doesn't have any special indicators for being dynamic other
than the {}'s. Also, my_col would be written as part of the data (but we
could add an extra "elide" config parameter that takes a list of columns to
exclude if desired).

We could call this "prefix" rather than path. (Path is symmetric with
reading, but prefix is a bit more direct.) Anyone want to voice
their opinion here?
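
(To make the intended semantics concrete: one way to read the {}'s is as plain
attribute-style string formatting. Purely illustrative, assuming a record with
a my_col field and an element timestamp:)

import datetime

class Record(object):
  my_col = 'sales'

template = ('/beam/filesystem/{record.my_col}-'
            '{timestamp.year}{timestamp.month}{timestamp.day}')
print(template.format(record=Record(), timestamp=datetime.date(2023, 10, 12)))
# -> /beam/filesystem/sales-20231012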




On Wed, Oct 11, 2023 at 9:01 AM Chamikara Jayalath 
wrote:

>
>
> On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles  wrote:
>
>> So, top-posting because the threading got to be a lot for me and I think
>> it forked a bit too... I may even be restating something someone said, so
>> apologies for that.
>>
>> Very very good point about *required* parameters where if you don't use
>> them then you will end up with two writers writing to the same file. The
>> easiest example to work with might be if you omitted SHARD_NUM so all
>> shards end up clobbering the same file.
>>
>> I think there's a unifying perspective between prefix/suffix and the need
>> to be sure to include critical sharding variables. Essentially it is my
>> point about it being a "big data fileset". It is perhaps unrealistic but
>> ideally the user names the big data fileset and then the mandatory other
>> pieces are added outside of their control. For example if I name my big
>> data fileset "foo" then that implicitly means that "foo" consists of all
>> the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I
>> re-read I see you basically said the same thing. In some cases the required
>> fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the
>> user can think of it as a textual template, if we can use a library that
>> yields an abstract syntax tree for the expression we can easily check these
>> requirements in a robust way - or we could do it in a non-robust way by
>> string-scraping ourselves.
>>
>
> Yes. I think we are talking about the same thing. Users should not have
> full control over the filename since that could lead to conflicts and data
> loss when data is being written in parallel from multiple workers. Users
> can refer to the big data fileset being written using the glob "/**".
> In addition users have control over the filename  and 
> (file extension, for example) which can be useful for some downstream
> use-cases. Rest of the filename will be filled out by the SDK (window, pane
> etc.) to make sure that the files written by different workers do not
> conflict.
>
> Thanks,
> Cham
>
>
>>
>> We actually are very close to this in FileIO. I think the interpretation
>> of "prefix" is that it is the filename "foo" as above, and "suffix" is
>> really something like ".txt" that you stick on the end of everything for
>> whatever reason.
>>
>> Kenn
>>
>> On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>>
>>>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>>
>>>>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> I suspect some simple pattern templating would solve most use cases.
>>>>>>> We probably would want to support timestamp formatting (e.g. $YYYY $M
>>>>>>> $D)
>>>>>>> as well.
>>>>>>>
>>>>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I would say:
>>>>>>>>>
>>>>>

Re: [Question] Read Parquet Schema from S3 Directory

2023-10-12 Thread Robert Bradshaw via dev
You'll probably need to resolve "s3a:///*.parquet" out into a
concrete non-glob filepattern to inspect it this way. Presumably any
individual shard will do. match and open from
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/FileSystems.html
may be useful.
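
(For what it's worth, the same idea sketched with the Python SDK's FileSystems
API -- the thread is about Java, so this is purely illustrative; the bucket and
path are placeholders, and it assumes pyarrow is available for reading the
schema:)

from apache_beam.io.filesystems import FileSystems
import pyarrow.parquet as pq

match_result = FileSystems.match(['s3://my-bucket/data/*.parquet'])[0]
first_shard = match_result.metadata_list[0].path  # any single shard will do
with FileSystems.open(first_shard) as f:
  print(pq.read_schema(f))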

On Wed, Oct 11, 2023 at 10:29 AM Ramya Prasad via dev 
wrote:

> Hello,
>
> I am a developer trying to use Apache Beam in my Java application, and I'm
> running into an issue with reading multiple Parquet files from a directory
> in S3. I'm able to successfully run this line of code, where tempPath  =
> "s3:///*.parquet":
> PCollection records = pipeline.apply("Read parquet file in
> as Generic Records", ParquetIO.read(schema).from(tempPath));
>
> My problem is reading the schema beforehand. At runtime, I only have the
> name of the S3 bucket, which has all the Parquet files I need underneath
> it. However, I am unable to use that same tempPath above to retrieve my
> schema. Because the path is not pointing to a singular parquet file, the
> ParquetFileReader class from Apache Hadoop throws an error: No such file or
> directory: s3a:///*.parquet.
>
> To read my schema, I'm using this chunk of code:
>
> Configuration configuration = new Configuration();
> configuration.set("fs.s3a.access.key",");
> configuration.set("fs.s3a.secret.key", "");
> configuration.set("fs.s3a.session.token","");
> configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
> configuration.set("fs.s3a.server-side-encryption-algorithm", "");
> configuration.set("fs.s3a.proxy.host", "");
> configuration.set("fs.s3a.proxy.port", "");
> configuration.set("fs.s3a.aws.credentials.provider", 
> "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
>
> String hadoopFilePath = new Path("s3a:///*.parquet");
> ParquetFileReader r = 
> ParquetFileReader.open(HadoopInputFile.fromPath(hadoopFilePath, 
> configuration));
> MessageType messageType = r.getFooter().getFileMetaData().getSchema();
> AvroSchemaConverter converter = new AvroSchemaConverter();
> Schema schema = converter.convert(messageType);
>
> The ParquetFileReader.open(...) line is where the code is failing. Is there
> maybe a Hadoop Configuration I can set to force Hadoop to read recursively?
>
> I realize this is kind of a Beam-adjacent problem, but I've been
> struggling with this for a while, so any help would be appreciated!
>
> Thanks and sincerely,
> Ramya
> --
>
> The information contained in this e-mail may be confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
>
>
>


Re: Proposal for pyproject.toml Support in Apache Beam Python

2023-10-12 Thread Robert Bradshaw via dev
On Thu, Oct 12, 2023 at 2:04 PM Anand Inguva  wrote:

> I am in the process of updating the documentation at
> https://cwiki.apache.org/confluence/display/BEAM/Python+Tips related to
> setup.py/pyproject.toml changes, but yes you can't call setup.py directly
> because it might fail due to the absence of Beam Python's build
> time dependencies.
>
> With regards to other files (e.g. protos), we will follow the same
> behavior as before (generating protos using `gen_protos.py`).
>

Meaning this will be called automatically when needed (e.g. from pytest)?


> On Thu, Oct 12, 2023 at 4:01 PM Robert Bradshaw 
> wrote:
>
>> Does this change any development practices? E.g. if I clone the repo, I'm
>> assuming I couldn't run "setup.py test" anymore. What about the generated
>> files (like protos, or the yaml definitions copied from other parts of the
>> repo)?
>>
>> On Thu, Oct 12, 2023 at 12:27 PM Anand Inguva via dev <
>> dev@beam.apache.org> wrote:
>>
>>> The PR https://github.com/apache/beam/pull/28385 is merged today. If
>>> there are any observed failures, please comment on the PR and I will follow
>>> up with a forward fix. Thanks.
>>>
>>> On Fri, Sep 1, 2023 at 2:30 PM Anand Inguva 
>>> wrote:
>>>
>>>> Since there is positive feedback from the dev community, I am going
>>>> ahead and implementing this proposal for Python SDK.
>>>>
>>>> @aus...@apache.org   Initially let's move forward
>>>> with the setuptools as backend for building package and as part of the
>>>> future work, we can find a better backend than setuptools.
>>>>
>>>> Thanks for the feedback.
>>>> Anand
>>>>
>>>> On Mon, Aug 28, 2023 at 12:00 PM Austin Bennett 
>>>> wrote:
>>>>
>>>>> I've thought about this a ton, but haven't been in a position to
>>>>> undertake the work.  Thanks for bringing this up, @Anand Inguva
>>>>>  !
>>>>>
>>>>> I'd point us to https://python-poetry.org/  ... [ which is where I'd
>>>>> look to take us, but I'm also not able to do all the work, so my
>>>>> suggestion/preference doesn't matter that much ]
>>>>>
>>>>> https://python-poetry.org/docs/pyproject#the-pyprojecttoml-file <-
>>>>> for info on pyproject.toml file.
>>>>>
>>>>> Notice the use of a 'lock' file is very valuable, ex:
>>>>> https://python-poetry.org/docs/basic-usage/#committing-your-poetrylock-file-to-version-control
>>>>>
>>>>> I haven't come across `build`, that might be great too.  I'd highlight
>>>>> that Poetry is pretty common across industry these days, rock-solid,
>>>>> ecosystem of interoperability, users, etc...   If not familiar, PLEASE 
>>>>> have
>>>>> a look at that.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Aug 28, 2023 at 8:04 AM Kerry Donny-Clark via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> +1
>>>>>> Hi Anand,
>>>>>> I appreciate this effort. Managing python dependencies has been a
>>>>>> major pain point for me, and I think this approach would help.
>>>>>> Kerry
>>>>>>
>>>>>> On Mon, Aug 28, 2023 at 10:14 AM Anand Inguva via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hello Beam Dev Team,
>>>>>>>
>>>>>>> I've compiled a design document
>>>>>>> <https://docs.google.com/document/d/17-y48WW25-VGBWZNyTdoN0WUN03k9ZhJjLp9wtyG1Wc/edit#heading=h.wskna8eurvjv>[1]
>>>>>>> proposing the integration of pyproject.toml into Apache Beam's Python 
>>>>>>> build
>>>>>>> process. Your insights and feedback would be invaluable.
>>>>>>>
>>>>>>> What is pyproject.toml?
>>>>>>> pyproject.toml is a configuration file that specifies a project's
>>>>>>> build dependencies and other project-related metadata in a standardized
>>>>>>> format. Before pyproject.toml, Python projects often had multiple
>>>>>>> configuration files (like setup.py, setup.cfg, and requirements.txt).
>>>>>>> pyproject.toml aims to centralize these configurations into one place,
>>>>>>> making project setups more organized and straightforward. One of the
>>>>>>> significant features enabled by pyproject.toml is the ability to perform
>>>>>>> isolated builds. This ensures that build dependencies are separated from
>>>>>>> the project's runtime dependencies, leading to more consistent and
>>>>>>> reproducible builds.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://docs.google.com/document/d/17-y48WW25-VGBWZNyTdoN0WUN03k9ZhJjLp9wtyG1Wc/edit#heading=h.wskna8eurvjv
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anand
>>>>>>>
>>>>>>


Re: Proposal for pyproject.toml Support in Apache Beam Python

2023-10-12 Thread Robert Bradshaw via dev
Does this change any development practices? E.g. if I clone the repo, I'm
assuming I couldn't run "setup.py test" anymore. What about the generated
files (like protos, or the yaml definitions copied from other parts of the
repo)?

On Thu, Oct 12, 2023 at 12:27 PM Anand Inguva via dev 
wrote:

> The PR https://github.com/apache/beam/pull/28385 is merged today. If
> there are any observed failures, please comment on the PR and I will follow
> up with a forward fix. Thanks.
>
> On Fri, Sep 1, 2023 at 2:30 PM Anand Inguva 
> wrote:
>
>> Since there is positive feedback from the dev community, I am going ahead
>> and implementing this proposal for Python SDK.
>>
>> @aus...@apache.org   Initially let's move forward
>> with the setuptools as backend for building package and as part of the
>> future work, we can find a better backend than setuptools.
>>
>> Thanks for the feedback.
>> Anand
>>
>> On Mon, Aug 28, 2023 at 12:00 PM Austin Bennett 
>> wrote:
>>
>>> I've thought about this a ton, but haven't been in a position to
>>> undertake the work.  Thanks for bringing this up, @Anand Inguva
>>>  !
>>>
>>> I'd point us to https://python-poetry.org/  ... [ which is where I'd
>>> look to take us, but I'm also not able to do all the work, so my
>>> suggestion/preference doesn't matter that much ]
>>>
>>> https://python-poetry.org/docs/pyproject#the-pyprojecttoml-file <- for
>>> info on pyproject.toml file.
>>>
>>> Notice the use of a 'lock' file is very valuable, ex:
>>> https://python-poetry.org/docs/basic-usage/#committing-your-poetrylock-file-to-version-control
>>>
>>> I haven't come across `build`, that might be great too.  I'd highlight
>>> that Poetry is pretty common across industry these days, rock-solid,
>>> ecosystem of interoperability, users, etc...   If not familiar, PLEASE have
>>> a look at that.
>>>
>>>
>>>
>>>
>>> On Mon, Aug 28, 2023 at 8:04 AM Kerry Donny-Clark via dev <
>>> dev@beam.apache.org> wrote:
>>>
 +1
 Hi Anand,
 I appreciate this effort. Managing python dependencies has been a major
 pain point for me, and I think this approach would help.
 Kerry

 On Mon, Aug 28, 2023 at 10:14 AM Anand Inguva via dev <
 dev@beam.apache.org> wrote:

> Hello Beam Dev Team,
>
> I've compiled a design document
> [1]
> proposing the integration of pyproject.toml into Apache Beam's Python 
> build
> process. Your insights and feedback would be invaluable.
>
> What is pyproject.toml?
> pyproject.toml is a configuration file that specifies a project's
> build dependencies and other project-related metadata in a standardized
> format. Before pyproject.toml, Python projects often had multiple
> configuration files (like setup.py, setup.cfg, and requirements.txt).
> pyproject.toml aims to centralize these configurations into one place,
> making project setups more organized and straightforward. One of the
> significant features enabled by pyproject.toml is the ability to perform
> isolated builds. This ensures that build dependencies are separated from
> the project's runtime dependencies, leading to more consistent and
> reproducible builds.
>
> [1]
> https://docs.google.com/document/d/17-y48WW25-VGBWZNyTdoN0WUN03k9ZhJjLp9wtyG1Wc/edit#heading=h.wskna8eurvjv
>
> Thanks,
> Anand
>



Re: [QUESTION] Why no auto labels?

2023-10-10 Thread Robert Bradshaw via dev
I would definitely support a PR making this an option. Changing the default
would be a rather big change that would require more thought.

On Tue, Oct 10, 2023 at 4:24 PM Joey Tran  wrote:

> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
> Apache Beam at my company and I'm trying to foresee parts of the API they
> might find inconvenient.
>
> If there's a conclusion to make the behavior similar to java, I'm happy to
> put up a PR
>
> On Thu, Oct 5, 2023, 12:49 PM Joey Tran  wrote:
>
>> Is it really toggleable in Java? I imagine that if it's a toggle it'd be
>> a very sticky toggle since it'd be easy for PTransforms to accidentally
>> rely on it.
>>
>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>> wrote:
>>
>>> Huh. This used to be a hard error in Java, but I guess it's togglable
>>> with an option now. We should probably add the option to toggle Python too.
>>> (Unclear what the default should be, but this probably ties into
>>> re-thinking how pipeline update should work.)
>>>
>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>> wrote:
>>>
>>>> Makes sense that the requirement is the same, but is the label
>>>> auto-generation behavior the same? I modified the BeamJava
>>>> wordcount example[1] to do the regex filter twice in a row, and unlike the
>>>> BeamPython example I posted before, it just warns instead of throwing an
>>>> exception.
>>>>
>>>> Tangentially, is it expected that the Beam playground examples don't
>>>> have a way to see the outputs of a run example? I have a vague memory that
>>>> there used to be a way to navigate to an output file after it's generated
>>>> but not sure if I just dreamt that up. Playing with the examples, I wasn't
>>>> positive if my runs were actually succeeding or not based on the stdout
>>>> alone.
>>>>
>>>> [1] https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2
>>>> [2] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
>>>>
>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>> u...@beam.apache.org> wrote:
>>>>
>>>>> BeamJava and BeamPython have the exact same behavior: transform names
>>>>> within must be distinct [1]. This is because we do not necessarily know at
>>>>> pipeline construction time if the pipeline will be streaming or batch, or
>>>>> if it will be updated in the future, so the decision was made to impose
>>>>> this restriction up front. Both will auto-generate a name for you if one 
>>>>> is
>>>>> not given, but will do so deterministically (not depending on some global
>>>>> context) to avoid potential update problems.
>>>>>
>>>>> [1] Note that this applies to the fully qualified transform name, so
>>>>> the naming only has to be distinct within a composite transform (or at the
>>>>> top level--the pipeline itself is isomorphic to a single composite
>>>>> transform).
>>>>>
>>>>> On Wed, Oct 4, 2023 at 3:43 AM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Cross posting this thread to dev@ to see if this is intentional
>>>>>> behavior or if it's something worth changing for the python SDK
>>>>>>
>>>>>> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user 
>>>>>> wrote:
>>>>>>
>>>>>>> That suggests the default label is created as that, which indeed
>>>>>>> causes the duplication error.
>>>>>>>
>>>>>>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Not sure what that suggests
>>>>>>>>
>>>>>>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Looks like this is the current behaviour. If you have `t =
>>>>>>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>>>>>>> `Filter(identity_filter)`.
>>>>>>>>>
>>>>>>>>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran <
>>>>>>>>

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath 
wrote:

>
> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw 
> wrote:
>
>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath 
>> wrote:
>>
>>>
>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>>>
>>>> I suspect some simple pattern templating would solve most use cases. We
>>>> probably would want to support timestamp formatting (e.g. $YYYY $M $D) as
>>>> well.
>>>>
>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> I would say:
>>>>>>
>>>>>> sink:
>>>>>>   type: WriteToParquet
>>>>>>   config:
>>>>>> path: /beam/filesystem/dest
>>>>>> prefix: 
>>>>>> suffix: 
>>>>>>
>>>>>> Underlying SDK will add the middle part of the file names to make
>>>>>> sure that files generated by various bundles/windows/shards do not 
>>>>>> conflict.
>>>>>>
>>>>>
>>>>> What's the relationship between path and prefix? Is path the
>>>>> directory part of the full path, or does prefix precede it?
>>>>>
>>>>
>>> prefix would be the first part of the file name so each shard will be
>>> named.
>>> /--
>>>
>>> This is similar to what we do in existing SDKS. For example, Java FileIO,
>>>
>>>
>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>>
>>
>> Yeah, although there's no distinction between path and prefix.
>>
>
> Ah, for FileIO, path comes from the "to" call.
>
>
> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125
>
>

Ah. I guess there's some inconsistency here, e.g. text files are written to
a filenamePrefix that subsumes both:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-


>
>>
>>>>>
>>>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>>>> customizing the file pattern sounds like a more advanced use case that 
>>>>>> can
>>>>>> be left for "real" SDKs.
>>>>>>
>>>>>
>>>>> Yea, we don't have to do everything.
>>>>>
>>>>>
>>>>>> For dynamic destinations, I think just making the "path" component
>>>>>> support  a lambda that is parameterized by the input should be adequate
>>>>>> since this allows customers to direct files written to different
>>>>>> destination directories.
>>>>>>
>>>>>> sink:
>>>>>>   type: WriteToParquet
>>>>>>   config:
>>>>>> path: 
>>>>>> prefix: 
>>>>>> suffix: 
>>>>>>
>>>>>> I'm not sure what would be the best way to specify a lambda here
>>>>>> though. Maybe a regex or the name of a Python callable ?
>>>>>>
>>>>>
>>>>> I'd rather not require Python for a pure Java pipeline, but some kind
>>>>> of a pattern template may be sufficient here.
>>>>>
>>>>>
>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>>>>
>>>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>>>> DynamicDestinations were parameterized via a lambda instead of just 
>>>>>>>> having
>>>>>>>> the user add it via MapElements is performance. We discussed something
>>>>>>>> along the lines of what you are suggesting (essentially having the user
>>>>>>>> create a KV where the key contained the dynamic information). The 
>>>>>>>> problem
>>

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 4:03 PM Robert Bradshaw  wrote:

> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>
>> I suspect some simple pattern templating would solve most use cases.
>>
>
> That's what I'm leaning towards as well.
>
>
>> We probably would want to support timestamp formatting (e.g. $YYYY $M $D)
>> as well.
>>
>
> Although we have several timestamps to consider: the element timestamp,
> its window start and/or end, walltime.
>

And users may want some of these to be (part of) the directory structure as
well, not just in a "sharding" suffix.


> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>> wrote:
>>
>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> I would say:
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>> path: /beam/filesystem/dest
>>>> prefix: 
>>>> suffix: 
>>>>
>>>> Underlying SDK will add the middle part of the file names to make sure
>>>> that files generated by various bundles/windows/shards do not conflict.
>>>>
>>>
>>> What's the relationship between path and prefix? Is path the
>>> directory part of the full path, or does prefix precede it?
>>>
>>>
>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>> customizing the file pattern sounds like a more advanced use case that can
>>>> be left for "real" SDKs.
>>>>
>>>
>>> Yea, we don't have to do everything.
>>>
>>>
>>>> For dynamic destinations, I think just making the "path" component
>>>> support  a lambda that is parameterized by the input should be adequate
>>>> since this allows customers to direct files written to different
>>>> destination directories.
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>> path: 
>>>> prefix: 
>>>> suffix: 
>>>>
>>>> I'm not sure what would be the best way to specify a lambda here
>>>> though. Maybe a regex or the name of a Python callable ?
>>>>
>>>
>>> I'd rather not require Python for a pure Java pipeline, but some kind of
>>> a pattern template may be sufficient here.
>>>
>>>
>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>>
>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>> DynamicDestinations were parameterized via a lambda instead of just 
>>>>>> having
>>>>>> the user add it via MapElements is performance. We discussed something
>>>>>> along the lines of what you are suggesting (essentially having the user
>>>>>> create a KV where the key contained the dynamic information). The problem
>>>>>> was that often the size of the generated filepath was often much larger
>>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>>> integer userid, and the filepath prefix would then be
>>>>>> /long/path/to/output/users/. This was especially bad in cases where 
>>>>>> the
>>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>>
>>>>>
>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>> bringing this up.
>>>>>
>>>>>
>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>>> from the record!), but we should keep in mind that this might mean that
>>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>>
>>>>>
>>>>> Yep, it's not as straightforward to do in a declarative way. I would
>

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:

> I suspect some simple pattern templating would solve most use cases.
>

That's what I'm leaning towards as well.


> We probably would want to support timestamp formatting (e.g. $YYYY $M $D)
> as well.
>

Although we have several timestamps to consider: the element timestamp, its
window start and/or end, walltime.


> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
> wrote:
>
>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
>> wrote:
>>
>>> I would say:
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>> path: /beam/filesystem/dest
>>> prefix: 
>>> suffix: 
>>>
>>> Underlying SDK will add the middle part of the file names to make sure
>>> that files generated by various bundles/windows/shards do not conflict.
>>>
>>
>> What's the relationship between path and prefix? Is path the
>> directory part of the full path, or does prefix precede it?
>>
>>
>>> This will satisfy the vast majority of use-cases I believe. Fully
>>> customizing the file pattern sounds like a more advanced use case that can
>>> be left for "real" SDKs.
>>>
>>
>> Yea, we don't have to do everything.
>>
>>
>>> For dynamic destinations, I think just making the "path" component
>>> support  a lambda that is parameterized by the input should be adequate
>>> since this allows customers to direct files written to different
>>> destination directories.
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: 
>>> prefix: 
>>> suffix: 
>>>
>>> I'm not sure what would be the best way to specify a lambda here though.
>>> Maybe a regex or the name of a Python callable ?
>>>
>>
>> I'd rather not require Python for a pure Java pipeline, but some kind of
>> a pattern template may be sufficient here.
>>
>>
>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>
>>>>> Just FYI - the reason why names (including prefixes) in
>>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>>> the user add it via MapElements is performance. We discussed something
>>>>> along the lines of what you are suggesting (essentially having the user
>>>>> create a KV where the key contained the dynamic information). The problem
>>>>> was that often the size of the generated filepath was often much larger
>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>> integer userid, and the filepath prefix would then be
>>>>> /long/path/to/output/users/. This was especially bad in cases where 
>>>>> the
>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>
>>>>
>>>> That is a consideration I hadn't thought much of, thanks for
>>>> bringing this up.
>>>>
>>>>
>>>>> Now there may not be any good way to keep this benefit in a
>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>> from the record!), but we should keep in mind that this might mean that
>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>
>>>>
>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>> like to avoid mixing UDFs (with their associated languages and execution
>>>> environments) if possible. Though I'd like the performance of a
>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>> straight-line Java (and possibly better, if we can leverage the structure
>>>> of schemas everywhere) this is not an absolute requirement for all
>>>> features.
>>>>
>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>> could be sufficient to mitigate the blow-up of pre-computing this in most
>>>> cases (especially in the context of a larger pipeline).
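
One hedged sketch of that "constant prefix plus dynamic part" idea in config
form. The {user_id} substitution syntax is invented purely for illustration;
it defers, rather than answers, the question of how the dynamic piece would be
evaluated without a full UDF:

  sink:
    type: WriteToParquet
    config:
      # Constant base plus a hypothetical per-record substitution; only the
      # small dynamic piece (e.g. an integer user id) would have to travel
      # through any shuffle, not the full precomputed filepath:
      path: "/long/path/to/output/users/{user_id}"
      prefix: part
      suffix: .parquet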

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath 
wrote:

>
> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>
>> I suspect some simple pattern templating would solve most use cases. We
>> probably would want to support timestamp formatting (e.g. $ $M $D) as
>> well.
>>
>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>> wrote:
>>
>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> I would say:
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>> path: /beam/filesystem/dest
>>>> prefix: 
>>>> suffix: 
>>>>
>>>> Underlying SDK will add the middle part of the file names to make sure
>>>> that files generated by various bundles/windows/shards do not conflict.
>>>>
>>>
>>> What's the relationship between path and prefix? Is path the
>>> directory part of the full path, or does prefix precede it?
>>>
>>
> prefix would be the first part of the file name so each shard will be
> named.
> /--
>
> This is similar to what we do in existing SDKS. For example, Java FileIO,
>
>
> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>

Yeah, although there's no distinction there between path and prefix.
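
Spelled out, the path/prefix/suffix split would compose into shard file names
roughly as in the sketch below. The concrete values and the generated middle
part are illustrative only; the SDK would decide the actual window/shard
encoding:

  sink:
    type: WriteToParquet
    config:
      path: /beam/filesystem/dest   # directory portion
      prefix: output                # first part of each file name
      suffix: .parquet              # last part of each file name

  # Illustrative result for a 4-shard write, with the SDK-generated middle
  # part carrying window and shard information:
  #   /beam/filesystem/dest/output-<window>-00000-of-00004.parquet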


>>>
>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>> customizing the file pattern sounds like a more advanced use case that can
>>>> be left for "real" SDKs.
>>>>
>>>
>>> Yea, we don't have to do everything.
>>>
>>>
>>>> For dynamic destinations, I think just making the "path" component
>>>> support  a lambda that is parameterized by the input should be adequate
>>>> since this allows customers to direct files written to different
>>>> destination directories.
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>> path: 
>>>> prefix: 
>>>> suffix: 
>>>>
>>>> I'm not sure what would be the best way to specify a lambda here
>>>> though. Maybe a regex or the name of a Python callable?
>>>>
>>>
>>> I'd rather not require Python for a pure Java pipeline, but some kind of
>>> a pattern template may be sufficient here.
>>>
>>>
>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>>
>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>> DynamicDestinations were parameterized via a lambda instead of just 
>>>>>> having
>>>>>> the user add it via MapElements is performance. We discussed something
>>>>>> along the lines of what you are suggesting (essentially having the user
>>>>>> create a KV where the key contained the dynamic information). The problem
>>>>>> was that often the size of the generated filepath was often much larger
>>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>>> integer userid, and the filepath prefix would then be
>>>>>> /long/path/to/output/users/. This was especially bad in cases where 
>>>>>> the
>>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>>
>>>>>
>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>> bringing this up.
>>>>>
>>>>>
>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>>> from the record!), but we should keep in mind that this might mean that
>>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>>
>>>>>
>>>>

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
wrote:

> I would say:
>
> sink:
>   type: WriteToParquet
>   config:
> path: /beam/filesystem/dest
> prefix: 
> suffix: 
>
> Underlying SDK will add the middle part of the file names to make sure
> that files generated by various bundles/windows/shards do not conflict.
>

What's the relationship between path and prefix? Is path the directory part
of the full path, or does prefix precede it?


> This will satisfy the vast majority of use-cases I believe. Fully
> customizing the file pattern sounds like a more advanced use case that can
> be left for "real" SDKs.
>

Yea, we don't have to do everything.


> For dynamic destinations, I think just making the "path" component
> support  a lambda that is parameterized by the input should be adequate
> since this allows customers to direct files written to different
> destination directories.
>
> sink:
>   type: WriteToParquet
>   config:
> path: 
> prefix: 
> suffix: 
>
> I'm not sure what would be the best way to specify a lambda here though.
> Maybe a regex or the name of a Python callable?
>

I'd rather not require Python for a pure Java pipeline, but some kind of a
pattern template may be sufficient here.


> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>
>>> Just FYI - the reason why names (including prefixes) in
>>> DynamicDestinations were parameterized via a lambda instead of just having
>>> the user add it via MapElements is performance. We discussed something
>>> along the lines of what you are suggesting (essentially having the user
>>> create a KV where the key contained the dynamic information). The problem
>>> was that often the size of the generated filepath was often much larger
>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>> desire to avoid record blowup. e.g. the record might contain a single
>>> integer userid, and the filepath prefix would then be
>>> /long/path/to/output/users/. This was especially bad in cases where the
>>> data had to be shuffled, and the existing dynamic destinations method
>>> allowed extracting the filepath only _after_  the shuffle.
>>>
>>
>> That is a consideration I hadn't thought much of, thanks for
>> bringing this up.
>>
>>
>>> Now there may not be any good way to keep this benefit in a
>>> declarative approach such as YAML (or at least a good easy way - we could
>>> always allow the user to pass in a SQL expression to extract the filename
>>> from the record!), but we should keep in mind that this might mean that
>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>
>>
>> Yep, it's not as straightforward to do in a declarative way. I would like
>> to avoid mixing UDFs (with their associated languages and execution
>> environments) if possible. Though I'd like the performance of a
>> "straightforward" YAML pipeline to be that which one can get writing
>> straight-line Java (and possibly better, if we can leverage the structure
>> of schemas everywhere) this is not an absolute requirement for all
>> features.
>>
>> I wonder if separating out a constant prefix vs. the dynamic stuff could
>> be sufficient to mitigate the blow-up of pre-computing this in most cases
>> (especially in the context of a larger pipeline). Alternatively, rather
>> than just a sharding pattern, one could have a full filepattern that
>> includes format parameters for dynamically computed bits as well as the
>> shard number, windowing info, etc. (There are pros and cons to this.)
>>
>>
>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Currently the various file writing configurations take a single
>>>> parameter, path, which indicates where the (sharded) output should be
>>>> placed. In other words, one can write something like
>>>>
>>>>   pipeline:
>>>> ...
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>> path: /beam/filesystem/dest
>>>>
>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>
>>>> Of course, in practice file writing is often much more complicated than
>>>>
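
Pulling the "full filepattern with format parameters" idea from the quoted
message above into one sketch. The config key (file_pattern) and every
placeholder ($WINDOW_START, $SHARD, $NUM_SHARDS, {user_id}) are invented here
for illustration and are not part of any existing Beam YAML spec:

  sink:
    type: WriteToParquet
    config:
      # A single pattern instead of separate path/prefix/suffix fields; the
      # SDK would substitute window and shard info, plus any dynamic bits
      # computed from the record:
      file_pattern: "/beam/filesystem/dest/{user_id}/events-$WINDOW_START-$SHARD-of-$NUM_SHARDS.parquet"

The appeal is full control over naming; one likely con, in line with the
caveats above, is that nothing would stop a pattern that omits the shard or
window pieces and so lets concurrent writers overwrite each other.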
