Re: release process: typescript SDK?

2024-04-16 Thread Robert Bradshaw via dev
Correct, I've just been pushing these manually, and lately there haven't
been many changes to push. I'm all for getting these set up as part of the
standard release process.

On Tue, Apr 16, 2024 at 1:22 PM Danny McCormick 
wrote:

> I've never published npm artifacts before, but I imagine the hardest part
> is getting the credentials set up, then it is probably very easy to set up
> a GitHub Actions workflow to publish
> <https://docs.github.com/en/actions/publishing-packages/publishing-nodejs-packages#publishing-packages-to-the-npm-registry>.
> Who has done these releases in the past/has credentials for the npm
> package? Maybe @Robert Bradshaw ? We will need a
> token set up as a secret to automate this.
>
> I'll also note that we don't do any typescript validation today, and it
> would be nice to publish RCs as part of this
>
> On Tue, Apr 16, 2024 at 4:11 PM Austin Bennett  wrote:
>
>> Hi Beam Devs,
>>
>> Calling out it looks like our release process for apache-beam for
>> typescript/npm is broken, seemingly the last published release was 2.49.0
>> about 9 months ago.  The other languages look like they are publishing to
>> expected locations.
>>
>> https://www.npmjs.com/package/apache-beam
>>
>> I noticed this since I was digging into security concerns raised by
>> GitHub's dependabot across our repos [ ex:
>> https://github.com/apache/beam-starter-typescript/security/dependabot ], and
>> towards getting our repos tidied.
>>
>> This leads me to believe we may want two distinct things:
>> * update our release docs/process/scripts to ensure that we
>> generate/publish all artifacts to relevant repositories.
>> * Arrive at a process to more straightforwardly attend to security
>> updates [ maybe we want these sent to dev list, or another distribution? ]
>>
>> From a very quick search, it did not look like we have scripts to push to
>> npm.  That should be verified more thoroughly -- I haven't done a release
>> before, so relevant scripts could be hiding elsewhere.
>>
>> Cheers,
>> Austin
>>
>>
>> NOTE:  everything with our main Beam repo specifically looks OK.  Some
>> things discovered were on the other/supplementary repos, though I believe
>> those are still worthwhile to attend to and support.
>>
>


Re: [VOTE] Patch Release 2.55.1, release candidate #2

2024-04-03 Thread Robert Bradshaw via dev
+1 (binding) The artifacts all look good to me.

On Wed, Apr 3, 2024 at 1:35 PM XQ Hu via dev  wrote:

> +1 (non-binding). Tested this using a simple Dataflow ML pipeline:
> https://github.com/google/dataflow-ml-starter/actions/runs/8541848483.
>
> On Wed, Apr 3, 2024 at 2:35 PM Jeff Kinard  wrote:
>
>> +1. Validated running from local gradle JAR and staged maven JAR for
>> expansion-service.
>>
>> On Wed, Apr 3, 2024 at 11:08 AM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hi everyone,
>>>
>>> I put together a patch release per the conversation in
>>> https://lists.apache.org/thread/kvq1wsj505pvopkq186dnvc0l6ryyfh0.
>>>
>>> Please review and vote on the release candidate #2 (I messed up rc1) for
>>> the version 2.55.1, as follows:
>>> [ ] +1, Approve the release
>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>
>>>
>>> Reviewers are encouraged to test their own use cases with the release
>>> candidate, and vote +1 if no issues are found. Only PMC member votes will
>>> count towards the final vote, but votes from all community members are
>>> encouraged and helpful for finding regressions; you can either test your
>>> own use cases [9] or use cases from the validation sheet [7].
>>>
>>> The complete staging area is available for your review, which includes:
>>> * the official Apache source release to be deployed to dist.apache.org
>>> [1], which is signed with the key with fingerprint D20316F712213422 [2],
>>> * all artifacts to be deployed to the Maven Central Repository [3],
>>> * source code tag "v2.55.1-RC2" [4],
>>> * Python artifacts are deployed along with the source release to the
>>> dist.apache.org [1] and PyPI[5].
>>> * Go artifacts and documentation are available at pkg.go.dev [6]
>>> * Validation sheet with a tab for 2.55.1 release to help with validation
>>> [7].
>>> * Docker images published to Docker Hub [8].
>>>
>>> This release does not include any website changes since it is addressing
>>> a single bug fix as discussed in
>>> https://lists.apache.org/thread/kvq1wsj505pvopkq186dnvc0l6ryyfh0.
>>>
>>> The vote will be open for at least 72 hours. It is adopted by majority
>>> approval, with at least 3 PMC affirmative votes.
>>>
>>> For guidelines on how to try the release in your projects, check out our
>>> RC testing guide [9].
>>>
>>> Thanks,
>>> Danny
>>>
>>> [1] https://dist.apache.org/repos/dist/dev/beam/2.55.1/
>>> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
>>> [3]
>>> https://repository.apache.org/content/repositories/orgapachebeam-1375/
>>> [4] https://github.com/apache/beam/tree/v2.55.1-RC2
>>> [5] https://pypi.org/project/apache-beam/2.55.1rc2/
>>> [6]
>>> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.55.1-RC2/go/pkg/beam
>>> [7]
>>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=686075626
>>> [8] https://hub.docker.com/search?q=apache%2Fbeam=image
>>> [9]
>>> https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md
>>>
>>


Re: Supporting Dynamic Destinations in a portable context

2024-04-03 Thread Robert Bradshaw via dev
On Wed, Apr 3, 2024 at 4:15 AM Kenneth Knowles  wrote:

> Let me summarize the most recent proposal on-list to frame my question
> about this last suggestion. It looks like this:
>
> 1. user has an element, call it `data`
> 2. user maps `data` to an arbitrary metadata row, call it `dest`
> 3. we can do things like shuffle on `dest` because it isn't too big
> 4. we map `dest` to a concrete destination (aka URL) to write to by a
> string format that uses fields of `dest`
>
> I believe steps 1-3 are identical in expressivity to non-portable
> DynamicDestinations. So Reuven the question is for step 4: what are the
> mappings from `dest` to URL that cannot be expressed by string formatting
> but need SQL or Lua, etc? That would be a useful guide to consideration of
> those possibilities.
>

I think any non-trivial mapping can be done in step 2. It may be possible
to come up with a case where something other than string substitution is
needed to be done to make dest small enough to shuffle, but I think that'd
be a really rare corner case, and then it's just an optimization rather
than feature completeness question.


> FWIW I think even if we add a mini-language that string formatting has
> better ease of use (can easily be displayed in UI, etc) so it would be the
> first choice, and more advanced stuff is a fallback for rare cases. So they
> are both valuable and I'd be happy to implement the easier-to-use path
> right away while we discuss.
>

+1. Note that this even lets us share the config "path/table/..." field
that is a static string for non-dynamic destinations.

In light of the above, let's avoid a complex mini-language. I'd start with
nothing but plugging things in w/o any formatting options.
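
To make the four steps quoted above concrete, here is a minimal sketch in plain Python (no Beam APIs). The bucket path, the field names, and both helper functions are made up for illustration; the only point is that the small `dest` row is what gets grouped on, and the URL comes from plain field substitution.

```python
from collections import defaultdict

# Hypothetical destination pattern; only plain field substitution is used.
DEST_PATTERN = "gs://bucket/events/{region}/{user}.json"


def to_dest(data):
    """Step 2: map an element to a small metadata row (an arbitrary user map)."""
    return {"region": data["country"].lower(), "user": data["user_id"]}


def group_by_destination(elements, pattern=DEST_PATTERN):
    """Steps 3-4: group on the small `dest` row, then format it into a URL."""
    groups = defaultdict(list)
    for data in elements:
        dest = to_dest(data)
        # Stand-in for a shuffle: key on the (small) dest row, not the URL.
        groups[tuple(sorted(dest.items()))].append(data)
    return {pattern.format(**dict(key)): rows for key, rows in groups.items()}


elements = [
    {"user_id": "ann", "country": "US", "payload": 1},
    {"user_id": "bob", "country": "DE", "payload": 2},
    {"user_id": "ann", "country": "US", "payload": 3},
]
writes = group_by_destination(elements)
```

Any non-trivial logic (here, lower-casing the country) lives in the step 2 map, so the pattern itself needs no formatting options.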


> On Tue, Apr 2, 2024 at 2:59 PM Reuven Lax via dev 
> wrote:
>
>> I do suspect that over time we'll find more and more cases we can't
>> express, and will be asked to extend this little templating in more
>> directions. To head that off - could we easily just reuse an existing
>> language (SQL, LUA, something of the form?) instead of creating something
>> new?
>>
>> On Tue, Apr 2, 2024 at 8:55 AM Kenneth Knowles  wrote:
>>
>>> I really like this proposal. I think it has narrowed down and solved the
>>> essential problem of not shuffling excess redundant data, and also provides
>>> the vast majority of the functionality that a lambda would, with
>>> significantly better debugability and usability too, since the dynamic
>>> destination pattern string can be in display data, etc.
>>>
>>> Kenn
>>>
>>> On Wed, Mar 27, 2024 at 1:58 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> On Wed, Mar 27, 2024 at 10:20 AM Reuven Lax  wrote:
>>>>
>>>>> Can the prefix still be generated programmatically at graph creation
>>>>> time?
>>>>>
>>>>
>>>> Yes. It's just a property of the transform passed by the user at
>>>> configuration time.
>>>>
>>>>
>>>>> On Wed, Mar 27, 2024 at 9:40 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:
>>>>>>
>>>>>>> This does seem like the best compromise, though I think there will
>>>>>>> still end up being performance issues. A common pattern I've seen is
>>>>>>> that there is a long common prefix to the dynamic destination followed
>>>>>>> by the dynamic component. e.g. the destination might be
>>>>>>> long/common/path/to/destination/files/. In this case, the
>>>>>>> prefix is often much larger than messages themselves and is what gets
>>>>>>> effectively encoded in the lambda.
>>>>>>>
>>>>>>
>>>>>> The idea here is that the destination would be given as a format
>>>>>> string, say, "long/common/path/to/destination/files/{dest_info.user}".
>>>>>> Another way to put this is that we support (only) "lambdas" that are
>>>>>> represented as string substitutions. (The fact that dest_info does not
>>>>>> have to be part of the record, and can be the output of an arbitrary
>>>>>> map if need be, makes this restriction not so bad.)
>>>>>>
>>>>>> As well as solving the performance issues, I think this is actually a
>>>>>> pretty convenient and natur

Re: Design proposal for Beam YAML templates

2024-04-02 Thread Robert Bradshaw via dev
It looks like we're converging on an optional jinja preprocessing phase to
handle this. I'm in favor of this solution.
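
As a rough illustration of what an optional preprocessing phase means, the sketch below substitutes parameters into the pipeline text before any YAML parsing happens. It uses the standard library's `string.Template` purely as a stand-in for jinja so that it runs without extra dependencies; the template contents, parameter names, and the `preprocess` helper are all hypothetical and not taken from the design doc.

```python
from string import Template

# Hypothetical pipeline template; $input_path and $output_path are parameters.
PIPELINE_TEMPLATE = """\
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: $input_path
    - type: WriteToJson
      config:
        path: $output_path
"""


def preprocess(template_text, params=None):
    """Optional textual pass: substitute parameters before any YAML parsing."""
    if not params:
        return template_text  # plain YAML pipelines pass through unchanged
    return Template(template_text).substitute(params)


rendered = preprocess(
    PIPELINE_TEMPLATE,
    {"input_path": "gs://in/data.csv", "output_path": "gs://out/data"},
)
```

Because the pass is purely textual and optional, a static pipeline with no parameters is unaffected by it.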

On Wed, Mar 20, 2024 at 9:23 AM Robert Bradshaw  wrote:

> Thanks. I think this will be a very powerful feature. Left some comments
> on the doc.
>
> On Tue, Mar 19, 2024 at 11:53 AM Jeff Kinard  wrote:
>
>> Hi all,
>>
>> I have another quick design doc discussing the syntax for Beam YAML
>> templates. This feature would allow a user to create template pipelines
>> similar to what is done in
>> https://github.com/GoogleCloudPlatform/DataflowTemplates
>> that are configurable and therefore more reusable than static YAML
>> pipelines.
>>
>> Design doc:
>> https://docs.google.com/document/d/1AUt4NEoQCBrOEVhadVozyTU2-CqC-pw7kuK6rzIA6Jc/edit?resourcekey=0-d-aVmsKZ89SzSiVwt8n5pw
>>
>> Please take a look and feel free to leave any comments.
>>
>> Thanks,
>> - Jeff
>>
>


Re: Patch release proposal

2024-03-27 Thread Robert Bradshaw via dev
Given the severity of the breakage, and the simplicity of the workaround,
I'm in favor of a patch release. I think we could do Python-only, which
would make the process even more lightweight.

On Wed, Mar 27, 2024 at 3:48 PM Jeff Kinard  wrote:

> Hi all,
>
> Beam 2.55 was released with a bug that causes WriteToJson on Beam YAML to
> fail when using the Java variant. This also affects any user attempting to
> use the Xlang JsonWriteTransformProvider -
> https://github.com/apache/beam/blob/master/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
>
> This is due to a change to
> https://github.com/apache/beam/blob/master/sdks/java/io/json/build.gradle
> that removed
> a dependency on everit which also removed it from being packaged into the
> expansion service JAR:
> beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar
>
> There is a temporary fix to disable the provider in Beam YAML:
> https://github.com/apache/beam/pull/30777
>
> I think with the total loss of function, and a trivial fix, it is worth
> creating a patch release of Beam 2.55 to include this fix.
>
> - Jeff
>
>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Robert Bradshaw via dev
On Wed, Mar 27, 2024 at 10:20 AM Reuven Lax  wrote:

> Can the prefix still be generated programmatically at graph creation time?
>

Yes. It's just a property of the transform passed by the user at
configuration time.
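
A small sketch of that point, in plain Python: the prefix is computed while the pipeline is being built, and only the resulting string is handed to the transform. The `make_destination_pattern` helper and the `write_config` dict are hypothetical names, not Beam APIs.

```python
import datetime


def make_destination_pattern(project, run_date):
    """Build the format string while the pipeline graph is being constructed."""
    prefix = f"long/common/path/{project}/{run_date:%Y-%m-%d}/files/"
    # The prefix is resolved now; the placeholder is left for per-element data.
    return prefix + "{dest_info.user}"


# Hypothetical transform configuration; only the pattern string is carried.
write_config = {
    "path": make_destination_pattern("demo", datetime.date(2024, 3, 27)),
}
```

The long prefix appears once in the configuration rather than once per element.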


> On Wed, Mar 27, 2024 at 9:40 AM Robert Bradshaw 
> wrote:
>
>> On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:
>>
>>> This does seem like the best compromise, though I think there will still
>>> end up being performance issues. A common pattern I've seen is that there
>>> is a long common prefix to the dynamic destination followed by the dynamic
>>> component. e.g. the destination might be
>>> long/common/path/to/destination/files/. In this case, the
>>> prefix is often much larger than messages themselves and is what gets
>>> effectively encoded in the lambda.
>>>
>>
>> The idea here is that the destination would be given as a format string,
>> say, "long/common/path/to/destination/files/{dest_info.user}". Another way
>> to put this is that we support (only) "lambdas" that are represented as
>> string substitutions. (The fact that dest_info does not have to be part of
>> the record, and can be the output of an arbitrary map if need be, makes
>> this restriction not so bad.)
>>
>> As well as solving the performance issues, I think this is actually a
>> pretty convenient and natural way for the user to name their destination
>> (for the common use case, even easier than providing a lambda), and has the
>> benefit of being much more transparent than an arbitrary callable as well
>> for introspection (for both machine and human that may look at the
>> resulting pipeline).
>>
>>
>>> I'm not entirely sure how to address this in a portable context. We
>>> might simply have to accept the extra overhead when going cross language.
>>>
>>> Reuven
>>>
>>> On Wed, Mar 27, 2024 at 8:51 AM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Thanks for putting this together, it will be a really useful feature to
>>>> have.
>>>>
>>>> I am in favor of the string-pattern approaches. I think we need to
>>>> support both the {record=..., dest_info=...} and the elide-fields
>>>> approaches, as the former is nicer when one has a fixed representation for
>>>> the output record (e.g. a proto or avro schema) and the flattened form for
>>>> ease of use in more free-form contexts (e.g. when producing records from
>>>> YAML and SQL).
>>>>
>>>> Also left some comments on the doc.
>>>>
>>>>
>>>> On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> There have been some conversations lately about how best to enable
>>>>> dynamic destinations in a portable context. Usually, this comes up for
>>>>> cross-language transforms and more recently for Beam YAML.
>>>>>
>>>>> I've started a short doc outlining some routes we could take. The
>>>>> purpose is to establish a good standard for supporting dynamic
>>>>> destinations with portability, one that can be applied to most use
>>>>> cases and IOs. Please take a look and add any thoughts!
>>>>>
>>>>> https://s.apache.org/portable-dynamic-destinations
>>>>>
>>>>> Best,
>>>>> Ahmed
>>>>>
>>>>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Robert Bradshaw via dev
On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:

> This does seem like the best compromise, though I think there will still
> end up being performance issues. A common pattern I've seen is that there
> is a long common prefix to the dynamic destination followed by the dynamic
> component. e.g. the destination might be
> long/common/path/to/destination/files/. In this case, the
> prefix is often much larger than messages themselves and is what gets
> effectively encoded in the lambda.
>

The idea here is that the destination would be given as a format string,
say, "long/common/path/to/destination/files/{dest_info.user}". Another way
to put this is that we support (only) "lambdas" that are represented as
string substitutions. (The fact that dest_info does not have to be part of
the record, and can be the output of an arbitrary map if need be, makes
this restriction not so bad.)

As well as solving the performance issues, I think this is actually a
pretty convenient and natural way for the user to name their destination
(for the common use case, even easier than providing a lambda), and has the
benefit of being much more transparent than an arbitrary callable as well
for introspection (for both machine and human that may look at the
resulting pipeline).
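
The introspection benefit can be sketched with Python's own format-string machinery: a tool can list which fields a pattern reads without executing any user code, which is not possible for an arbitrary callable. This is only an illustration in plain Python; `referenced_fields` and `resolve` are hypothetical helpers, and Beam's actual pattern syntax may differ from `str.format`.

```python
from string import Formatter
from types import SimpleNamespace

PATTERN = "long/common/path/to/destination/files/{dest_info.user}"


def referenced_fields(pattern):
    """List the fields a destination pattern reads, without running any user code."""
    return [name for _, name, _, _ in Formatter().parse(pattern) if name]


def resolve(pattern, dest_info):
    """Apply the pattern to one element's destination info."""
    return pattern.format(dest_info=dest_info)


fields = referenced_fields(PATTERN)
path = resolve(PATTERN, SimpleNamespace(user="ann"))
```

Here `fields` is what a UI or an optimizer could inspect, while `path` is what a single element would resolve to.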


> I'm not entirely sure how to address this in a portable context. We might
> simply have to accept the extra overhead when going cross language.
>
> Reuven
>
> On Wed, Mar 27, 2024 at 8:51 AM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Thanks for putting this together, it will be a really useful feature to
>> have.
>>
>> I am in favor of the string-pattern approaches. I think we need to
>> support both the {record=..., dest_info=...} and the elide-fields
>> approaches, as the former is nicer when one has a fixed representation for
>> the output record (e.g. a proto or avro schema) and the flattened form for
>> ease of use in more free-form contexts (e.g. when producing records from
>> YAML and SQL).
>>
>> Also left some comments on the doc.
>>
>>
>> On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hey all,
>>>
>>> There have been some conversations lately about how best to enable
>>> dynamic destinations in a portable context. Usually, this comes up for
>>> cross-language transforms and more recently for Beam YAML.
>>>
>>> I've started a short doc outlining some routes we could take. The
>>> purpose is to establish a good standard for supporting dynamic destinations
>>> with portability, one that can be applied to most use cases and IOs. Please
>>> take a look and add any thoughts!
>>>
>>> https://s.apache.org/portable-dynamic-destinations
>>>
>>> Best,
>>> Ahmed
>>>
>>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Robert Bradshaw via dev
Thanks for putting this together, it will be a really useful feature to
have.

I am in favor of the string-pattern approaches. I think we need to support
both the {record=..., dest_info=...} and the elide-fields approaches, as
the former is nicer when one has a fixed representation for the
output record (e.g. a proto or avro schema) and the flattened form for ease
of use in more free-form contexts (e.g. when producing records from YAML
and SQL).
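
One possible reading of the two shapes, sketched in plain Python. It assumes that "elide-fields" means the destination fields live in the record itself and are dropped before the record is written; the pattern, field names, and helper functions are all hypothetical.

```python
DEST_PATTERN = "tables/{country}/{year}"


def write_target_nested(element, pattern=DEST_PATTERN):
    """Nested form: the element carries `record` and `dest_info` side by side."""
    return pattern.format(**element["dest_info"]), element["record"]


def write_target_flat(element, drop, pattern=DEST_PATTERN):
    """Flattened form: the pattern reads the record's own fields, and the
    fields listed in `drop` are elided before the record is written."""
    record = {k: v for k, v in element.items() if k not in drop}
    return pattern.format(**element), record


nested = {"record": {"id": 1, "amount": 5}, "dest_info": {"country": "de", "year": 2024}}
flat = {"id": 1, "amount": 5, "country": "de", "year": 2024}

nested_result = write_target_nested(nested)
flat_result = write_target_flat(flat, drop={"country", "year"})
```

Both calls yield the same destination and the same written record; the nested form leaves a fixed output schema untouched, while the flat form avoids wrapping records that come from free-form sources.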

Also left some comments on the doc.


On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev 
wrote:

> Hey all,
>
> There have been some conversations lately about how best to enable dynamic
> destinations in a portable context. Usually, this comes up for
> cross-language transforms and more recently for Beam YAML.
>
> I've started a short doc outlining some routes we could take. The purpose
> is to establish a good standard for supporting dynamic destinations with
> portability, one that can be applied to most use cases and IOs. Please take
> a look and add any thoughts!
>
> https://s.apache.org/portable-dynamic-destinations
>
> Best,
> Ahmed
>


Re: Python API: FlatMap default -> lambda x:x?

2024-03-21 Thread Robert Bradshaw via dev
I would be more comfortable with a default for FlatMap than overloading
Flatten in this way. Distinguishing between

(pcoll,) | beam.Flatten()

and

(pcoll) | beam.Flatten()

seems a bit error prone.
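
The reason the two spellings are easy to confuse is ordinary Python syntax, shown below without any Beam import: a trailing comma makes a one-element tuple, while bare parentheses are only grouping. `pcoll` is a stand-in object and `describe` is a hypothetical helper.

```python
def describe(value):
    """Report whether `value` is a one-element tuple or a bare object."""
    return "tuple of 1" if isinstance(value, tuple) and len(value) == 1 else "bare object"


pcoll = object()  # stand-in for a PCollection; no Beam import needed

with_comma = (pcoll,)    # a one-element tuple containing pcoll
without_comma = (pcoll)  # just pcoll; the parentheses are only grouping
```

A single missing comma would therefore switch between the two behaviors if Flatten were overloaded this way.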


On Thu, Mar 21, 2024 at 2:23 PM Joey Tran  wrote:

> Ah, I misunderstood your original suggestion then. That makes sense then.
> I have already seen someone get a little confused about the names and
> surprised that Flatten doesn't do what FlatMap does.
>
> On Thu, Mar 21, 2024 at 5:20 PM Valentyn Tymofieiev 
> wrote:
>
>> Beam throws an error at submission time in Python if you pass a single
>> PCollection to Flatten. The scenario you describe concerns a one-element
>> list.
>>
>> On Thu, Mar 21, 2024, 13:43 Joey Tran  wrote:
>>
>>> I think it'd be quite surprising if beam.Flatten would become equivalent
>>> to FlatMap if passed only a single pcollection. One use case that would be
>>> broken from that is cases where someone might be flattening a variable
>>> number of pcollections, including possibly only one pcollection. In that
>>> case, that single pcollection suddenly gets FlatMapped.
>>>
>>>
>>>
>>> On Thu, Mar 21, 2024 at 4:36 PM Valentyn Tymofieiev via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> One possible alternative is to define beam.Flatten for a single
>>>> collection to be functionally equivalent to beam.FlatMap(lambda x: x), but
>>>> that would be a larger change and such behavior might need to be
>>>> consistent across SDKs and documented. Adding a default value is a simpler
>>>> change.
>>>>
>>>> I can also confirm that the usage
>>>>
>>>> |  'Flatten' >> beam.FlatMap(lambda x: x)
>>>>
>>>> is fairly common by inspecting uses of Beam internally.
>>>> On Thu, Mar 21, 2024 at 1:30 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> IIRC, Java has Flatten.iterables() and Flatten.collections(), the
>>>>> first of which does what you want.
>>>>>
>>>>> Giving FlatMap a default arg of lambda x: x is an interesting idea.
>>>>> The only downside I see is a less clear error if one forgets to provide
>>>>> this (now mandatory) parameter, but maybe that's low enough to be worth
>>>>> the convenience?
>>>>>
>>>>> On Thu, Mar 21, 2024 at 12:02 PM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> That's not really the same thing, is it? `beam.Flatten` combines two
>>>>>> or more pcollections into a single pcollection while beam.FlatMap unpacks
>>>>>> iterables of elements (i.e. PCollection<Iterable<T>> -> PCollection<T>)
>>>>>>
>>>>>> On Thu, Mar 21, 2024 at 2:57 PM Valentyn Tymofieiev via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hi, you can use beam.Flatten() instead.
>>>>>>>
>>>>>>> On Thu, Mar 21, 2024 at 10:55 AM Joey Tran <
>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> Using an identity function for FlatMap comes up more often than
>>>>>>>> using FlatMap without an identity function. Would it make sense to
>>>>>>>> use the identity function as a default?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>


Re: Python API: FlatMap default -> lambda x:x?

2024-03-21 Thread Robert Bradshaw via dev
IIRC, Java has Flatten.iterables() and Flatten.collections(), the first of
which does what you want.

Giving FlatMap a default arg of lambda x: x is an interesting idea. The
only downside I see is a less clear error if one forgets to provide this
(now mandatory) parameter, but maybe that's low enough to be worth the
convenience?
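
A plain-Python sketch of the idea and of the trade-off, using lists in place of PCollections. `flat_map` and `flatten` are illustrative stand-ins, not the Beam transforms themselves.

```python
from itertools import chain


def flat_map(elements, fn=lambda x: x):
    """FlatMap-like helper: apply fn to each element and unpack the results.

    With the default identity fn, each element must itself be iterable.
    """
    return list(chain.from_iterable(fn(e) for e in elements))


def flatten(*collections):
    """Flatten-like helper: merge several collections into one."""
    return list(chain(*collections))


unpacked = flat_map([[1, 2], [3]])  # unpacks the inner iterables
merged = flatten([[1, 2]], [[3]])   # merges collections, elements untouched
```

The downside mentioned above shows up when the function is forgotten on non-iterable elements: in this sketch `flat_map([1, 2])` fails with a `TypeError` about an int not being iterable, rather than a message about a missing argument.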

On Thu, Mar 21, 2024 at 12:02 PM Joey Tran 
wrote:

> That's not really the same thing, is it? `beam.Flatten` combines two or
> more pcollections into a single pcollection while beam.FlatMap unpacks
> iterables of elements (i.e. PCollection<Iterable<T>> -> PCollection<T>)
>
> On Thu, Mar 21, 2024 at 2:57 PM Valentyn Tymofieiev via dev <
> dev@beam.apache.org> wrote:
>
>> Hi, you can use beam.Flatten() instead.
>>
>> On Thu, Mar 21, 2024 at 10:55 AM Joey Tran 
>> wrote:
>>
>>> Hey all,
>>>
>>> Using an identity function for FlatMap comes up more often than using
>>> FlatMap without an identity function. Would it make sense to use the
>>> identity function as a default?
>>>
>>>
>>>
>>>


Re: Design proposal for Beam YAML templates

2024-03-20 Thread Robert Bradshaw via dev
Thanks. I think this will be a very powerful feature. Left some comments on
the doc.

On Tue, Mar 19, 2024 at 11:53 AM Jeff Kinard  wrote:

> Hi all,
>
> I have another quick design doc discussing the syntax for Beam YAML
> templates. This feature would allow a user to create template pipelines
> similar to what is done in
> https://github.com/GoogleCloudPlatform/DataflowTemplates
> that are configurable and therefore more reusable than static YAML
> pipelines.
>
> Design doc:
> https://docs.google.com/document/d/1AUt4NEoQCBrOEVhadVozyTU2-CqC-pw7kuK6rzIA6Jc/edit?resourcekey=0-d-aVmsKZ89SzSiVwt8n5pw
>
> Please take a look and feel free to leave any comments.
>
> Thanks,
> - Jeff
>


Re: [GSoC] Build out Beam Yaml features

2024-03-07 Thread Robert Bradshaw via dev
This looks like an interesting proposal. Thanks! I left some comments
on the doc.

On Tue, Mar 5, 2024 at 8:39 AM Reeba Qureshi  wrote:
>
> Hi all!
>
> I am Reeba Qureshi, interested in the "Build out Beam Yaml features"  (link) 
> for GSoC 2024. I worked with Apache Beam during GSoC 2023 and built Beam ML 
> use cases (report). It was a great experience and I'm looking forward to 
> contributing more to the Beam project!
>
> Here is the initial draft of my proposal (link). I would appreciate any 
> feedback or suggestions on it!
>
> Thanks,
> Reeba Qureshi


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:
>
> On 2/27/24 19:22, Robert Bradshaw via dev wrote:
> > On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
> >> Pulling out focus points:
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>  wrote:
> >>> I can't act on something yet [...] but I expect to be able to [...] at 
> >>> some time in the processing-time future.
> >> I like this as a clear and internally-consistent feature description. It 
> >> describes ProcessContinuation and those timers which serve the same 
> >> purpose as ProcessContinuation.
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>  wrote:
> >>> I can't think of a batch or streaming scenario where it would be correct 
> >>> to not wait at least that long
> >> The main reason we created timers: to take action in the absence of data. 
> >> The archetypal use case for processing time timers was/is "flush data from 
> >> state if it has been sitting there too long". For this use case, the right 
> >> behavior for batch is to skip the timer. It is actually basically 
> >> incorrect to wait.
> > Good point calling out the distinction between "I need to wait in case
> > there's more data." and "I need to wait for something external." We
> > can't currently distinguish between the two, but a batch runner can
> > say something definitive about the first. Feels like we need a new
> > primitive (or at least new signaling information on our existing
> > primitive).
> Runners signal end of data to a DoFn via (input) watermark. Is there a
> need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.
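
To illustrate the kind of signaling information under discussion, here is a sketch in plain Python. Nothing in it exists in Beam: `TimerIntent` and `batch_action` are hypothetical names, and the decision rule is only one way a batch runner might use such a signal.

```python
from enum import Enum


class TimerIntent(Enum):
    """Hypothetical signal attached to a processing-time timer."""
    MORE_DATA = "waiting in case more data arrives"
    EXTERNAL = "waiting for something outside the pipeline"


def batch_action(intent, input_exhausted):
    """What a batch runner might do with a pending processing-time timer."""
    if intent is TimerIntent.MORE_DATA and input_exhausted:
        # The runner knows no more data is coming, so waiting adds nothing.
        return "do not wait"
    # Either input remains or the wait is for the outside world: honor the delay.
    return "wait"
```

Without the intent, the runner sees only a processing-time timestamp and cannot tell the two cases apart.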


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
>>
>>
>> Kenn
>>
>> On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:
>>>
>>> I think that before we introduce a possibly somewhat duplicate new feature 
>>> we should be certain that it is really semantically different. I'll 
>>> rephrase the two cases:
>>>
>>>  a) need to wait and block data (delay) - the use case is the motivating 
>>> example of Throttle transform
>>>
>>>  b) act without data, not block
>>>
>>> Provided we align processing time with local machine clock (or better, 
>>> because of testing, make current processing time available via context to 
>>> @ProcessElement) it seems possible to unify both cases under slightly
>>> updated semantics of processing time timer in batch:
>>>
>>>  a) processing time timers fire with best-effort, i.e. trying to minimize 
>>> delay between firing timestamp and timer's timestamp
>>>  b) timer is valid only in the context of current key-window, once 
>>> watermark passes window GC time for the particular window that created the 
>>> timer, it is ignored
>>>  c) if timer has output timestamp, this timestamp holds watermark (but this 
>>> is currently probably noop, because runners currently do not propagate
>>> (per-key) watermark in batch, I assume)
>>>
>>> In case b) it might be necessary to distinguish cases where the timer has an
>>> output timestamp; if so, it probably should be taken into account.
>>>
>>> Now, such semantics should be quite aligned with what we do in streaming 
>>> case and what users generally expect. The blocking part can be implemented 
>>> in @ProcessElement using buffer & timer, once there is need to wait, it can 
>>> be implemented in user code using plain sleep(). That is due to the 
>>> alignment between local time and definition of processing time. If we had 
>>> some reason to be able to run faster-than-wall-clock (as I'm still not in 
>>> favor of that), we could do that using ProcessContext.sleep(). Delaying 
>>> processing in the @ProcessElement should result in backpressuring and 
>>> backpropagation of this backpressure from the Throttle transform to the 
>>> sources as mentioned (of course this is only for the streaming case).
>>>
>>> Is there anything missing in such definition that would still require 
>>> splitting the timers into two distinct features?
>>>
>>>  Jan
>>>
>>> On 2/26/24 21:22, Kenneth Knowles wrote:
>>>
>>> Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.
>>>
>>> OutputTime is always an event time timestamp so it isn't even allowed to be 
>>> set outside the window (or you'd end up with an element assigned to a 
>>> window that it isn't within, since OutputTime essentially represents 
>>> reserving the right to output an element with that timestamp)
>>>
>>> Kenn
>>>
>>> On Mon, Feb 26, 2024 at 3:19 PM Robert Burke  wrote:
>>>>
>>>> Agreed that a retroactive behavior change would be bad, even if tied to a 
>>>> beam version change. I agree that it meshes well with the general theme of 
>>>> State & Timers exposing underlying primitives for implementing Windowing 
>>>> and similar. I'd say the distinction between the two might be additional 
>>>> complexity for users to grok, and would need to be documented well, as 
>>>> both operate in the ProcessingTime domain, but differently.
>>>>
>>>> What to call this new timer then? DelayTimer?
>>>>
>>>> "A DelayTimer sets an instant in ProcessingTime at which point 
>>>> computations can continue. Runners will prevent the EventTimer watermark 
>>>> from advancing past the set OutputTime until Processing Time has advanced 
>>>> to at least the provided instant to execute the timer's callback. This can
>>>> be used to allow the runner to constrain pipeline throughput with user 
>>>> guidance."
>>>>
>>>> I'd probably add that a timer with an output time outside of the window 
>>>> would not be guaranteed to fire, and that OnWindowExpiry is the correct 
>>>> way to ensure cleanup occurs.
>>>>
>>>> No solution to the Looping Timers on Drain problem here, but I think
>>>> that's ultimately an orthogonal discussion, and will restrain my thoughts 
>>>> on that for now.
>>>>
>>>> This isn't a proposal, but exploring the solution

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
>
> Pulling out focus points:
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
> > I can't act on something yet [...] but I expect to be able to [...] at some 
> > time in the processing-time future.
>
> I like this as a clear and internally-consistent feature description. It 
> describes ProcessContinuation and those timers which serve the same purpose 
> as ProcessContinuation.
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
> > I can't think of a batch or streaming scenario where it would be correct to 
> > not wait at least that long
>
> The main reason we created timers: to take action in the absence of data. The 
> archetypal use case for processing time timers was/is "flush data from state 
> if it has been sitting there too long". For this use case, the right behavior 
> for batch is to skip the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).
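
To make the distinction concrete, here is a rough Python sketch of what
"new signaling information on our existing primitive" might look like.
TimerIntent and batch_action are hypothetical names invented for this
illustration, not Beam APIs, and whether a batch runner should fire or
skip the first kind of timer is not settled in this thread, so the
sketch only says "no_wait".

```python
from enum import Enum


class TimerIntent(Enum):
    """Hypothetical signal attached to a processing-time timer."""
    FLUSH_IF_IDLE = "flush_if_idle"          # "wait in case there's more data"
    WAIT_FOR_EXTERNAL = "wait_for_external"  # "wait for something external"


def batch_action(intent: TimerIntent, input_exhausted: bool) -> str:
    """Decide what a batch runner could do with a pending timer.

    Returns "no_wait", "hold", or "wait_for_clock".
    """
    if intent is TimerIntent.FLUSH_IF_IDLE:
        # Once the bounded input is fully read, no more data can arrive,
        # so the runner can resolve the timer without watching the clock.
        return "no_wait" if input_exhausted else "hold"
    # External constraints (quota, another job finishing) do not depend on
    # the input, so only the passage of wall time can satisfy them.
    return "wait_for_clock"
```

The point of the sketch is only that the runner needs the intent as an
input; with today's single kind of timer it cannot tell the two apart.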

> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
> > It doesn't require a new primitive.
>
> IMO what's being proposed *is* a new primitive. I think it is a good 
> primitive. It is the underlying primitive to ProcessContinuation. It would be 
> user-friendly as a kind of timer. But if we made this the behavior of 
> processing time timers retroactively, it would break everyone using them to 
> flush data who is also reprocessing data.
>
> There are two very different use cases ("I need to wait, and block data" vs "I 
> want to act without data, aka NOT wait for data") and I think we should serve 
> both of them, but it doesn't have to be with the same low-level feature.
>
> Kenn
>
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
>>
>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>> >
>> > While I'm currently on the other side of the fence, I would not be against 
>> > changing/requiring the semantics of ProcessingTime constructs to be "must 
>> > wait and execute" as such a solution, and enables the Proposed "batch" 
>> > process continuation throttling mechanism to work as hypothesized for both 
>> > "batch" and "streaming" execution.
>> >
>> > There's a lot to like, as it leans Beam further into the unification of 
>> > Batch and Stream, with one fewer exception (eg. unifies timer experience 
>> > further). It doesn't require a new primitive. It probably matches more 
>> > with user expectations anyway.
>> >
>> > It does cause looping timer execution with processing time to be a problem 
>> > for Drains however.
>>
>> I think we have a problem with looping timers plus drain (a mostly
>> streaming idea anyway) regardless.
>>
>> > I'd argue though that in the case of a drain, we could update the 
>> > semantics as "move watermark to infinity"  "existing timers are executed, 
>> > but new timers are ignored",
>>
>> I don't like the idea of dropping timers for drain. I think correct
>> handling here requires user visibility into whether a pipeline is
>> draining or not.
>>
>> > and ensure/and update the requirements around OnWindowExpiration callbacks 
>> > to be a bit more insistent on being implemented for correct execution, 
>> > which is currently the only "hard" signal to the SDK side that the 
>> > window's work is guaranteed to be over, and remaining state needs to be 
>> > addressed by the transform or be garbage collected. This remains critical 
>> > for developing a good pattern for ProcessingTime timers within a Global 
>> > Window too.
>>
>> +1
>>
>> >
>> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
>> > > Thanks for bringing this up.
>> > >
>> > > My position is that both batch and streaming should wait for
>> > > processing time timers, according to local time (with the exception of
>> > > tests that can accelerate this via faked clocks).
>> > >
>> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
>> > > isomorphic, and can be implemented in terms of each other (at least 

Re: Throttle PTransform

2024-02-23 Thread Robert Bradshaw via dev
On Thu, Feb 22, 2024 at 10:16 AM Robert Bradshaw  wrote:
>
> On Thu, Feb 22, 2024 at 9:37 AM Reuven Lax via dev  
> wrote:
> >
> > On Thu, Feb 22, 2024 at 9:26 AM Kenneth Knowles  wrote:
> >>
> >> Wow I love your input Reuven. Of course "the source" that you are applying 
> >> backpressure to is often a runner's shuffle so it may be state anyhow, but 
> >> it is good to give the runner the choice of how to figure that out and 
> >> maybe chain backpressure further.
> >
> >
> > Sort of - however most (streaming) runners apply backpressure through 
> > shuffle as well. This means that while some amount of data will accumulate 
> > in shuffle, eventually the backpressure will push back to the source. 
> > Caveat of course is that this is mostly true for streaming runners, not 
> > batch runners.
>
> For batch it's still preferable to keep the data upstream in shuffle
> (which has fewer size limitations) rather than in state (which must reside in
> worker memory, though only one key at a time).

And for drain (or even cancel), it's preferable to have as much as
possible upstream in the source rather than sitting in state.


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Robert Bradshaw via dev
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>
> While I'm currently on the other side of the fence, I would not be against 
> changing/requiring the semantics of ProcessingTime constructs to be "must 
> wait and execute" as such a solution, and enables the Proposed "batch" 
> process continuation throttling mechanism to work as hypothesized for both 
> "batch" and "streaming" execution.
>
> There's a lot to like, as it leans Beam further into the unification of Batch 
> and Stream, with one fewer exception (eg. unifies timer experience further). 
> It doesn't require a new primitive. It probably matches more with user 
> expectations anyway.
>
> It does cause looping timer execution with processing time to be a problem 
> for Drains however.

I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.

> I'd argue though that in the case of a drain, we could update the semantics 
> as "move watermark to infinity"  "existing timers are executed, but new 
> timers are ignored",

I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.

> and ensure/and update the requirements around OnWindowExpiration callbacks to 
> be a bit more insistent on being implemented for correct execution, which is 
> currently the only "hard" signal to the SDK side that the window's work is 
> guaranteed to be over, and remaining state needs to be addressed by the 
> transform or be garbage collected. This remains critical for developing a 
> good pattern for ProcessingTime timers within a Global Window too.

+1

>
> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should wait for
> > processing time timers, according to local time (with the exception of
> > tests that can accelerate this via faked clocks).
> >
> > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
> > isomorphic, and can be implemented in terms of each other (at least in
> > one direction, and likely the other). Both are an indication that I
> > can't act on something yet due to external constraints (e.g. not all
> > the data has been published, or I lack sufficient capacity/quota to
> > push things downstream) but I expect to be able to (or at least would
> > like to check again) at some time in the processing-time future. I
> > can't think of a batch or streaming scenario where it would be correct
> > to not wait at least that long (even in batch inputs, e.g. suppose I'm
> > tailing logs and was eagerly started before they were fully written,
> > or waiting for some kind of (non-data-dependent) quiescence or other
> > operation to finish).
> >
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
> > >
> > > For me it always helps to seek analogy in our physical reality. Stream
> > > processing actually has quite a good analogy for both event-time and
> > > processing-time - the simplest model for this being relativity theory.
> > > Event-time is the time at which events occur _at distant locations_. Due
> > > to finite and invariant speed of light (which is actually really
> > > involved in the explanation why any stream processing is inevitably
> > > unordered) these events are observed (processed) at different times
> > > (processing time, different for different observers). It is perfectly
> > > possible for an observer to observe events at a rate that is higher than
> > > one second per second. This also happens in reality for observers that
> > > travel at relativistic speeds (which might be an analogy for fast -
> > > batch - (re)processing). Besides the invariant speed, there is also
> > > another invariant - local clock (wall time) always ticks exactly at the
> > > rate of one second per second, no matter what. It is not possible to
> > > "move faster or slower" through (local) time.
> > >
> > > In my understanding the reason why we do not put any guarantees or
> > > bounds on the delay of firing processing time timers is purely technical
> > > - the processing is (per key) single-threaded, thus any timer has to
> > > wait before any element processing finishes. This is only a consequence of
> > > a technical solution, not something fundamental.
> > >
> > > Having said that, my point is that according to the above analogy, it
> > > should be perfectly fine to fire processing time timers in bat

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Robert Bradshaw via dev
Thanks for bringing this up.

My position is that both batch and streaming should wait for
processing time timers, according to local time (with the exception of
tests that can accelerate this via faked clocks).

Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
isomorphic, and can be implemented in terms of each other (at least in
one direction, and likely the other). Both are an indication that I
can't act on something yet due to external constraints (e.g. not all
the data has been published, or I lack sufficient capacity/quota to
push things downstream) but I expect to be able to (or at least would
like to check again) at some time in the processing-time future. I
can't think of a batch or streaming scenario where it would be correct
to not wait at least that long (even in batch inputs, e.g. suppose I'm
tailing logs and was eagerly started before they were fully written,
or waiting for some kind of (non-data-dependent) quiescence or other
operation to finish).
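
As a rough illustration of "wait according to local time, except in
tests with faked clocks", here is a small Python sketch. FakeClock,
WallClock and fire_when_due are hypothetical names made up for this
example and are not part of any Beam SDK; a real runner would schedule
timers rather than block a thread.

```python
import time


class WallClock:
    """Real clock backed by the standard library."""

    def now(self) -> float:
        return time.time()

    def sleep(self, seconds: float) -> None:
        time.sleep(seconds)


class FakeClock:
    """Test clock: sleeping advances time instantly instead of blocking."""

    def __init__(self, start: float = 0.0):
        self._now = start

    def now(self) -> float:
        return self._now

    def sleep(self, seconds: float) -> None:
        self._now += seconds


def fire_when_due(fire_at: float, callback, clock) -> float:
    """Wait until clock.now() >= fire_at, then run callback.

    Returns the clock reading at which the callback ran.
    """
    remaining = fire_at - clock.now()
    while remaining > 0:
        clock.sleep(remaining)
        remaining = fire_at - clock.now()
    callback()
    return clock.now()
```

Batch and streaming would both pass a WallClock here; only tests would
swap in a FakeClock to accelerate the wait.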


On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
>
> For me it always helps to seek analogy in our physical reality. Stream
> processing actually has quite a good analogy for both event-time and
> processing-time - the simplest model for this being relativity theory.
> Event-time is the time at which events occur _at distant locations_. Due
> to finite and invariant speed of light (which is actually really
> involved in the explanation why any stream processing is inevitably
> unordered) these events are observed (processed) at different times
> (processing time, different for different observers). It is perfectly
> possible for an observer to observe events at a rate that is higher than
> one second per second. This also happens in reality for observers that
> travel at relativistic speeds (which might be an analogy for fast -
> batch - (re)processing). Besides the invariant speed, there is also
> another invariant - local clock (wall time) always ticks exactly at the
> rate of one second per second, no matter what. It is not possible to
> "move faster or slower" through (local) time.
>
> In my understanding the reason why we do not put any guarantees or
> bounds on the delay of firing processing time timers is purely technical
> - the processing is (per key) single-threaded, thus any timer has to
> wait before any element processing finishes. This is only a consequence of
> a technical solution, not something fundamental.
>
> Having said that, my point is that according to the above analogy, it
> should be perfectly fine to fire processing time timers in batch based
> on (local wall) time only. There should be no way of manipulating this
> local time (excluding tests). Watermarks should be affected the same way
> as any buffering in a state that would happen in a stateful DoFn (i.e.
> set timer holds output watermark). We should probably pay attention to
> looping timers, but it seems possible to define a valid stopping
> condition (input watermark at infinity).
>
>   Jan
>
> On 2/22/24 19:50, Kenneth Knowles wrote:
> > Forking this thread.
> >
> > The state of processing time timers in this mode of processing is not
> > satisfactory and is discussed a lot but we should make everything
> > explicit.
> >
> > Currently, a state and timer DoFn has a number of logical watermarks:
> > (apologies for fixed width not coming through in email lists). Treat
> > timers as a back edge.
> >
> > input --(A)----------(C)--> ParDo(DoFn) --(D)--> output
> >                ^                 |
> >                |-------(B)-------|
> >                      timers
> >
> > (A) Input Element watermark: this is the watermark that promises there
> > is no incoming element with a timestamp earlier than it. Each input
> > element's timestamp holds this watermark. Note that *event time timers
> > firing is according to this watermark*. But a runner commits changes
> > to this watermark *whenever it wants*, in a way that can be
> > consistent. So the runner can absolutely process *all* the elements
> > before advancing the watermark (A), and only afterwards start firing
> > timers.
> >
> > (B) Timer watermark: this is a watermark that promises no timer is set
> > with an output timestamp earlier than it. Each timer that has an
> > output timestamp holds this watermark. Note that timers can set new
> > timers, indefinitely, so this may never reach infinity even in a drain
> > scenario.
> >
> > (C) (derived) total input watermark: this is a watermark that is the
> > minimum of the two above, and ensures that all state for the DoFn for
> > expired windows can be GCd after calling @OnWindowExpiration.
> >
> > (D) output watermark: this is a promise that the DoFn will not output
> > earlier than the watermark. It is held by the total input watermark.
> >
> > So any timer, processing or not, holds the total input watermark
> > which prevents window GC, hence the timer must be fired. You can set
> > timers without a timestamp and they will not hold (B) hence not 
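
The relationships between watermarks (A) through (D) described above
can be sketched in a few lines of Python. This is a simplification
that treats each watermark as the minimum of its holds (a real runner
also has to account for data it has not seen yet), and all function
names are hypothetical, chosen only for this illustration.

```python
import math
from typing import Iterable, Optional


def input_element_watermark(pending_element_timestamps: Iterable[float]) -> float:
    """(A): held by the timestamp of every element not yet processed."""
    return min(pending_element_timestamps, default=math.inf)


def timer_watermark(timer_output_timestamps: Iterable[Optional[float]]) -> float:
    """(B): held by every timer that has an output timestamp.

    Timers set without an output timestamp (None) do not hold (B).
    """
    holds = [ts for ts in timer_output_timestamps if ts is not None]
    return min(holds, default=math.inf)


def total_input_watermark(a: float, b: float) -> float:
    """(C): the minimum of (A) and (B)."""
    return min(a, b)


def cap_output_watermark(candidate: float, c: float) -> float:
    """(D): the output watermark is held by (C), so it cannot pass it."""
    return min(candidate, c)
```

With pending elements at 10 and 12 and a timer whose output timestamp
is 8, (C) is 8, so a window ending after 8 cannot be garbage collected
until that timer fires.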

Re: Throttle PTransform

2024-02-22 Thread Robert Bradshaw via dev
On Thu, Feb 22, 2024 at 9:37 AM Reuven Lax via dev  wrote:
>
> On Thu, Feb 22, 2024 at 9:26 AM Kenneth Knowles  wrote:
>>
>> Wow I love your input Reuven. Of course "the source" that you are applying 
>> backpressure to is often a runner's shuffle so it may be state anyhow, but 
>> it is good to give the runner the choice of how to figure that out and maybe 
>> chain backpressure further.
>
>
> Sort of - however most (streaming) runners apply backpressure through shuffle 
> as well. This means that while some amount of data will accumulate in 
> shuffle, eventually the backpressure will push back to the source. Caveat of 
> course is that this is mostly true for streaming runners, not batch runners.

For batch it's still preferable to keep the data upstream in shuffle
(which has fewer size limitations) rather than in state (which must reside in
worker memory, though only one key at a time).


Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

2024-02-22 Thread Robert Bradshaw via dev
On Wed, Feb 21, 2024 at 11:58 PM Jan Lukavský  wrote:
>
> Reasons we use Java serialization are not fundamental, probably only 
> historical. Thinking about it, yes, there is lucky coincidence that we 
> currently have to change the serialization because of Flink 1.17 support. 
> Flink 1.17 actually removes the legacy java serialization from Flink and 
> enforces custom serialization. Therefore, we need to introduce an upgrade 
> compatible change of serialization to support Flink 1.17. This is already 
> implemented in [1]. The PR can go further, though. We can replace Java 
> serialization of Coder in the TypeSerializerSnapshot and use the portable 
> representation of Coder (which will still use Java serialization in some 
> cases, but might avoid it at least for well-known coders, moreover Coders 
> should be more upgrade-stable classes).
>
> I'll try to restore the SerializablePipelineOptions (copy) in 
> FlinkRunner only and rework the serialization in a more stable way (at least 
> avoid serializing the CoderTypeSerializer, which references the 
> SerializablePipelineOptions).

Thanks!

If other runners use this (SerializablePipelineOptions seems like it's
explicitly created for this purpose) we could consider putting the
copy into core rather than just the Flink packages.
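
For readers less familiar with why a repackaged class breaks restore,
the same failure mode can be reproduced with Python's pickle, which
(like Java serialization) records the fully qualified class name in
the serialized bytes. This is only an analogy, not Flink or Beam code;
SerializableOptions, old_construction and new_sdk are hypothetical
names invented for the sketch.

```python
import pickle
import sys
import types


class SerializableOptions:
    """Hypothetical stand-in for a class that is written into saved state."""

    def __init__(self, flags):
        self.flags = flags


def publish(cls, module_name):
    """Make cls importable as module_name.<class name> (simulates its package)."""
    module = types.ModuleType(module_name)
    setattr(module, cls.__name__, cls)
    cls.__module__ = module_name
    cls.__qualname__ = cls.__name__  # keep the lookup name flat
    sys.modules[module_name] = module


# 1. The class lives in its original location and gets serialized.
publish(SerializableOptions, "old_construction")
savepoint = pickle.dumps(SerializableOptions({"runner": "flink"}))

# 2. The class is repackaged and the old location disappears.
del sys.modules["old_construction"]
publish(SerializableOptions, "new_sdk")

try:
    pickle.loads(savepoint)
    restore_failed = False
except ImportError:  # the bytes still name the old module
    restore_failed = True

# 3. Leaving a thin alias at the old location makes old data readable again.
alias = types.ModuleType("old_construction")
alias.SerializableOptions = SerializableOptions
sys.modules["old_construction"] = alias
restored = pickle.loads(savepoint)
```

Step 3 corresponds to keeping a copy (or alias) of the class under its
original package name so that previously written state still resolves.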

> I created [2] and marked it as blocker for 2.55.0 release, because otherwise 
> we would break the upgrade.
>
> Thanks for the discussion, it helped a lot.
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/30197
>
> [2] https://github.com/apache/beam/issues/30385
>
> On 2/21/24 20:33, Kenneth Knowles wrote:
>
> Yea I think we should restore the necessary classes but also fix the 
> FlinkRunner. Java serialization is inherently self-update-incompatible.
>
> On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev  
> wrote:
>>
>> Is there a fundamental reason we serialize java classes into Flink 
>> savepoints?
>>
>> On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev 
>>  wrote:
>>>
>>> We could consider merging the gradle targets without renaming the
>>> classpaths as an intermediate step.
>>>
>>> Optimistically, perhaps there's a small number of classes that we need
>>> to preserve (e.g. SerializablePipelineOptions looks like it was
>>> something specifically intended to be serialized). Maybe that and a
>>> handful of others (that implement Serializable) could be left in their
>>> original packages for backwards compatibility reasons?
>>>
>>> On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský  wrote:
>>> >
>>> > Hi,
>>> >
>>> > while implementing FlinkRunner for Flink 1.17 I tried to verify that a
>>> > running Pipeline is able to successfully upgrade from Flink 1.16 to
>>> > Flink 1.17. There is some change regarding serialization needed for
>>> > Flink 1.17, so this was a concern. Unfortunately recently we merged
>>> > core-construction-java into SDK, which resulted in some classes being
>>> > repackaged. Unfortunately, we serialize some classes into Flink's
>>> > check/savepoints. The renaming of the class therefore ends with the
>>> > following exception trying to restore from the savepoint:
>>> >
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.beam.runners.core.construction.SerializablePipelineOptions
>>> >  at 
>>> > java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>> >  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>> >  at
>>> > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>>> >  at
>>> > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> >  at
>>> > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>>> >  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>> >  at
>>> > org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>>> >  at java.base/java.lang.Class.forName0(Native Method)
>>> >  at java.base/java.lang.Class.forName(Class.java:398)
>>> >  at
>>> > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>> >  at
>>> > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInpu

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

2024-02-21 Thread Robert Bradshaw via dev
We could consider merging the gradle targets without renaming the
classpaths as an intermediate step.

Optimistically, perhaps there's a small number of classes that we need
to preserve (e.g. SerializablePipelineOptions looks like it was
something specifically intended to be serialized). Maybe that and a
handful of others (that implement Serializable) could be left in their
original packages for backwards compatibility reasons?

On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský  wrote:
>
> Hi,
>
> while implementing FlinkRunner for Flink 1.17 I tried to verify that a
> running Pipeline is able to successfully upgrade from Flink 1.16 to
> Flink 1.17. There is some change regarding serialization needed for
> Flink 1.17, so this was a concern. Unfortunately recently we merged
> core-construction-java into SDK, which resulted in some classes being
> repackaged. Unfortunately, we serialize some classes into Flink's
> check/savepoints. The renaming of the class therefore ends with the
> following exception trying to restore from the savepoint:
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.beam.runners.core.construction.SerializablePipelineOptions
>  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>  at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>  at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>  at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>  at
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>  at java.base/java.lang.Class.forName0(Native Method)
>  at java.base/java.lang.Class.forName(Class.java:398)
>  at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>  at
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
>
>
> This means that no Pipeline will be able to successfully upgrade from
> version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be
> restarted from scratch). I wanted to know how the community would feel
> about that, this consequence probably was not clear when we merged the
> artifacts. The only option would be to revert the merge and then try to
> figure out how to avoid Java serialization in Flink's savepoints. That
> would definitely be costly in terms of implementation and even more to
> provide ways to transfer old savepoints to the new format (can be
> possible using state processor API). I'm aware that Beam provides no
> general guarantees about the upgrade compatibility, so it might be fine
> to just ignore this, I just wanted to shout this out loud so that we can
> make a deliberate decision.
>
> Best,
>
>   Jan
>


Re: Throttle PTransform

2024-02-21 Thread Robert Bradshaw via dev
I like the idea of pushing back to the source much better than
unboundedly buffering things in state. I was trying to think of how to
just slow things down and one problem is that while we can easily
control the number of keys, it's much harder to control (or even
detect) the number of parallel threads at any given point in time (for
which keys is simply an upper bound, especially in batch).
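
The approach in the quoted message below (bucket into N shards, have
each shard throttle its own output to rate/N by blocking its thread)
could be sketched roughly as follows in Python. ShardThrottle and
make_shard_throttles are hypothetical names for this illustration, not
Beam APIs, and the clock functions are injectable only so the sketch
can be exercised without real sleeping.

```python
import time


class ShardThrottle:
    """Limits one shard (one thread) to rate_per_sec emissions."""

    def __init__(self, rate_per_sec, now=time.monotonic, sleep=time.sleep):
        self._interval = 1.0 / rate_per_sec
        self._now = now
        self._sleep = sleep
        self._next_allowed = now()

    def acquire(self):
        """Block until this shard may emit its next element."""
        wait = self._next_allowed - self._now()
        if wait > 0:
            self._sleep(wait)
        # Schedule from the later of "now" and the previous slot so that
        # idle periods do not accumulate into a later burst.
        self._next_allowed = max(self._next_allowed, self._now()) + self._interval


def make_shard_throttles(total_rate_per_sec, num_shards, **clock):
    """Split a global rate evenly: each shard gets total_rate / N."""
    per_shard = total_rate_per_sec / num_shards
    return [ShardThrottle(per_shard, **clock) for _ in range(num_shards)]
```

Each shard would call acquire() immediately before emitting an element.
Because the call blocks the processing thread instead of buffering,
excess input stays upstream rather than piling up in state.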

On Wed, Feb 21, 2024 at 9:28 AM Reuven Lax  wrote:
>
> Agreed that event-time throttling doesn't make sense here. In theory 
> processing-time timers have no SLA - i.e. their firing might be delayed - so 
> batch runners aren't violating the model by firing them all at the end; 
> however it does make processing time timers less useful in batch, as we see 
> here.
>
> Personally, I'm not sure I would use state and timers to implement this, and 
> I definitely wouldn't create this many keys. A couple of reasons for this:
>   1. If a pipeline is receiving input faster than the throttle rate, the 
> proposed technique would shift all those elements into the DoFn's state which 
> will keep growing indefinitely. Generally we would prefer to leave that 
> backlog in the source instead of copying it into DoFn state.
>   2. In my experience with throttling, having too much parallelism is 
> problematic. The issue is that there is some error involved whenever you 
> throttle, and this error can accumulate across many shards (and when I've 
> done this sort of thing before, I found that the error was often biased in 
> one direction). If targeting 100,000 records/sec, this approach (if I 
> understand it correctly) would create 100,000 shards and throttle them each 
> to one element/sec. I doubt this will actually result in anything close to 
> desired throttling.
>   3. Very commonly, the request is to throttle based on bytes/sec, not 
> events/sec. Anything we build should be easily extensible to bytes/sec.
>
> What I would suggest (and what Beam users have often done in the past) would 
> be to bucket the PCollection into N buckets where N is generally smallish 
> (100 buckets, 1000 buckets, depending on the expected throughput); runners 
> that support autosharding (such as Dataflow) can automatically choose N. Each 
> shard then throttles its output to rate/N. Keeping N no larger than necessary 
> minimizes the error introduced into throttling.
>
> We also don't necessarily need state/timers here - each shard is processed on 
> a single thread, so those threads can simply throttle calls to 
> OutputReceiver.output. This way if the pipeline is exceeding the threshold, 
> backpressure will tend to simply leave excess data in the source. This also 
> is a simpler design than the proposed one.
>
> A more sophisticated design might combine elements of both - buffering a 
> bounded amount of data in state when the threshold is exceeded, but severely 
> limiting the state size. However I wouldn't start here - we would want to 
> build the simpler implementation first and see how it performs.
>
> On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev  
> wrote:
>>
>> On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
>> >
>> > Hi,
>> >
>> > I have left a note regarding the proposed splitting of batch and
>> > streaming expansion of this transform. In general, a need for such split
>> > triggers doubts in me. This signals that either
>> >
>> >   a) the transform does something it should not, or
>> >
>> >   b) Beam model is not complete in terms of being "unified"
>> >
>> > The problem that is described in the document is that in the batch case
>> > timers are not fired appropriately.
>>
>> +1. The underlying flaw is that processing time timers are not handled
>> correctly in batch, but should be (even if it means keeping workers
>> idle?). We should fix this.
>>
>> > This is actually one of the
>> > motivations that led to introduction of @RequiresTimeSortedInput
>> > annotation and, though mentioned years ago as a question, I do not
>> > remember what arguments were used against enforcing sorting inputs by
>> > timestamp in the batch stateful DoFn as a requirement in the model. That
>> > would enable the appropriate firing of timers while preserving the batch
>> > invariant, which is that no late data is allowed. IIRC there are
>> > runners that do this sorting by default (at least the sorting, not sure
>> > about the timers, but once inputs are sorted, firing timers is simple).
>> >
>> > A different question is if this particular transform should maybe fire
>> > not by event time, but rather processing time?
>>
>

Re: Throttle PTransform

2024-02-21 Thread Robert Bradshaw via dev
On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
>
> Hi,
>
> I have left a note regarding the proposed splitting of batch and
> streaming expansion of this transform. In general, a need for such split
> triggers doubts in me. This signals that either
>
>   a) the transform does something it should not, or
>
>   b) Beam model is not complete in terms of being "unified"
>
> The problem that is described in the document is that in the batch case
> timers are not fired appropriately.

+1. The underlying flaw is that processing time timers are not handled
correctly in batch, but should be (even if it means keeping workers
idle?). We should fix this.

> This is actually one of the
> motivations that led to introduction of @RequiresTimeSortedInput
> annotation and, though mentioned years ago as a question, I do not
> remember what arguments were used against enforcing sorting inputs by
> timestamp in the batch stateful DoFn as a requirement in the model. That
> would enable the appropriate firing of timers while preserving the batch
> invariant, which is that no late data is allowed. IIRC there are
> runners that do this sorting by default (at least the sorting, not sure
> about the timers, but once inputs are sorted, firing timers is simple).
>
> A different question is if this particular transform should maybe fire
> not by event time, but rather processing time?

Yeah, I was reading all of these as processing time. Throttling by
event time doesn't make much sense.

> On 2/21/24 03:00, Robert Burke wrote:
> > Thanks for the design Damon! And thanks for collaborating with me on 
> > getting a high level textual description of the key implementation idea 
> > down in writing. I think the solution is pretty elegant.
> >
> > I do have concerns about how different Runners might handle 
> > ProcessContinuations for the Bounded Input case. I know Dataflow famously 
> > has two different execution modes under the hood, but I agree with the 
> > principle that ProcessContinuation.Resume should largely be in line with 
> > the expected delay, though it's by no means guaranteed AFAIK.
> >
> > We should also ensure this is linked from 
> > https://s.apache.org/beam-design-docs if not already.
> >
> > Robert Burke
> > Beam Go Busybody
> >
> > On 2024/02/20 14:00:00 Damon Douglas wrote:
> >> Hello Everyone,
> >>
> >> The following describes a Throttle PTransform that holds element throughput
> >> to minimize downstream API overusage. Thank you for reading and your
> >> valuable input.
> >>
> >> https://s.apache.org/beam-throttle-transform
> >>
> >> Best,
> >>
> >> Damon
> >>


Re: [API PROPOSAL] PTransform.getURN, toProto, etc, for Java

2024-02-15 Thread Robert Bradshaw via dev
On Wed, Feb 14, 2024 at 10:28 AM Kenneth Knowles  wrote:
>
> Hi all,
>
> TL;DR I want to add some API like PTransform.getURN, toProto and fromProto, 
> etc. to the Java SDK. I want to do this so that making a PTransform support 
> portability is a natural part of writing the transform and not a totally 
> separate thing with tons of boilerplate.
>
> What do you think?

Huge +1 to this direction.

IMHO one of the most fundamental things about Beam is its model.
Originally this was only expressed in a specific SDK (Java) and then
got ported to others, but now that we have portability it's expressed
in a language-independent way.

The fact that we keep these separate in Java is not buying us
anything, and causes a huge amount of boilerplate that'd be great to
remove, as well as making the essential model more front-and-center.

> I think a particular API can be sorted out most easily in code (which I will 
> prepare after gathering some feedback).
>
> We already have all the translation logic written, and porting a couple 
> transforms to it will ensure the API has everything we need. We can refer to 
> Python and Go for API ideas as well.
>
> Lots of context below, but you can skip it...
>
> -
>
> When we first created the portability framework, we wanted the SDKs to be 
> "standalone" and not depend on portability. We wanted portability to be an 
> optional plugin that users could opt in to. That is totally the opposite now. 
> We want portability to be the main place where Beam is defined, and then SDKs 
> make that available in language idiomatic ways.
>
> Also when we first created the framework, we were experimenting with 
> different serialization approaches and we wanted to be independent of 
> protobuf and gRPC if we could. But now we are pretty committed and it would 
> be a huge lift to use anything else.
>
> Finally, at the time we created the portability framework, we designed it to 
> allow composites to have URNs and well-defined specs, rather than just be 
> language-specific subgraphs, but we didn't really plan to make this easy.
>
> For all of the above, most users depend on portability and on proto. So 
> separating them is not useful and just creates LOTS of boilerplate and 
> friction for making new well-defined transforms.
>
> Kenn


Re: [VOTE] Release 2.54.0, release candidate #2

2024-02-14 Thread Robert Bradshaw via dev
+1 (binding)

We've done the validation we can for now, let's not hold up the
release any longer.

(For those curious, there may be a brief period of time where Dataflow
pipelines with 2.54 still default to Runner V1 in some regions as
things roll out, but we expect this to be fully resolved next week.)

On Fri, Feb 9, 2024 at 6:28 PM Robert Burke  wrote:
>
> I can agree to that Robert Bradshaw. Thank you for letting the community know.
>
> (Disclaimer: I am on the Dataflow team myself, but do try to keep my hats 
> separated when I'm release manager).
>
> It would be bad for Beam users who use Dataflow to try to use the release but 
> be unaware of the switch. I'm in favour of the path of least user issues, 
> wherever they're cropping up from.
>
> As a heads up, I'll likely not address this thread until Wednesday morning 
> (or if there's a sooner update) as a result of this request.
>
> Separately:
> + 1 (binding)
>  I've done a few of the quickstarts from the validation sheets and updated my 
> own Beam Go code. Other than a non-blocking update to the Go wordcount 
> quickstart, I didn't run into any issues.
>
> Robert Burke
> Beam 2.54.0 Release Manager
>
> On 2024/02/10 01:41:12 Robert Bradshaw via dev wrote:
> > I validated that the release artifacts are all correct, tested some simple
> > Python and Yaml pipelines. Everything is looking good so far.
> >
> > However, could I ask that you hold this vote open a little longer? We've
> > got some Dataflow service side changes that relate to 2.54 being the first
> > release where Runner v2 is the default for Java (big change on our side),
> > and could use a bit of additional time to verify we have everything lined
> > up correctly. We should be able to finish the validation early/mid next
> > week at the latest.
> >
> > - Robert
> >
> >
> > On Fri, Feb 9, 2024 at 2:57 PM Yi Hu via dev  wrote:
> >
> > > Also tested with GCP IO performance benchmark [1]. Passed other than
> > > SpannerIO where the benchmark failed due to issues in the test suite 
> > > itself
> > > [2], not related to Beam.
> > >
> > > +1 but I had voted for another validation suite before for this RC
> > >
> > > [1]
> > > https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/it/google-cloud-platform
> > > [2] https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/1326
> > >
> > > On Fri, Feb 9, 2024 at 9:43 AM Valentyn Tymofieiev via dev <
> > > dev@beam.apache.org> wrote:
> > >
> > >> +1.
> > >>
> > >> Checked postcommit test results for Python SDK, and exercised a couple of
> > >> Dataflow scenarios.
> > >>
> > >> On Thu, Feb 8, 2024, 14:07 Svetak Sundhar via dev 
> > >> wrote:
> > >>
> > >>> +1 (Non-Binding)
> > >>>
> > >>> Tested with Python SDK on DirectRunner and Dataflow Runner
> > >>>
> > >>>
> > >>> Svetak Sundhar
> > >>>
> > >>>   Data Engineer
> > >>> s vetaksund...@google.com
> > >>>
> > >>>
> > >>>
> > >>> On Thu, Feb 8, 2024 at 12:45 PM Chamikara Jayalath via dev <
> > >>> dev@beam.apache.org> wrote:
> > >>>
> > >>>> +1 (binding)
> > >>>>
> > >>>> Tried out Java/Python multi-lang jobs and upgrading BQ/Kafka transforms
> > >>>> from 2.53.0 to 2.54.0 using the Transform Service.
> > >>>>
> > >>>> Thanks,
> > >>>> Cham
> > >>>>
> > >>>> On Wed, Feb 7, 2024 at 5:52 PM XQ Hu via dev 
> > >>>> wrote:
> > >>>>
> > >>>>> +1 (non-binding)
> > >>>>>
> > >>>>> Validated with a simple RunInference Python pipeline:
> > >>>>> https://github.com/google/dataflow-ml-starter/actions/runs/7821639833/job/21339032997
> > >>>>>
> > >>>>> On Wed, Feb 7, 2024 at 7:10 PM Yi Hu via dev 
> > >>>>> wrote:
> > >>>>>
> > >>>>>> +1 (non-binding)
> > >>>>>>
> > >>>>>> Validated with Dataflow Template:
> > >>>>>> https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/1317
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>>
> > >>>>>&

Re: [VOTE] Release 2.54.0, release candidate #2

2024-02-09 Thread Robert Bradshaw via dev
I validated that the release artifacts are all correct, tested some simple
Python and Yaml pipelines. Everything is looking good so far.

However, could I ask that you hold this vote open a little longer? We've
got some Dataflow service side changes that relate to 2.54 being the first
release where Runner v2 is the default for Java (big change on our side),
and could use a bit of additional time to verify we have everything lined
up correctly. We should be able to finish the validation early/mid next
week at the latest.

- Robert


On Fri, Feb 9, 2024 at 2:57 PM Yi Hu via dev  wrote:

> Also tested with GCP IO performance benchmark [1]. Passed other than
> SpannerIO where the benchmark failed due to issues in the test suite itself
> [2], not related to Beam.
>
> +1 but I had voted for another validation suite before for this RC
>
> [1]
> https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/it/google-cloud-platform
> [2] https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/1326
>
> On Fri, Feb 9, 2024 at 9:43 AM Valentyn Tymofieiev via dev <
> dev@beam.apache.org> wrote:
>
>> +1.
>>
>> Checked postcommit test results for Python SDK, and exercised a couple of
>> Dataflow scenarios.
>>
>> On Thu, Feb 8, 2024, 14:07 Svetak Sundhar via dev 
>> wrote:
>>
>>> +1 (Non-Binding)
>>>
>>> Tested with Python SDK on DirectRunner and Dataflow Runner
>>>
>>>
>>> Svetak Sundhar
>>>
>>>   Data Engineer
>>> s vetaksund...@google.com
>>>
>>>
>>>
>>> On Thu, Feb 8, 2024 at 12:45 PM Chamikara Jayalath via dev <
>>> dev@beam.apache.org> wrote:
>>>
 +1 (binding)

 Tried out Java/Python multi-lang jobs and upgrading BQ/Kafka transforms
 from 2.53.0 to 2.54.0 using the Transform Service.

 Thanks,
 Cham

 On Wed, Feb 7, 2024 at 5:52 PM XQ Hu via dev 
 wrote:

> +1 (non-binding)
>
> Validated with a simple RunInference Python pipeline:
> https://github.com/google/dataflow-ml-starter/actions/runs/7821639833/job/21339032997
>
> On Wed, Feb 7, 2024 at 7:10 PM Yi Hu via dev 
> wrote:
>
>> +1 (non-binding)
>>
>> Validated with Dataflow Template:
>> https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/1317
>>
>> Regards,
>>
>> On Wed, Feb 7, 2024 at 11:18 AM Ritesh Ghorse via dev <
>> dev@beam.apache.org> wrote:
>>
>>> +1 (non-binding)
>>>
>>> Ran a few batch and streaming examples for Python SDK on Dataflow
>>> Runner
>>>
>>> Thanks!
>>>
>>> On Wed, Feb 7, 2024 at 4:08 AM Jan Lukavský  wrote:
>>>
 +1 (binding)

 Validated Java SDK with Flink runner.

  Jan
 On 2/7/24 06:23, Robert Burke via dev wrote:

 Hi everyone,
 Please review and vote on the release candidate #2 for the version
 2.54.0,
 as follows:
 [ ] +1, Approve the release
 [ ] -1, Do not approve the release (please provide specific
 comments)


 Reviewers are encouraged to test their own use cases with the
 release
 candidate, and vote +1 if
 no issues are found. Only PMC member votes will count towards the
 final
 vote, but votes from all
 community members are encouraged and helpful for finding
 regressions; you
 can either test your own
 use cases [13] or use cases from the validation sheet [10].

 The complete staging area is available for your review, which
 includes:
 * GitHub Release notes [1],
 * the official Apache source release to be deployed to
 dist.apache.org [2],
 which is signed with the key with fingerprint D20316F712213422 [3],
 * all artifacts to be deployed to the Maven Central Repository [4],
 * source code tag "v2.54.0-RC2" [5],
 * website pull request listing the release [6], the blog post [6],
 and
 publishing the API reference manual [7].
 * Python artifacts are deployed along with the source release to the
 dist.apache.org [2] and PyPI[8].
 * Go artifacts and documentation are available at pkg.go.dev [9]
 * Validation sheet with a tab for 2.54.0 release to help with
 validation
 [10].
 * Docker images published to Docker Hub [11].
 * PR to run tests against release branch [12].

 The vote will be open for at least 72 hours. It is adopted by
 majority
 approval, with at least 3 PMC affirmative votes.

 For guidelines on how to try the release in your projects, check
 out our RC
 testing guide [13].

 Thanks,
 Robert Burke
 Beam 2.54.0 Release Manager

 [1] https://github.com/apache/beam/milestone/18?closed=1
 [2] https://dist.apache.org/repos/dist/dev/beam/2.54.0/
 [3] 

Re: [python] Why CombinePerKey(lambda vs: None)?

2024-01-26 Thread Robert Bradshaw via dev
On Fri, Jan 26, 2024 at 8:43 AM Joey Tran  wrote:
>
> Hmm, I think I might still be missing something. CombinePerKey is made up of 
> "GBK() | CombineValues". Pulling it out into the Distinct, Distinct looks 
> like:
>
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>       pcoll
>       | 'ToPairs' >> Map(lambda v: (v, None))
>       | 'Group' >> GroupByKey()
>       | 'CombineValues' >> CombineValues(lambda vs: None)
>       | 'Distinct' >> Keys())
>
> Does the combiner lifting somehow make the GroupByKey operation more 
> efficient despite coming after it? My intuition would suggest that we could 
> just remove the `CombineValues` altogether

The key property of CombineFns is that they are commutative and
associative which permits an optimization called combiner lifting.
Specifically, the operation

GroupByKey() | CombineValues(C)

can be re-written into

PartialCombineUsingLocalBufferMap(C) | GroupByKey() | FinalCombine(C)

that pretty much every runner supports (going back to the days of the
original MapReduce), which is what can make this so much more
efficient.

https://github.com/apache/beam/blob/release-2.21.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L669

I am, unfortunately, coming up short in finding good documentation on
this (Apache Beam specific or otherwise).
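In lieu of documentation, the rewrite can be sketched in plain Python. This is only a simulation under simplifying assumptions: the function names (partial_combine, group_by_key, final_combine) are made up for illustration and are not Beam APIs, and a single commutative, associative function (sum) stands in for a full CombineFn with accumulators.

```python
from collections import defaultdict


def partial_combine(worker_elements, combine_fn):
    """Pre-shuffle step: combine values per key within a single worker."""
    buffer = defaultdict(list)
    for key, value in worker_elements:
        buffer[key].append(value)
    return [(key, combine_fn(values)) for key, values in buffer.items()]


def group_by_key(shuffled):
    """The shuffle: bring all values for a key together."""
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return dict(grouped)


def final_combine(grouped, combine_fn):
    """Post-shuffle step: combine the per-worker partial results."""
    return {key: combine_fn(values) for key, values in grouped.items()}


def lifted_combine_per_key(workers, combine_fn):
    """Returns the combined result and how many records crossed the shuffle."""
    shuffled = [kv for w in workers for kv in partial_combine(w, combine_fn)]
    return final_combine(group_by_key(shuffled), combine_fn), len(shuffled)


workers = [
    [("a", 1), ("a", 2), ("b", 3)],
    [("b", 4), ("b", 5), ("c", 6)],
]
result, shuffled_count = lifted_combine_per_key(workers, sum)
```

Here six input records shrink to four before the shuffle (one per key per worker), and the final result is the same as combining after a plain GroupByKey because sum is commutative and associative.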


> On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev 
>  wrote:
>>
>> This is because it allows us to do some of the deduplication before
>> shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
>> worker and [B, B, B, B, C, C] on another. Rather than passing all that
>> data through the GroupByKey (which involves (relatively) expensive
>> materialization and cross-machine traffic), with this form the first
>> worker will only emit [A, B] and the second [B, C] and only the B
>> needs to be deduplicated post-shuffle.
>>
>> Wouldn't hurt to have a comment to that effect there.
>>
>> https://beam.apache.org/documentation/programming-guide/#combine
>>
>> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran  wrote:
>> >
>> > Hey all,
>> >
>> > I was poking around and looking at `Distinct` and was confused about why 
>> > it was implemented the way it was.
>> >
>> > Reproduced here:
>> > @ptransform_fn
>> > @typehints.with_input_types(T)
>> > @typehints.with_output_types(T)
>> > def Distinct(pcoll):  # pylint: disable=invalid-name
>> >   """Produces a PCollection containing distinct elements of a 
>> > PCollection."""
>> >   return (
>> >   pcoll
>> >   | 'ToPairs' >> Map(lambda v: (v, None))
>> >   | 'Group' >> CombinePerKey(lambda vs: None)
>> >   | 'Distinct' >> Keys())
>> >
>> > Could anyone clarify why we'd use a `CombinePerKey` instead of just using 
>> > `GroupByKey`?
>> >
>> > Cheers,
>> > Joey


Re: [python] Why CombinePerKey(lambda vs: None)?

2024-01-26 Thread Robert Bradshaw via dev
This is because it allows us to do some of the deduplication before
shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
worker and [B, B, B, B, C, C] on another. Rather than passing all that
data through the GroupByKey (which involves (relatively) expensive
materialization and cross-machine traffic), with this form the first
worker will only emit [A, B] and the second [B, C] and only the B
needs to be deduplicated post-shuffle.
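
As a rough illustration of the numbers in that example, here is a plain-Python simulation (not Beam code; both function names are invented for this sketch) that counts how many elements cross the shuffle with and without the per-worker pre-combine.

```python
def distinct_group_only(workers):
    """GroupByKey only: every element crosses the shuffle."""
    shuffled = [element for worker in workers for element in worker]
    return sorted(set(shuffled)), len(shuffled)


def distinct_with_lifting(workers):
    """CombinePerKey: each worker deduplicates locally before the shuffle."""
    shuffled = [element for worker in workers for element in sorted(set(worker))]
    return sorted(set(shuffled)), len(shuffled)


workers = [
    ["A", "A", "A", "B", "B"],
    ["B", "B", "B", "B", "C", "C"],
]
plain_result, plain_shuffled = distinct_group_only(workers)
lifted_result, lifted_shuffled = distinct_with_lifting(workers)
```

Both variants produce the same distinct elements; the lifted one sends 4 elements through the shuffle instead of 11, and only B still has to be deduplicated afterwards.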

Wouldn't hurt to have a comment to that effect there.

https://beam.apache.org/documentation/programming-guide/#combine

On Fri, Jan 26, 2024 at 8:22 AM Joey Tran  wrote:
>
> Hey all,
>
> I was poking around and looking at `Distinct` and was confused about why it 
> was implemented the way it was.
>
> Reproduced here:
> @ptransform_fn
> @typehints.with_input_types(T)
> @typehints.with_output_types(T)
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>   pcoll
>   | 'ToPairs' >> Map(lambda v: (v, None))
>   | 'Group' >> CombinePerKey(lambda vs: None)
>   | 'Distinct' >> Keys())
>
> Could anyone clarify why we'd use a `CombinePerKey` instead of just using 
> `GroupByKey`?
>
> Cheers,
> Joey


Re: [VOTE] Vendored Dependencies Release

2024-01-19 Thread Robert Bradshaw via dev
Thanks.

+1


On Fri, Jan 19, 2024 at 1:24 PM Yi Hu  wrote:

> The process I have been following is [1]. I have also suggested edits to
> the voting email template to include the self-link. However, can anyone
> edit this doc so the change can be made? Otherwise we might do better to
> migrate this doc to
> https://github.com/apache/beam/tree/master/contributor-docs
>
> [1] https://s.apache.org/beam-release-vendored-artifacts
>
> On Thu, Jan 18, 2024 at 2:56 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Could you explain the process you used to produce these artifacts?
>>
>> On Thu, Jan 18, 2024 at 11:23 AM Kenneth Knowles  wrote:
>>
>>> +1
>>>
>>> On Wed, Jan 17, 2024 at 6:03 PM Yi Hu via dev 
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>>
>>>> Please review the release of the following artifacts that we vendor:
>>>>
>>>>  * beam-vendor-grpc-1_60_1
>>>>
>>>>
>>>> Please review and vote on the release candidate #1 for the version 0.1,
>>>> as follows:
>>>>
>>>> [ ] +1, Approve the release
>>>>
>>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>>
>>>>
>>>> The complete staging area is available for your review, which includes:
>>>>
>>>> * the official Apache source release to be deployed to dist.apache.org
>>>> [1], which is signed with the key with fingerprint 8935B943A188DE65 [2],
>>>>
>>>> * all artifacts to be deployed to the Maven Central Repository [3],
>>>>
>>>> * commit hash "52b4a9cb58e486745ded7d53a5b6e2d2312e9551" [4],
>>>>
>>>> The vote will be open for at least 72 hours. It is adopted by majority
>>>> approval, with at least 3 PMC affirmative votes.
>>>>
>>>> Thanks,
>>>>
>>>> Release Manager
>>>>
>>>> [1] https://dist.apache.org/repos/dist/dev/beam/vendor/
>>>>
>>>> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
>>>>
>>>> [3]
>>>> https://repository.apache.org/content/repositories/orgapachebeam-1366/
>>>>
>>>> [4]
>>>> https://github.com/apache/beam/commits/52b4a9cb58e486745ded7d53a5b6e2d2312e9551/
>>>>
>>>>
>>>> --
>>>>
>>>> Yi Hu, (he/him/his)
>>>>
>>>> Software Engineer
>>>>
>>>>
>>>>


Re: @RequiresTimeSortedInput adoption by runners

2024-01-19 Thread Robert Bradshaw via dev
I think this standard design could still be made to work.
Specifically, the graph would contain a DoFn that has the
RequiresTimeSortedInput bit set, and as a single "subtransform" that
has a different DoFn in its spec that does not require this bit to be
set and whose implementation enforces this ordering (say, via state)
before invoking the user's DoFn. This would work fine in Streaming for
any runner, and would work OK for batch as long as the value set for
any key fit into memory (or the batch state implementation spilled to
disk, though that could get really slow). Runners that wanted to do
better (e.g. provide timestamp sorting as part of their batch
grouping, or even internally sort timestamps more efficiently than
could be done via the SDK over the state API) could do so.
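
A minimal sketch of what such an SDK-side wrapper could look like, in plain Python rather than against the real state and timer APIs. The class name TimeSortedBuffer and its methods are hypothetical, the per-key heaps stand in for Beam state, and allowed lateness is ignored entirely.

```python
import heapq
from collections import defaultdict


class TimeSortedBuffer:
    """Buffers elements per key and releases them in timestamp order.

    Elements are held (as if in state) until the watermark passes their
    timestamp, then handed to the wrapped user function in sorted order.
    """

    def __init__(self, user_fn):
        self._user_fn = user_fn
        self._buffers = defaultdict(list)  # key -> heap of (timestamp, seq, value)
        self._seq = 0  # tie-breaker so values themselves are never compared

    def process(self, key, timestamp, value):
        heapq.heappush(self._buffers[key], (timestamp, self._seq, value))
        self._seq += 1

    def advance_watermark(self, watermark):
        for key in sorted(self._buffers):
            heap = self._buffers[key]
            while heap and heap[0][0] <= watermark:
                timestamp, _, value = heapq.heappop(heap)
                self._user_fn(key, timestamp, value)


seen = []
buffer = TimeSortedBuffer(lambda key, ts, value: seen.append((key, ts, value)))
buffer.process("k", 5, "late")
buffer.process("k", 1, "first")
buffer.process("k", 3, "middle")
buffer.advance_watermark(3)
after_first_flush = list(seen)
buffer.advance_watermark(10)
```

In batch the watermark jumps straight to the end, so the whole value set for a key sits in the buffer at once, which is exactly the memory concern mentioned above.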

For Java, such a wrapper might be a bit messy, but could probably be
hard coded above the ByteBuddy wrappers layer.

TBD how much of our infrastructure assumes ParDo transforms do not
contain subtransforms. (We could also provide a different URN for
RequiresTimeSortedInput DoFns whose payload would be the DoFn payload,
rather than setting a bit on the payload itself.) Rather than
introducing nesting, we could implement the AnyOf PTransform that
would present the two implementations as siblings (which could be
useful elsewhere). This can be made backward compatible by providing
one of the alternatives as the composite structure. The primary
hesitation I have here is that it prevents much
introspection/manipulation of the pipeline before the runner
capabilities are known.

What we really want is a way to annotate a DoFn as
RequestsTimeSortedInput, together with a way for the runner to
communicate to the SDK whether or not it was able to honor this
request. That may be a more invasive change to the protocol (e.g.
annotating PCollections with ordering properties, which is where it
belongs[1]). I suppose we could let a runner that supports this
capability strip the RequestsTimeSortedInput bit (or set a new bit),
and SDKs that get unmutated transforms would know they have to do the
sorting themselves.

- Robert

[1] Ordering is an under-defined concept in Beam, but if we're going
to add it my take would be that to do it properly one would want

(1) Annotations on PCollections indicating whether they're unordered
or ordered (by a certain ordering criteria, in this case
timestamp-within-key), which could be largely inferred by
(2) Annotations on PTransforms indicating whether they're
order-creating, order-preserving, or order-requiring (with the default
being unspecified=order-destroying), again parameterized by an
ordering criteria of some kind, which criteria could form a hierarchy.
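
To illustrate how (1) could be inferred from (2), here is a small sketch for a linear chain of transforms and a single ordering criterion. The enum and function names are hypothetical, and treating an order-requiring transform as leaving the ordering unchanged is an assumption of this sketch, not something settled above.

```python
from enum import Enum


class OrderEffect(Enum):
    CREATING = "creating"
    PRESERVING = "preserving"
    REQUIRING = "requiring"
    DESTROYING = "destroying"  # the default when nothing is specified


def infer_ordering(chain, input_ordered=False):
    """Infers, per transform, whether its output PCollection is ordered.

    chain is a list of (transform_name, OrderEffect) pairs applied in
    sequence. Raises ValueError if an order-requiring transform would
    receive unordered input.
    """
    ordered = input_ordered
    outputs = []
    for name, effect in chain:
        if effect is OrderEffect.REQUIRING and not ordered:
            raise ValueError(f"{name} requires ordered input")
        if effect is OrderEffect.CREATING:
            ordered = True
        elif effect is OrderEffect.DESTROYING:
            ordered = False
        # PRESERVING and REQUIRING are assumed to leave the ordering unchanged.
        outputs.append((name, ordered))
    return outputs


chain = [
    ("SortByTimestamp", OrderEffect.CREATING),
    ("Filter", OrderEffect.PRESERVING),
    ("StatefulDoFn", OrderEffect.REQUIRING),
    ("Reshuffle", OrderEffect.DESTROYING),
]
inferred = infer_ordering(chain)
```

A runner or SDK could use the point where the check would fail as the place to insert an explicit sort.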


On Fri, Jan 19, 2024 at 10:40 AM Kenneth Knowles  wrote:
>
> In this design space, what we have done in the past is:
>
> 1) ensure that runners all reject pipelines they cannot run correctly
> 2) if there is a default/workaround/slower implementation, provide it as an 
> override
>
> This is largely ignoring portability but I think/hope it will still work. At 
> one time I put some effort into ensuring Java Pipeline objects and proto 
> representations could roundtrip with all the necessary information for 
> pre-portability runners to still work, which is the same prereqs as 
> pre-portable "Override" implementations to still work.
>
> TBH I'm 50/50 on the idea. If something is going to be implemented more 
> slowly or less scalably as a fallback, I think it may be best to simply be 
> upfront about being unable to really run it. It would depend on the 
> situation. For requiring time sorted input, the manual implementation is 
> probably similar to what a streaming runner might do, so it might make sense.
>
> Kenn
>
> On Fri, Jan 19, 2024 at 11:05 AM Robert Burke  wrote:
>>
>> I certainly don't have the deeper java insight here. So one more portable 
>> based reply and then I'll step back on the Java specifics.
>>
>> Portable runners only really have the "unknown Composite" fallback option, 
>> where if the Composite's URN isn't known to the runner, it should use the 
>> subgraph that is being wrapped.
>>
>> I suppose the protocol could be expanded : If a composite transform with a 
>> ParDo payload, and urn has features the runner can't handle, then it could 
>> use the fallback graph as well.
>>
>> The SDK would then still have needed to construct the fallback graph
>> into the Pipeline proto. This doesn't sound incompatible with what you've 
>> suggested the Java SDK could do, but it avoids the runner needing to be 
>> aware of a specific implementation requirement around a feature it doesn't 
>> support.  If it has to do something specific to support an SDK specific 
>> mechanism, that's still supporting the feature, but I fear it's not a great 
>> road to tread on for runners to add SDK specific implementation details.
>>
>> If a (portable) runner is going to spend work on doing something to handle 
>> RequiresTimeSortedInput, it's probably 

Re: [VOTE] Vendored Dependencies Release

2024-01-18 Thread Robert Bradshaw via dev
Could you explain the process you used to produce these artifacts?

On Thu, Jan 18, 2024 at 11:23 AM Kenneth Knowles  wrote:

> +1
>
> On Wed, Jan 17, 2024 at 6:03 PM Yi Hu via dev  wrote:
>
>> Hi everyone,
>>
>>
>> Please review the release of the following artifacts that we vendor:
>>
>>  * beam-vendor-grpc-1_60_1
>>
>>
>> Please review and vote on the release candidate #1 for the version 0.1,
>> as follows:
>>
>> [ ] +1, Approve the release
>>
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>>
>> The complete staging area is available for your review, which includes:
>>
>> * the official Apache source release to be deployed to dist.apache.org
>> [1], which is signed with the key with fingerprint 8935B943A188DE65 [2],
>>
>> * all artifacts to be deployed to the Maven Central Repository [3],
>>
>> * commit hash "52b4a9cb58e486745ded7d53a5b6e2d2312e9551" [4],
>>
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>>
>> Release Manager
>>
>> [1] https://dist.apache.org/repos/dist/dev/beam/vendor/
>>
>> [2] https://dist.apache.org/repos/dist/release/beam/KEYS
>>
>> [3]
>> https://repository.apache.org/content/repositories/orgapachebeam-1366/
>>
>> [4]
>> https://github.com/apache/beam/commits/52b4a9cb58e486745ded7d53a5b6e2d2312e9551/
>>
>>
>> --
>>
>> Yi Hu, (he/him/his)
>>
>> Software Engineer
>>
>>
>>


Re: [YAML] add timestamp to a bounded PCollection

2024-01-09 Thread Robert Bradshaw via dev
Just created https://github.com/apache/beam/pull/29969

On Mon, Jan 8, 2024 at 2:49 PM Robert Bradshaw  wrote:
>
> This does appear to be a significant missing feature. I'll try to make
> sure something easier gets in by the next release. See also below.
>
> On Mon, Jan 8, 2024 at 11:30 AM Ferran Fernández Garrido
>  wrote:
> >
> > Hi Yarden,
> >
> > Since it's a bounded source you could try with Sql transformation
> > grouping by the timestamp column. Here are some examples of grouping:
> >
> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
> >
> > However, if you want to add a timestamp column in addition to the
> > original CSV records then, there are multiple ways to achieve that.
> >
> > 1) MapToFields:
> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
> > [Your timestamp column could be a callable to get the current
> > timestamp on each record]
> >
> > 2) If you need an extra layer of transformation complexity I would
> > recommend creating a custom transformation:
> >
> > # - type: MyCustomTransform
> > #   name: AddDateTimeColumn
> > #   config:
> > #     prefix: 'whatever'
> >
> > providers:
> >   - type: 'javaJar'
> >     config:
> >       jar: 'gs://path/of/the/java.jar'
> >     transforms:
> >       MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'
>
> Alternatively, you can use PyTransform, if you're more comfortable with
> that, by invoking it via its fully qualified name.
>
> pipeline:
>   transforms:
>     ...
>     - type: MyAssignTimestamps
>       config:
>         kwarg1: ...
>         kwarg2: ...
>
> providers:
>   - type: python
>     config:
>       packages: ['py_py_package_identifier']
>     transforms:
>       MyAssignTimestamps: fully_qualified_package.module.AssignTimestampsPTransform
>
>
>
> > Best,
> > Ferran
> >
> > On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe ()
> > wrote:
> > >
> > > Hi all,
> > > I'm quite new to using beam yaml. I am working with a CSV file and want to
> > > implement some windowing logic to it.
> > > Was wondering what is the right way to add timestamps to each element,
> > > assuming I have a column including a timestamp.
> > >
> > > I am aware of Beam Programming Guide (apache.org) part but not sure how
> > > this can be implemented and used from yaml perspective.
> > >
> > > Thanks
> > > Yarden


Re: [YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Robert Bradshaw via dev
This does appear to be a significant missing feature. I'll try to make
sure something easier gets in by the next release. See also below.

On Mon, Jan 8, 2024 at 11:30 AM Ferran Fernández Garrido
 wrote:
>
> Hi Yarden,
>
> Since it's a bounded source you could try with Sql transformation
> grouping by the timestamp column. Here are some examples of grouping:
>
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
>
> However, if you want to add a timestamp column in addition to the
> original CSV records then, there are multiple ways to achieve that.
>
> 1) MapToFields:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
> [Your timestamp column could be a callable to get the current
> timestamp on each record]
>
> 2) If you need an extra layer of transformation complexity I would
> recommend creating a custom transformation:
>
> # - type: MyCustomTransform
> #   name: AddDateTimeColumn
> #   config:
> #     prefix: 'whatever'
>
> providers:
>   - type: 'javaJar'
>     config:
>       jar: 'gs://path/of/the/java.jar'
>     transforms:
>       MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Alternatively, you can use PyTransform, if you're more comfortable with
that, by invoking it via its fully qualified name.

pipeline:
  transforms:
    ...
    - type: MyAssignTimestamps
      config:
        kwarg1: ...
        kwarg2: ...

providers:
  - type: python
    config:
      packages: ['py_py_package_identifier']
    transforms:
      MyAssignTimestamps: fully_qualified_package.module.AssignTimestampsPTransform
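
For the body of such a transform, the core is just turning the timestamp column into an event time. A minimal stdlib-only sketch follows; the function names, the "ts" column name, the timestamp format, and the assumption that the column is in UTC are all illustrative guesses about the CSV, not anything from the original question.

```python
from datetime import datetime, timezone


def to_epoch_seconds(value, fmt="%Y-%m-%d %H:%M:%S"):
    """Parses a timestamp string (assumed to be UTC) into seconds since the epoch."""
    parsed = datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
    return parsed.timestamp()


def with_event_time(row, column="ts"):
    """Returns (event_time_seconds, row) for a dict-like CSV record."""
    return to_epoch_seconds(row[column]), row


record = {"ts": "1970-01-02 00:00:00", "value": 42}
event_time, same_record = with_event_time(record)
```

Inside a Python PTransform this would typically be wrapped as something like beam.Map(lambda row: beam.window.TimestampedValue(row, to_epoch_seconds(row['ts']))), adjusted for however the rows expose their fields.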



> Best,
> Ferran
>
> On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe ()
> wrote:
> >
> > Hi all,
> > I'm quite new to using beam yaml. I am working with a CSV file and want to
> > implement some windowing logic to it.
> > Was wondering what is the right way to add timestamps to each element,
> > assuming I have a column including a timestamp.
> >
> > I am aware of Beam Programming Guide (apache.org) part but not sure how
> > this can be implemented and used from yaml perspective.
> >
> > Thanks
> > Yarden


Re: (python SDK) "Any" coder bypasses registry coders

2024-01-05 Thread Robert Bradshaw via dev
On Fri, Jan 5, 2024 at 9:42 AM Joey Tran  wrote:
>
>
> I think my original message made it sound like what I thought was confusing 
> was how `Any` works. The scenario that I actually think is confusing is *if a 
> user registers a coder for a data type, this preference will get ignored in 
> non-obvious situations and can (and in my scenario, has) result in 
> non-obvious downstream issues.*


I agree this can be confusing. Essentially, Coders are attached to
PCollections (which are assumed to be of homogeneous type) at compile
time.

>
> On Fri, Jan 5, 2024 at 12:05 PM Robert Bradshaw via dev  
> wrote:
>>
>> On Fri, Jan 5, 2024 at 7:38 AM Joey Tran  wrote:
>>>
>>> I've been working with a few data types that are in practice unpicklable 
>>> and I've run into a couple issues stemming from the `Any` type hint, which 
>>> when used, will result in the PickleCoder getting used even if there's a 
>>> coder in the coder registry that matches the data element.
>>
>>
>> This is likely because we don't know the data type at the time we choose the 
>> coder.
>>
>>>
>>> This was pretty unexpected to me and can result in pretty cryptic
>>> downstream issues. In the best case, you get an error at pickling time [1],
>>> and in the worst case, the pickling "succeeds" (since many objects can get
>>> (de)pickled without obvious error) but then results in downstream issues
>>> (e.g. some data doesn't survive depickling).
>>
>>
>> It shouldn't be the case that an object depickles successfully but 
>> incorrectly; sounds like a bug in some custom pickling code.
>
> You don't need custom pickling code for this to happen. For a contrived 
> example, you could imagine some class that caches some state specific to a 
> local system and saves it to a private local variable. If you pickle one of 
> these and then unpickle it on a different system, it would've been unpickled 
> successfully but would be in a bad state.
>
> Rather than mucking around with custom pickling, someone might want to just 
> implement a coder for their special class instead.


It's the same work in both cases, though I can see a coder being
preferable if one does not own the class. (Though copyreg should work
just as well.) In that case, I'd consider explicitly making the class
throw an exception on pickling (though I agree it's hard to see how
one could know to do this by default).
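
For reference, a small sketch of both options using only the standard library. The Client and Unpicklable classes are hypothetical stand-ins for a class with machine-local cached state; copyreg.pickle and __reduce__ are the standard hooks.

```python
import copyreg
import pickle


class Client:
    """Hypothetical class that caches machine-local state."""

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self._local_cache = {}


def _reduce_client(client):
    # Rebuild from the endpoint only; the local cache is deliberately dropped.
    return Client, (client.endpoint,)


# Option 1: register a reducer without touching the class itself.
copyreg.pickle(Client, _reduce_client)


class Unpicklable:
    """Option 2: make accidental pickling fail loudly."""

    def __reduce__(self):
        raise TypeError("Unpicklable must be encoded with an explicit coder")


original = Client("service.example")
original._local_cache["stale"] = True
restored = pickle.loads(pickle.dumps(original))
```

With the reducer registered, the restored object comes back with a fresh empty cache instead of state copied from another machine.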

>>>
>>> One example case of the latter is if you flatten a few pcollections
>>> including a pcollection generated by `beam.Create([])` (the inferred output
>>> type of an empty Create becomes Any)
>>
>>
>> Can you add a type hint to the Create?
>
> Yeah this fixes the issue, it's just not obvious (or at least to me) that (1) 
> beam.Create([]) will have an output type of Any (often times the parameter to 
> beam.Create will be some local variable which makes it less obvious) and that


We could update Beam to let the type hint be the empty union, which
would correspond to a coder that can't encode/decode anything, but
when unioned with others (e.g. in a Flatten) does not "taint" the
rest. This doesn't solve unioning two other disjoint types resolving
to the Any coder though.

>
> (2) in this particular case, _how_ downstream pcollections get decoded will
> be slightly different. In the worst case, the issue won't even result in an
> error at decoding time (as mentioned before), so then you have to backtrack
> from some possibly unrelated sounding traceback.
>
>>>
>>> Would it make sense to introduce a new fallback coder that takes precedence 
>>> over the `PickleCoder` that encodes both the data type (by just pickling 
>>> it) and the data encoded using the registry-found coder?
>>
>>
>> This is essentially re-implementing pickle :)
>
> Pickle doesn't use coders from the coder registry which I think is the key 
> distinction here

Pickle is just a dynamic dispatch of type -> encoder.
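
To make the comparison concrete, here is a rough sketch of the proposed fallback coder in plain Python. It is not Beam code: REGISTRY, encode and decode are hypothetical names, the one-byte tags and length prefix are an invented wire format, and Fraction is just a convenient example type.

```python
import pickle
from fractions import Fraction

# Hypothetical coder registry: type -> (encode_fn, decode_fn).
REGISTRY = {
    Fraction: (
        lambda f: f"{f.numerator}/{f.denominator}".encode("ascii"),
        lambda b: Fraction(b.decode("ascii")),
    ),
}


def encode(value):
    """Uses the registered coder if there is one, else falls back to pickle."""
    coder = REGISTRY.get(type(value))
    if coder is None:
        return b"P" + pickle.dumps(value)
    # Store the pickled type alongside the data so decode can dispatch on it.
    type_bytes = pickle.dumps(type(value))
    return b"R" + len(type_bytes).to_bytes(4, "big") + type_bytes + coder[0](value)


def decode(data):
    tag, body = data[:1], data[1:]
    if tag == b"P":
        return pickle.loads(body)
    type_len = int.from_bytes(body[:4], "big")
    value_type = pickle.loads(body[4:4 + type_len])
    return REGISTRY[value_type][1](body[4 + type_len:])


encoded_fraction = encode(Fraction(3, 4))
encoded_dict = encode({"a": 1})
```

The per-element type prefix is the space cost mentioned in the proposal, and the lookup on type(value) is the same kind of dynamic dispatch pickle already does.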

>>>
>>> This would have some space ramifications for storing the data type for 
>>> every element. Of course this coder would only kick in _if_ the data type 
>>> was found in the registry, otherwise we'd proceed to the picklecoder like 
>>> we do currently
>>
>>
>> I do not think we'd want to introduce this as the default--that'd likely 
>> make common cases much more expensive. IIRC you can manually override the 
>> fallback coder with one of your own choosing. Alternatively, you could look 
>> at using copyreg for your problematic types.
>>
>
> Ah you can indeed override the fallback coder. Okay I'll just d

Re: (python SDK) "Any" coder bypasses registry coders

2024-01-05 Thread Robert Bradshaw via dev
On Fri, Jan 5, 2024 at 7:38 AM Joey Tran  wrote:

> I've been working with a few data types that are in practice
> unpicklable and I've run into a couple issues stemming from the `Any` type
> hint, which when used, will result in the PickleCoder getting used even if
> there's a coder in the coder registry that matches the data element.
>

This is likely because we don't know the data type at the time we choose
the coder.


> This was pretty unexpected to me and can result in pretty cryptic
> downstream issues. In the best case, you get an error at pickling time [1],
> and in the worst case, the pickling "succeeds" (since many objects can get
> (de)pickled without obvious error) but then results in downstream issues
> (e.g. some data doesn't survive depickling).
>

It shouldn't be the case that an object depickles successfully but
incorrectly; sounds like a bug in some custom pickling code.

> One example case of the latter is if you flatten a few pcollections
> including a pcollection generated by `beam.Create([])` (the inferred output
> type an empty create becomes Any)
>

Can you add a type hint to the Create?


> Would it make sense to introduce a new fallback coder that takes
> precedence over the `PickleCoder` that encodes both the data type (by just
> pickling it) and the data encoded using the registry-found coder?
>

This is essentially re-implementing pickle :)
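To make the comparison concrete, here is a minimal sketch of the proposed type-tagged fallback in plain Python. It does not use Beam's coder API: `REGISTRY`, `encode`, and `decode` are hypothetical names, and `Decimal` merely stands in for a type that has a registered coder.

```python
import pickle
from decimal import Decimal

# Hypothetical stand-in for a coder registry: type -> (encode, decode).
REGISTRY = {
    Decimal: (lambda d: str(d).encode("utf-8"),
              lambda b: Decimal(b.decode("utf-8"))),
}


def encode(value):
    """Encode a value, preferring a registered coder over plain pickle."""
    codec = REGISTRY.get(type(value))
    if codec is None:
        # No registered coder: fall back to pickling the whole value.
        return b"P" + pickle.dumps(value)
    # Registered coder: store the pickled type, then the coder's payload.
    type_tag = pickle.dumps(type(value))
    payload = codec[0](value)
    return b"R" + len(type_tag).to_bytes(4, "big") + type_tag + payload


def decode(data):
    """Invert encode() by reading the marker byte and optional type tag."""
    if data[:1] == b"P":
        return pickle.loads(data[1:])
    tag_len = int.from_bytes(data[1:5], "big")
    value_type = pickle.loads(data[5:5 + tag_len])
    return REGISTRY[value_type][1](data[5 + tag_len:])


encoded = encode(Decimal("1.50"))
roundtrip = decode(encoded)
# Bytes spent on the marker and type tag, on top of the coder's own payload.
overhead = len(encoded) - len(b"1.50")
```

The per-element `overhead` is the space cost raised in the proposal, and the `type -> codec` lookup is the same kind of dispatch that pickle already performs.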


> This would have some space ramifications for storing the data type for
> every element. Of course this coder would only kick in _if_ the data type
> was found in the registry, otherwise we'd proceed to the picklecoder like
> we do currently
>

I do not think we'd want to introduce this as the default--that'd likely
make common cases much more expensive. IIRC you can manually override the
fallback coder with one of your own choosing. Alternatively, you could look
at using copyreg for your problematic types.
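For the copyreg route, a minimal sketch using only the standard library follows. `Session` and `reduce_session` are hypothetical; the class holds a lock, which pickle refuses to serialize by default, so the registered reducer rebuilds the object from its constructor argument instead.

```python
import copyreg
import pickle
import threading


class Session:
    """Hypothetical type holding a lock, which pickle cannot serialize."""

    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()


def reduce_session(session):
    # Rebuild from the constructor argument only; the lock is recreated.
    return Session, (session.name,)


# Register the reducer so pickle uses it for Session instances.
copyreg.pickle(Session, reduce_session)

restored = pickle.loads(pickle.dumps(Session("worker-1")))
```

Because the reducer is registered globally, anything that falls back to pickle in the same process would pick it up. Whether that reaches remote workers depends on how the registration code is shipped, which this sketch does not cover.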


> [1] https://github.com/apache/beam/issues/29908 (Issue arises from
> ReshuffleFromKey using `Any` as a pcollection type)
>


Re: [VOTE] Release 2.53.0, release candidate #2

2024-01-04 Thread Robert Bradshaw via dev
+1 (binding)

Validated the artifacts and some simple Python pipelines in a fresh
install.

On Wed, Jan 3, 2024 at 5:46 PM Robert Burke  wrote:

> +1 (binding)
>
> Validated the Go SDK against my own pipelines.
>
> Robert Burke
>
> On Wed, Jan 3, 2024, 7:52 AM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
>> +1 (binding)
>>
>> Validated Java/Python x-lang jobs.
>>
>> - Cham
>>
>> On Tue, Jan 2, 2024 at 7:35 AM Jack McCluskey via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Happy New Year, everyone!
>>>
>>> Now that we're through the holidays I just wanted to bump the voting
>>> thread so we can keep the RC moving.
>>>
>>> Thanks,
>>>
>>> Jack McCluskey
>>>
>>> On Fri, Dec 29, 2023 at 11:58 AM Johanna Öjeling via dev <
>>> dev@beam.apache.org> wrote:
>>>
 +1 (non-binding).

 Tested Go SDK with Dataflow on own use cases.

 On Fri, Dec 29, 2023 at 2:57 AM Yi Hu via dev 
 wrote:

> +1 (non-binding)
>
> Tested with Beam GCP IOs benchmarking (
> https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/it/google-cloud-platform
> )
>
> On Thu, Dec 28, 2023 at 11:36 AM Svetak Sundhar via dev <
> dev@beam.apache.org> wrote:
>
>> +1 (non binding)
>>
>> Tested with Healthcare notebooks.
>>
>>
>> Svetak Sundhar
>>
>>   Data Engineer
>> s vetaksund...@google.com
>>
>>
>>
>> On Thu, Dec 28, 2023 at 3:52 AM Jan Lukavský  wrote:
>>
>>> +1 (binding)
>>>
>>> Tested Java SDK with Flink Runner.
>>>
>>>  Jan
>>> On 12/27/23 14:13, Danny McCormick via dev wrote:
>>>
>>> +1 (non-binding)
>>>
>>> Tested with some example ML notebooks.
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Tue, Dec 26, 2023 at 6:41 PM XQ Hu via dev 
>>> wrote:
>>>
 +1 (non-binding)

 Tested with the simple RunInference pipeline:
 https://github.com/google/dataflow-ml-starter/actions/runs/7332832875/job/19967521369

 On Tue, Dec 26, 2023 at 3:29 PM Jack McCluskey via dev <
 dev@beam.apache.org> wrote:

> Happy holidays everyone,
>
> Please review and vote on the release candidate #2 for the version
> 2.53.0, as follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific
> comments)
>
> Reviewers are encouraged to test their own use cases with the
> release candidate, and vote +1 if no issues are found. Only PMC member
> votes will count towards the final vote, but votes from all community
> members are encouraged and helpful for finding regressions; you can either
> test your own use cases [13] or use cases from the validation sheet [10].
>
> The complete staging area is available for your review, which
> includes:
> * GitHub Release notes [1],
> * the official Apache source release to be deployed to
> dist.apache.org [2], which is signed with the key with
> fingerprint DF3CBA4F3F4199F4 (D20316F712213422 if automated) [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v2.53.0-RC2" [5],
> * website pull request listing the release [6], the blog post [6],
> and publishing the API reference manual [7].
> * Python artifacts are deployed along with the source release to
> the dist.apache.org [2] and PyPI[8].
> * Go artifacts and documentation are available at pkg.go.dev [9]
> * Validation sheet with a tab for 2.53.0 release to help with
> validation [10].
> * Docker images published to Docker Hub [11].
> * PR to run tests against release branch [12].
>
> The vote will be open for at least 72 hours. It is adopted by
> majority approval, with at least 3 PMC affirmative votes.
>
> For guidelines on how to try the release in your projects, check
> out our RC testing guide [13].
>
> Thanks,
>
> Jack McCluskey
>
> [1] https://github.com/apache/beam/milestone/17
> [2] https://dist.apache.org/repos/dist/dev/beam/2.53.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapachebeam-1365/
> [5] https://github.com/apache/beam/tree/v2.53.0-RC2
> [6] https://github.com/apache/beam/pull/29856
> [7] https://github.com/apache/beam-site/pull/657
> [8] https://pypi.org/project/apache-beam/2.53.0rc2/
> [9]
> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.53.0-RC2/go/pkg/beam
> [10]
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1290249774
> 

Re: Constant for beam:runner:executable_stage:v1 ?

2023-12-20 Thread Robert Bradshaw via dev
Not at all. (We may want to lift it to a proto definition that can be
shared for all languages, and similarly for "beam:runner:source:v1" and
"beam:runner:sink:v1", but moving it to a common spot in Python at least is
an improvement.)
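A minimal sketch of what a common spot could look like. The constant and helper names are hypothetical and are not the actual contents of common_urns; only the three URN strings come from this thread.

```python
# Hypothetical shared constants for runner-level URNs (names are illustrative).
EXECUTABLE_STAGE_URN = "beam:runner:executable_stage:v1"
SOURCE_URN = "beam:runner:source:v1"
SINK_URN = "beam:runner:sink:v1"

RUNNER_URNS = frozenset({EXECUTABLE_STAGE_URN, SOURCE_URN, SINK_URN})


def is_runner_urn(urn):
    """Return True if the URN is one of the runner implementation-detail URNs."""
    return urn in RUNNER_URNS
```

Call sites would then compare against `EXECUTABLE_STAGE_URN` instead of repeating the string literal.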

On Wed, Dec 20, 2023 at 1:19 PM Joey Tran  wrote:

> Would it feel too wrong to put it in common_urns? [1]
>
> [1]
> https://github.com/apache/beam/blob/8de029a412ab3e87ec92caf29818b51dab4ab02d/sdks/python/apache_beam/portability/common_urns.py
>
> On Wed, Dec 20, 2023 at 4:06 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Technically it's not really part of the model so much as an
>> implementation detail, but it likely does make sense to put somewhere
>> common.
>>
>> On Wed, Dec 20, 2023 at 12:55 PM Joey Tran 
>> wrote:
>>
>>> Hey all,
>>>
>>> Is there a particular reason we hard code
>>> "beam:runner:executable_stage:v1" everywhere in the python SDK instead of
>>> putting it in common_urns?
>>>
>>>
>>>


Re: Constant for beam:runner:executable_stage:v1 ?

2023-12-20 Thread Robert Bradshaw via dev
Technically it's not really part of the model so much as an implementation
detail, but it likely does make sense to put somewhere common.

On Wed, Dec 20, 2023 at 12:55 PM Joey Tran 
wrote:

> Hey all,
>
> Is there a particular reason we hard code
> "beam:runner:executable_stage:v1" everywhere in the python SDK instead of
> putting it in common_urns?
>
>
>


Re: Python SDK logical types with argument

2023-12-20 Thread Robert Bradshaw via dev
On Wed, Dec 20, 2023 at 8:41 AM Ben San Nicolas via dev 
wrote:

> Hi,
>
> I'm looking to make use of https://github.com/apache/beam/issues/23373 so
> I can use a java avro schema with enums xlang from python.
>
> Are there existing ideas on how to implement this?
>
> I tried taking a look and the Python SDK has a very simple map from
> concrete python type to logical type, which doesn't seem sufficient to
> encode additional type info from an argument, since a map from class to
> logical type and the associated _from_typing(class) conversion doesn't
> maintain any arguments.
>
> I tried comparing this with the Java SDK, but it looks like it has a full
> abstraction around a schema model, which directly encodes the logical arg
> type/arg from the protobuf and then handles the conversions implicitly
> somehow/elsewhere. As far as I could tell, this abstraction is missing from
> python, in the sense that schemas_test.py roundtrips proto classes through
> concrete python types like typing.Mapping[typing.Optional[numpy.int64],
> bytes] rather than an SDK-defined schema class.
>
> The simplest solution might be to just update language_type() to return a
> tuple of class, arg, and register that in the logical type mapping. Does
> this make sense?
>

Yes, this'd be a great step forward. Not being able to specify the enum
logical type (that seems widely used in Java) is a major pain point for
Python.
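A rough sketch of the tuple-keyed idea, in plain Python rather than the SDK's schema code. `LOGICAL_TYPES`, `register`, `enum_argument`, and the URN are all hypothetical placeholders; the point is only that a key of class alone cannot tell two enums apart, while (class, argument) can.

```python
from enum import Enum


class Color(Enum):
    RED = 0
    GREEN = 1


class Size(Enum):
    SMALL = 0
    LARGE = 1


# Hypothetical registry keyed by (language type, argument), not type alone.
LOGICAL_TYPES = {}


def register(language_type, argument, urn):
    """Record a logical type under its (class, argument) pair."""
    LOGICAL_TYPES[(language_type, argument)] = urn


def enum_argument(enum_cls):
    # The argument carries what the class alone cannot: the allowed names.
    return tuple(member.name for member in enum_cls)


ENUM_URN = "example:logical_type:enum:v1"  # placeholder, not a real Beam URN
register(Enum, enum_argument(Color), ENUM_URN)
register(Enum, enum_argument(Size), ENUM_URN)
```

With a class-only key, the second `register` call would overwrite the first; with the tuple key both entries survive.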


Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Robert Bradshaw via dev
There is definitely a body of future work in intelligently merging
compatible-but-not-equal environments. (Dataflow does this for example.)
Defining/detecting compatibility is not always easy, but sometimes is, and
we should at least cover those cases and grow them over time.
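One of the easy cases is the additive license hint described in the quoted message below. A minimal sketch of such a merge, in plain Python: the `min_ram` and `licenses` keys are hypothetical rather than Beam's hint URNs, and taking the maximum for `min_ram` is an assumption about how a non-additive hint would combine.

```python
def merge_hints(a, b):
    """Merge two hint dicts into one that satisfies both environments."""
    return {
        # Assumed rule: a minimum requirement merges to the larger minimum.
        "min_ram": max(a.get("min_ram", 0), b.get("min_ram", 0)),
        # Additive rule from the thread: the merged stage needs every license.
        "licenses": frozenset(a.get("licenses", ())) | frozenset(b.get("licenses", ())),
    }


env_a = {"min_ram": 4, "licenses": {"A"}}
env_b = {"min_ram": 8, "licenses": {"B"}}
merged = merge_hints(env_a, env_b)
```

Here `merged` requires 8 units of RAM and both licenses, so transforms from either environment could run in it.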

On Fri, Dec 15, 2023 at 5:57 AM Joey Tran  wrote:

> Yeah I can confirm for the python runners (based on my reading of the
> translations.py [1]) that only identical environments are merged together.
>
> The funny thing is that we _originally_ implemented this hint as an
> annotation but then changed it to hint because it semantically felt more
> correct. I think we might go back to that since the environment merging
> logic isn't too flexible / easy to customize. Our type of hint is a bit
> unlike other hints anyways. Unlike resources like MinRam, these resources
> are additive (e.g. you can merge an environment that requires license A and
> an environment that requires license B into an environment that requires
> both A and B)
>
> [1]
> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>
> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke  wrote:
>
>> That would do it. We got so tunnel visioned on side inputs we missed that!
>>
>> IIRC the python local runner and Prism both only fuse transforms in
>> identical environments together. So any environmental diffs will prevent
>> fusion.
>>
>> Runners as a rule are usually free to ignore/manage hints as they like.
>> Transform annotations might be an alternative, but how those are managed
>> would be more SDK specific.
>>
>> On Fri, Dec 15, 2023, 5:21 AM Joey Tran 
>> wrote:
>>
>>> I figured out my issue. I thought side inputs were breaking up my
>>> pipeline but after experimenting with my transforms I now realize what was
>>> actually breaking it up was different transform environments that weren't
>>> considered compatible.
>>>
>>> We have a custom resource hint (for specifying whether a transform needs
>>> access to some software license) that we use with our transforms and that's
>>> what was preventing the fusion I was expecting. I'm looking into how to
>>> make these hints mergeable now.
>>>
>>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke  wrote:
>>>
>>>> Building on what Robert Bradshaw has said, basically, if these fusion
>>>> breaks don't exist, the pipeline can live lock, because the side input is
>>>> unable to finish computing for a given input element's window.
>>>>
>>>> I have recently added fusion to the Go Prism runner based on the python
>>>> side input semantics, and I was surprised that there are basically two
>>>> rules for fusion. The side input one, and for handling Stateful processing.
>>>>
>>>>
>>>> This code here is the greedy fusion algorithm that Python uses, but
>>>> less set-based, so it might be easier to follow:
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>>
>>>> From the linked code comment:
>>>>
>>>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>>>  be fused with the transform P that produces that PCollection. Further,
>>>> no transform S+ descended from S, can be fused with transform P.
>>>>
>>>> Ideally I'll add visual representations of the graphs in the test suite
>>>> here, that validates the side input dependency logic:
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>>
>>>> (Note, that test doesn't validate expected fusion results, Prism is a
>>>> work in progress).
>>>>
>>>>
>>>> As for the Stateful rule, this is largely an implementation convenience
>>>> for runners to ensure correct execution.
>>>> If your pipeline also uses Stateful transforms, or SplittableDoFns,
>>>> those are usually relegated to the root of a fused stage, and avoid
>>>> fusion with each other. That can also cause additional stages.
>>>>
>>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>>> multiple stateful transforms could be fused in the same stage. But that's a
>>>> very different discussion.
>>>>
>>>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran 
>>>> wrote:

Re: How do side inputs relate to stage fusion?

2023-12-14 Thread Robert Bradshaw via dev
That is correct. Side inputs give a view of the "whole" PCollection and
hence introduce a fusion-breaking barrier. For example, suppose one has a
DoFn, DoFnA, that produces two outputs, mainPColl and sidePColl, that are
consumed (as the main and side input respectively) by DoFnB.

                   mainPColl ------ DoFnB
                  /                   ^
inPColl -- DoFnA                      |
                  \                   |
                   sidePColl --------/


Now DoFnB may iterate over the entirety of sidePColl for every element of
mainPColl. This means that DoFnA and DoFnB cannot be fused, which
would require DoFnB to consume the elements as they are produced from
DoFnA, but we need DoFnA to run to completion before we know the contents
of sidePColl.

Similar constraints apply in larger graphs (e.g. there may be many
intermediate DoFns and PCollections), but they principally boil down to
shapes that look like this.

Though this does not introduce a global barrier in streaming, there is
still the analogous per window/watermark barrier that prevents fusion for
the same reasons.
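The side input rule can be sketched in a few lines of plain Python. This is a simplified model at the level of individual transforms, not the actual fusion code in any runner; the graph encoding and the names `blocked_producers` and `can_fuse` are hypothetical.

```python
def blocked_producers(transforms):
    """Map each transform to the producers it must not be fused with."""
    blocked = {}

    def visit(name):
        if name in blocked:
            return blocked[name]
        spec = transforms[name]
        # A transform cannot fuse with the producers of its side inputs...
        result = set(spec["side"])
        # ...and it inherits the restrictions of everything upstream of it.
        for upstream in spec["main"] + spec["side"]:
            result |= visit(upstream)
        blocked[name] = result
        return result

    for name in transforms:
        visit(name)
    return blocked


def can_fuse(producer, consumer, transforms):
    """True if consumer reads producer's main output and no rule forbids it."""
    return (producer in transforms[consumer]["main"]
            and producer not in blocked_producers(transforms)[consumer])


# The shape from the diagram, plus a downstream DoFnC.
graph = {
    "DoFnA": {"main": [], "side": []},
    "DoFnB": {"main": ["DoFnA"], "side": ["DoFnA"]},
    "DoFnC": {"main": ["DoFnB"], "side": []},
}
```

In this model DoFnA and DoFnB stay in separate stages, while DoFnC can still fuse with DoFnB because the restriction it inherits only names DoFnA.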




On Thu, Dec 14, 2023 at 3:02 PM Joey Tran  wrote:

> Hey all,
>
> We have a pretty big pipeline and while I was inspecting the stages, I
> noticed there is less fusion than I expected. I suspect it has to do with
> the heavy use of side inputs in our workflow. In the python sdk, I see that
> side inputs are considered when determining whether two stages are fusible.
> I have a hard time getting a clear understanding of the logic though. Could
> someone clarify / summarize the rules around this?
>
> Thanks!
> Joey
>


Re: State-Aware Element Batching in Python

2023-12-08 Thread Robert Bradshaw via dev
Thanks for the nice doc. So it looks like this new code is taken
iff max_batch_duration_secs is set?

On Thu, Dec 7, 2023 at 12:28 PM Jack McCluskey via dev 
wrote:

> Hey everyone,
>
> This is long-overdue, but I wanted to share a doc (
> https://docs.google.com/document/d/1Rin_5Vm3qT1Mkb5PcHgTDrjXc3j0Admzi3fEGEHB2-4/edit?usp=sharing)
> that outlines a new implementation of BatchElements in the Python SDK that
> is state-aware, and therefore allows for batching elements across bundles
> with dynamic batch sizing. The transform is actually already in the Beam
> repo, as a prototype PR rapidly turned into the bulk of the work being
> finished. This doc does, at the very least, outline some interesting
> mechanics and nuances involved in putting this together.
>
> The DoFn is largely targeted for use in the RunInference framework;
> however it will be accessible as of 2.53.0 for general use through
> BatchElements as well. The code itself is at
> https://github.com/apache/beam/blob/5becfb8ed430fe9a317cd2ffded576fe2ab8e980/sdks/python/apache_beam/transforms/util.py#L651
>
> Thanks,
>
> Jack McCluskey
>
> --
>
>
> Jack McCluskey
> SWE - DataPLS PLAT/ Dataflow ML
> RDU
> jrmcclus...@google.com
>
>
>


Re: Build python Beam from source

2023-12-05 Thread Robert Bradshaw via dev
To use cross language capabilities from a non-release branch you'll
have to build the cross-language bits yourself as well. This can be
done by

(1) Making sure Java (for java dependencies) is installed.
(2) In the top level of the repository, running ./gradlew
sdks:java:io:expansion-service:shadowJar

For released versions of Beam, it will automatically fetch the
pre-built, released artifacts for you from maven. You can manually
request those of a previous release by passing something like

--beam_services='{"sdks:java:extensions:sql:expansion-service:shadowJar":
"https://repository.apache.org/content/repositories/orgapachebeam-1361/org/apache/beam/beam-sdks-java-extensions-sql-expansion-service/2.52.0/beam-sdks-java-extensions-sql-expansion-service-2.52.0.jar"}'

which basically says "when looking for this target, use that jar"
though as this is using an out-of-date copy of the libraries this may
not always work.
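Since the flag value is JSON embedded in a shell argument, quoting is easy to get wrong. A small sketch that builds the value programmatically, using only the standard library; the target name and jar URL are the ones from the example above, and the variable names are illustrative.

```python
import json
import shlex

target = "sdks:java:extensions:sql:expansion-service:shadowJar"
jar_url = (
    "https://repository.apache.org/content/repositories/orgapachebeam-1361/"
    "org/apache/beam/beam-sdks-java-extensions-sql-expansion-service/2.52.0/"
    "beam-sdks-java-extensions-sql-expansion-service-2.52.0.jar"
)

# Serialize the target -> jar mapping so the flag value is valid JSON.
flag = "--beam_services=" + json.dumps({target: jar_url})

# Quote the whole flag for pasting into a shell command line.
shell_safe = shlex.quote(flag)
```

When launching the pipeline from Python, `flag` can be passed as one element of the argument list without any shell quoting.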


On Tue, Dec 5, 2023 at 6:14 AM Поротиков Станислав Вячеславович via
dev  wrote:
>
> Hello!
> How to properly install/build apache-beam python package from source?
>
> I've tried running:
>
> pip install .
>
> from sdks/python directory
>
> It's installed successfully, but when I try to run python beam pipeline, it 
> complains:
> RuntimeError: 
> /lib/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar
>  not found. Please build the server with
>
>  cd /lib; ./gradlew 
> sdks:java:io:expansion-service:shadowJar
>
>
>
> Grateful for any help!
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>


Re: Implementing tuple type support in for ClickHouse connector

2023-12-04 Thread Robert Bradshaw via dev
Note that Logical types are not as portable (e.g. for cross-language use).

On Mon, Dec 4, 2023 at 9:18 AM Alexey Romanenko
 wrote:
>
> Did you take a look by chance at 
> org.apache.beam.sdk.schemas.Schema.LogicalType? Can it be helpful for your 
> case?
>
> On 4 Dec 2023, at 12:02, Mark Zitnik  wrote:
>
> Yes I know it is done in  org.apache.beam.sdk.io.clickhouse.TableSchema (Did 
> it for several other types), but since Tuple is a nested type that can hold 
> any number of other ClickHouse types I was wondering what is the best type 
> from the Apache Beam side in order to implement it.
>
> Mark
>
> On Mon, Dec 4, 2023 at 12:24 PM Alexey Romanenko  
> wrote:
>>
>> Hi Mark,
>>
>> What do you mean by “support” in this case? To map this ClickHouse data type 
>> to a Beam Schema data type as it’s done in 
>> org.apache.beam.sdk.io.clickhouse.TableSchema for other types or something 
>> else?
>>
>> —
>> Alexey
>>
>> On 3 Dec 2023, at 10:35, Mark Zitnik  wrote:
>>
>> Hi Team,
>>
>> I am one of the committers of the ClickHouse integration team.
>> I need to add support for Tuple in the ClickHouse connector for Apache Beam. 
>> What is the best approach for implementing that?
>> Tuple (https://clickhouse.com/docs/en/sql-reference/data-types/tuple) is a
>> nested data type
>> (https://clickhouse.com/docs/en/sql-reference/data-types#data_types).
>> If you can point me to a reference on other connectors
>>
>> Thanks
>> -MZ
>>
>>
>>
>>
>


Re: Row compatible generated coders for custom classes

2023-12-01 Thread Robert Bradshaw via dev
On Fri, Dec 1, 2023 at 9:13 AM Steven van Rossum via dev
 wrote:
>
> Hi all,
>
> I was benchmarking the fastjson2 serialization library a few weeks back for a 
> Java pipeline I was working on and was asked by a colleague to benchmark 
> binary JSON serialization against Rows for fun. We didn't do any extensive 
> analysis across different shapes and sizes, but the finding on this workload 
> was that serialization to binary JSON (tuple representation)

Meaning BSON I presume? What do you mean by "tuple representation"?
(One downside of JSON is that the field names are redundantly stored
in each record, so even if you save on CPU it may hurt on the network
due to the greater data sizes).
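To illustrate the size point with plain textual JSON (not the binary format benchmarked above), here is a small sketch. It assumes one plausible reading of "tuple representation", namely positional arrays with the field names kept out of band; the record fields are made up.

```python
import json

records = [{"user_id": i, "country": "SE", "score": i * 1.5} for i in range(3)]

# Object form: every record repeats the field names.
as_objects = json.dumps(records, separators=(",", ":")).encode("utf-8")

# Positional form: field names are stored once, outside the records.
field_names = list(records[0])
rows = [[record[name] for name in field_names] for record in records]
as_tuples = json.dumps(rows, separators=(",", ":")).encode("utf-8")

# Decoding needs the out-of-band field names, much like a schema.
decoded = [dict(zip(field_names, row)) for row in json.loads(as_tuples)]
```

The positional form is smaller by roughly the length of the field names per record, at the cost of requiring both sides to agree on field order.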

> outperformed the SchemaCoder on throughput by ~11x on serialization and ~5x 
> on deserialization. Additionally, RowCoder outperformed SchemaCoder on 
> throughput by ~1.3x on serialization and ~1.7x on deserialization. Note that 
> all benchmarks measured in the millions of ops/sec for this quick test, so 
> this is already excellent performance obviously.

Sounds like there's a lot of room for improvement! One downside of
Rows is that they can't (IIRC) store (and encode/decode) unboxed
representations of their primitive field types. This alone would be
good to solve, but as mentioned you could probably also skip a Row
intermediate altogether for encoding/decoding.

> I'm sure there's stuff to learn from other serialization libraries, but I'd 
> table that for now. The low hanging fruit improvement would be to skip that 
> intermediate hop to/from Row and instead generate custom SchemaCoders to 
> serialize directly into or deserialize from the Row format.
> I'd be happy to pick this up at some point in the new year, but would just 
> like to get some thoughts from this group.

+1, this'd be a great addition. I think there was some investigation
into using bytebuddy to auto-generate this kind of thing, but I don't
know how extensive it is.


Re: [VOTE] Release 2.52.0, release candidate #5

2023-11-16 Thread Robert Bradshaw via dev
+1 (binding)

The artifacts all look good, as does Python installation into a fresh
environment.


On Thu, Nov 16, 2023 at 2:41 PM Svetak Sundhar via dev 
wrote:

> +1 (non binding)
>
> validated on python use cases.
>
>
> Svetak Sundhar
>
>   Data Engineer
> s vetaksund...@google.com
>
>
>
> On Wed, Nov 15, 2023 at 8:52 AM Jan Lukavský  wrote:
>
>> +1 (binding)
>>
>> Validated Java SDK with Flink runner on own use cases.
>>
>>   Jan
>>
>> On 11/15/23 11:35, Jean-Baptiste Onofré wrote:
>> > +1 (binding)
>> >
>> > Quickly tested Java SDK and checked the legal part (hash, signatures,
>> headers).
>> >
>> > Regards
>> > JB
>> >
>> > On Tue, Nov 14, 2023 at 12:06 AM Danny McCormick via dev
>> >  wrote:
>> >> Hi everyone,
>> >> Please review and vote on the release candidate #5 for the version
>> 2.52.0, as follows:
>> >> [ ] +1, Approve the release
>> >> [ ] -1, Do not approve the release (please provide specific comments)
>> >>
>> >>
>> >> Reviewers are encouraged to test their own use cases with the release
>> candidate, and vote +1 if no issues are found. Only PMC member votes will
>> count towards the final vote, but votes from all community members are
>> encouraged and helpful for finding regressions; you can either test your
>> own use cases or use cases from the validation sheet [10].
>> >>
>> >> The complete staging area is available for your review, which includes:
>> >>
>> >> GitHub Release notes [1]
>> >> the official Apache source release to be deployed to dist.apache.org
>> [2], which is signed with the key with fingerprint D20316F712213422 [3]
>> >> all artifacts to be deployed to the Maven Central Repository [4]
>> >> source code tag "v2.52.0-RC5" [5]
>> >> website pull request listing the release [6], the blog post [6], and
>> publishing the API reference manual [7]
>> >> Python artifacts are deployed along with the source release to the
>> dist.apache.org [2] and PyPI[8].
>> >> Go artifacts and documentation are available at pkg.go.dev [9]
>> >> Validation sheet with a tab for 2.52.0 release to help with validation
>> [10]
>> >> Docker images published to Docker Hub [11]
>> >> PR to run tests against release branch [12]
>> >>
>> >>
>> >> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>> >>
>> >> For guidelines on how to try the release in your projects, check out
>> our blog post at https://beam.apache.org/blog/validate-beam-release/.
>> >>
>> >> Thanks,
>> >> Danny
>> >>
>> >> [1] https://github.com/apache/beam/milestone/16
>> >> [2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
>> >> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> >> [4]
>> https://repository.apache.org/content/repositories/orgapachebeam-1363/
>> >> [5] https://github.com/apache/beam/tree/v2.52.0-RC5
>> >> [6] https://github.com/apache/beam/pull/29331
>> >> [7] https://github.com/apache/beam-site/pull/655
>> >> [8] https://pypi.org/project/apache-beam/2.52.0rc5/
>> >> [9]
>> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC5/go/pkg/beam
>> >> [10]
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
>> >> [11] https://hub.docker.com/search?q=apache%2Fbeam=image
>> >> [12] https://github.com/apache/beam/pull/29418
>>
>


Re: Hiding logging for beam playground examples

2023-11-14 Thread Robert Bradshaw via dev
+1 to at least setting the log level to higher than info. Some runner
logging (e.g. job started/done) may be useful.
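For a Python example, raising the level can be as small as the following sketch, which uses only the standard `logging` module. How the playground itself configures logging is not covered here, so treat the placement of these lines as an assumption.

```python
import logging

# Install a default handler if none exists, then raise the root level so
# INFO-level runner chatter is dropped while warnings and errors remain.
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)

info_enabled = logging.getLogger().isEnabledFor(logging.INFO)
warning_enabled = logging.getLogger().isEnabledFor(logging.WARNING)
```

Loggers that set their own level explicitly are not affected by the root level, so selected messages such as job started/done could still be let through that way.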

On Tue, Nov 14, 2023 at 9:37 AM Joey Tran  wrote:
>
> Hi all,
>
> I just had a workshop to demo beam for people at my company and there was a 
> bit of confusion about whether the beam python playground examples were even 
> working and it turned out they just got confused by all the runner logging 
> that is output.
>
> Is this worth keeping? It seems like it'd be a common source of confusion for 
> new users
>
> Cheers,
> Joey


Re: The Current State of Beam Python Type Hinting

2023-11-14 Thread Robert Bradshaw via dev
Thanks for writing this up! Added some comments to the doc itself.

On Mon, Nov 13, 2023 at 11:01 PM Johanna Öjeling via dev <
dev@beam.apache.org> wrote:

> Thanks - well written! Interesting with the Any type, I learned something
> new. Added a comment.
>
> Johanna
>
> On Mon, Nov 13, 2023 at 6:02 PM Jack McCluskey via dev <
> dev@beam.apache.org> wrote:
>
>> Hey everyone,
>>
>> I put together a small doc explaining how Beam Python type hinting
>> works + where the module needs to go in the future with changes to Python
>> itself. This is over at
>> https://s.apache.org/beam-python-type-hinting-overview and I'll be
>> putting it into a few places for discoverability as well.
>>
>> Thanks,
>>
>> Jack McCluskey
>>
>> --
>>
>>
>> Jack McCluskey
>> SWE - DataPLS PLAT/ Dataflow ML
>> RDU
>> jrmcclus...@google.com
>>
>>
>>


Re: Adding Dead Letter Queues to Beam IOs

2023-11-10 Thread Robert Bradshaw via dev
Thanks. I added some comments to the doc and open PR.

On Wed, Nov 8, 2023 at 12:44 PM John Casey via dev  wrote:
>
> Hi All,
>
> I've written up a design for adding DLQs to existing Beam IOs. It's been 
> through a round of reviews with some Dataflow folks at Google, but I'd 
> appreciate any comments the rest of Beam have around how to refine the design.
>
> TL;DR: Make it easy for a user to configure IOs to route bad data to an 
> alternate sink instead of crashing the pipeline or having the record be 
> retried indefinitely.
>
> https://docs.google.com/document/d/1NGeCk6tOqF-TiGEAV7ixd_vhIiWz9sHPlCa1P_77Ajs/edit?usp=sharing
>
> Thanks!
>
> John
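The routing idea in the TL;DR can be sketched without any Beam API. This is a plain-Python illustration of the pattern only, not the design in the doc; `parse_with_dead_letter` is a hypothetical name and JSON parsing stands in for whatever step can reject a record.

```python
import json


def parse_with_dead_letter(raw_records):
    """Parse records, routing failures to a separate output instead of raising."""
    good, dead = [], []
    for raw in raw_records:
        try:
            good.append(json.loads(raw))
        except json.JSONDecodeError as err:
            # Keep the original record and the reason, for later inspection.
            dead.append({"record": raw, "error": str(err)})
    return good, dead


good, dead = parse_with_dead_letter(['{"id": 1}', "not json", '{"id": 2}'])
```

The good records continue down the main path, while `dead` would be written to the alternate sink rather than failing or retrying indefinitely.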


Re: [VOTE] Release 2.52.0, release candidate #3

2023-11-10 Thread Robert Bradshaw via dev
+1 (binding)

Artifacts and signatures look good, validated one of the Python wheels
in a fresh install.

On Fri, Nov 10, 2023 at 7:23 AM Alexey Romanenko
 wrote:
>
> +1 (binding)
>
> Java SDK with Spark runner
>
> —
> Alexey
>
> On 9 Nov 2023, at 16:44, Ritesh Ghorse via dev  wrote:
>
> +1 (non-binding)
>
> Validated Python SDK quickstart batch and streaming.
>
> Thanks!
>
> On Thu, Nov 9, 2023 at 9:25 AM Jan Lukavský  wrote:
>>
>> +1 (binding)
>>
>> Validated Java SDK with Flink runner on own use cases.
>>
>>  Jan
>>
>> On 11/9/23 03:31, Danny McCormick via dev wrote:
>>
>> Hi everyone,
>> Please review and vote on the release candidate #3 for the version 2.52.0, 
>> as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>>
>> Reviewers are encouraged to test their own use cases with the release 
>> candidate, and vote +1 if no issues are found. Only PMC member votes will 
>> count towards the final vote, but votes from all community members are 
>> encouraged and helpful for finding regressions; you can either test your own 
>> use cases or use cases from the validation sheet [10].
>>
>> The complete staging area is available for your review, which includes:
>>
>> GitHub Release notes [1]
>> the official Apache source release to be deployed to dist.apache.org [2], 
>> which is signed with the key with fingerprint D20316F712213422 [3]
>> all artifacts to be deployed to the Maven Central Repository [4]
>> source code tag "v2.52.0-RC3" [5]
>> website pull request listing the release [6], the blog post [6], and 
>> publishing the API reference manual [7]
>> Python artifacts are deployed along with the source release to the 
>> dist.apache.org [2] and PyPI[8].
>> Go artifacts and documentation are available at pkg.go.dev [9]
>> Validation sheet with a tab for 2.52.0 release to help with validation [10]
>> Docker images published to Docker Hub [11]
>> PR to run tests against release branch [12]
>>
>>
>> The vote will be open for at least 72 hours. It is adopted by majority 
>> approval, with at least 3 PMC affirmative votes.
>>
>> For guidelines on how to try the release in your projects, check out our 
>> blog post at https://beam.apache.org/blog/validate-beam-release/.
>>
>> Thanks,
>> Danny
>>
>> [1] https://github.com/apache/beam/milestone/16
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4] https://repository.apache.org/content/repositories/orgapachebeam-1361/
>> [5] https://github.com/apache/beam/tree/v2.52.0-RC3
>> [6] https://github.com/apache/beam/pull/29331
>> [7] https://github.com/apache/beam-site/pull/653
>> [8] https://pypi.org/project/apache-beam/2.52.0rc2/
>> [9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC3/go/pkg/beam
>> [10] 
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
>> [11] https://hub.docker.com/search?q=apache%2Fbeam=image
>> [12] https://github.com/apache/beam/pull/29319
>
>


Re: [External Sender] Re: [Question] Error handling for IO Write Functions

2023-11-09 Thread Robert Bradshaw via dev
+1

Specifically, p.run().waitUntilFinish() would throw an exception if there
were errors during pipeline execution.
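The construction versus execution split is easy to see in a toy model. The sketch below is plain Python and deliberately not Beam: `ToyPipeline` is a hypothetical class whose `apply` only records a step and whose `run` does the work, which is where a failure becomes visible.

```python
class ToyPipeline:
    """Toy model of deferred execution; not Beam's Pipeline class."""

    def __init__(self):
        self.steps = []

    def apply(self, fn):
        # Construction only records the step; nothing runs yet.
        self.steps.append(fn)
        return self

    def run(self, elements):
        # Execution happens here, so this is where failures surface.
        for fn in self.steps:
            elements = [fn(e) for e in elements]
        return elements


def to_int(value):
    return int(value)  # raises ValueError on bad input


pipeline = ToyPipeline()

construction_failed = False
try:
    pipeline.apply(to_int)  # a try block here sees no data at all
except ValueError:
    construction_failed = True

execution_failed = False
try:
    pipeline.run(["1", "oops"])  # the bad element only fails at run time
except ValueError:
    execution_failed = True
```

Wrapping the `apply` call catches nothing, while wrapping `run` does; in the thread's Java code the analogous place to catch failures is around `p.run().waitUntilFinish()`.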

On Wed, Nov 8, 2023 at 8:05 AM John Casey via dev 
wrote:

> Yep, that's a common misunderstanding with Beam.
>
> The code that is actually executed in the try block is just for pipeline
> construction, and no data is processed at this point in time.
>
> Once the pipeline is constructed, the various pardos are serialized, and
> sent to the runners, where they are actually executed.
>
> In this case, if there was an exception in the pardo that converts rows to
> avro, you would see the "Exception when converting Beam Row to Avro Record"
> log in whatever logs your runner provides you, and the exception would
> propagate up to your runner.
>
> In this case, your log log.info("Finished writing Parquet file to path
> {}", writePath); is inaccurate: it logs when the pipeline is
> constructed, not when the Parquet write completes.
>
> On Wed, Nov 8, 2023 at 10:51 AM Ramya Prasad via dev 
> wrote:
>
>> Hey John,
>>
>> Yes that's how my code is set up, I have the FileIO.write() in its own
>> try-catch block. I took a second look at where exactly the code is failing,
>> and it's actually in a ParDo function which is happening before I call
>> FileIO.write(). But even within that, I've tried adding a try-catch but the
>> error isn't stopping the actual application run in a Spark cluster. In the
>> cluster, I see that the exception is being thrown from my ParDo, but then
>> immediately after that, I see the line *INFO ApplicationMaster: Final
>> app status: SUCCEEDED, exitCode: 0.* This is roughly what my code setup
>> looks like:
>>
>> @Slf4j
>> public class ParquetWriteActionStrategy {
>>
>>     public void executeWriteAction(Pipeline p) throws Exception {
>>
>>         try {
>>
>>             // transform PCollection from type Row to GenericRecords
>>             PCollection<GenericRecord> records = p.apply(
>>                 "transform PCollection from type Row to GenericRecords",
>>                 ParDo.of(new DoFn<Row, GenericRecord>() {
>>                     @ProcessElement
>>                     public void processElement(@Element Row row,
>>                             OutputReceiver<GenericRecord> out) {
>>                         try {
>>
>>                         } catch (Exception e) {
>>                             log.error("Exception when converting Beam Row to Avro Record: {}", e.getMessage());
>>                             throw e;
>>                         }
>>
>>                     }
>>                 })).setCoder(AvroCoder.of(avroSchema));
>>             records.apply("Writing Parquet Output File",
>>                 FileIO.<GenericRecord>write()
>>                     .via()
>>                     .to(writePath)
>>                     .withSuffix(".parquet"));
>>
>>             log.info("Finished writing Parquet file to path {}", writePath);
>>         } catch (Exception e) {
>>             log.error("Error in Parquet Write Action. {}", e.getMessage());
>>             throw e;
>>         }
>>
>>     }
>> }
>>
>>
>> On Wed, Nov 8, 2023 at 9:16 AM John Casey via dev 
>> wrote:
>>
>>> There are 2 execution times when using Beam. The first execution is
>>> local, when a pipeline is constructed, and the second is remote on the
>>> runner, processing data.
>>>
>>> Based on what you said, it sounds like you are wrapping pipeline
>>> construction in a try-catch, and constructing FileIO isn't failing.
>>>
>>> e.g.
>>>
>>> try {
>>>
>>> FileIO.write().someOtherconfigs()
>>>
>>> } catch ...
>>>
>>> This will catch any exceptions thrown while constructing FileIO, but
>>> exceptions from the running pipeline won't propagate through this block.
>>>
>>> On Tue, Nov 7, 2023 at 5:21 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> File write failures should be throwing exceptions that will
>>>> terminate the pipeline on failure. (Generally a distributed runner will
>>>> make multiple attempts before abandoning the entire pipeline of course.)
>>>>
>>>> Are you seeing files failing to be written but no exceptions being
>>>> thrown? If so, this is definitely a bug that we want to resolve.
>>>>
>>>>
>>>> On Tue, Nov 7, 2023 at 11:17

Re: [Question] Error handling for IO Write Functions

2023-11-07 Thread Robert Bradshaw via dev
File write failures should be throwing exceptions that will terminate the
pipeline on failure. (Generally a distributed runner will make multiple
attempts before abandoning the entire pipeline of course.)

Are you seeing files failing to be written but no exceptions being thrown?
If so, this is definitely a bug that we want to resolve.


On Tue, Nov 7, 2023 at 11:17 AM Ramya Prasad via dev 
wrote:

> Hello,
>
> I am a developer using Apache Beam in my Java application, and I need some
> help on how to handle exceptions when writing a file to S3. I have tried
> wrapping my code within a try-catch block, but no exception is being thrown
> within the try block. I'm assuming that FileIO doesn't throw any exceptions
> upon failure. Is there a way in which I can either terminate the program on
> failure or at least be made aware of if any of my write operations fail?
>
> Thanks and sincerely,
> Ramya


Re: Processing time watermarks in KinesisIO

2023-10-31 Thread Robert Bradshaw via dev
On Tue, Oct 31, 2023 at 10:28 AM Jan Lukavský  wrote:
>
> On 10/31/23 17:44, Robert Bradshaw via dev wrote:
> > There are really two cases that make sense:
> >
> > (1) We read the event timestamps from the kafka records themselves and
> > have some external knowledge that guarantees (or at least provides a
> > very good heuristic) about what the timestamps of unread messages
> > could be in the future to set the watermark. This could possibly
> > involve knowing that the timestamps in a partition are monotonically
> > increasing, or somehow have bounded skew.
> +1
> >
> > (2) We use processing time as both the watermark and for setting the
> > event timestamp on produced messages. From this point on we can safely
> > reason about the event time.
> This is where I have some doubts. We can reason about event time, but it
> is not stable upon Pipeline restarts (if there is any downstream
> processing that depends on event time and is not shielded by
> @RequiresStableInput, it might give different results on restarts).

That is a fair point, but I don't think we can guarantee that we have
a timestamp embedded in the record. (Or is there some stable kafka
metadata we could use here, I'm not that familiar with what kafka
guarantees). We could require it to be opt-in given the caveats.

> Is
> there any specific reason not to use option 1)? Do we have to provide the
> alternative 2), provided users can implement it themselves (we would
> need to allow users to specify a custom timestamp function, but that
> should be done in all cases)?

The tricky bit is how the user specifies the watermark, unless they
can guarantee the custom timestamps are monotonically ordered (at
least within a partition).

> > The current state seems a bit broken if I understand correctly.
> +1
> >
> > On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský  wrote:
> >> I think that instead of deprecating and creating a new version, we could
> >> leverage the proposed update compatibility flag for this [1]. I still have
> >> some doubts whether the processing-time watermarking (and event-time
> >> assignment) makes sense. Do we have a valid use case for that? This is
> >> actually the removed SYNCHRONIZED_PROCESSING_TIME time domain, which is
> >> problematic: restarts of Pipelines cause timestamps to change and hence
> >> make *every* DoFn potentially non-deterministic, which would be an
> >> unexpected side effect. This makes me wonder if we should remove this
> >> policy altogether (deprecate or use the update compatibility flag, so that
> >> the policy throws an exception in the new version).
> >>
> >> The crucial point would be to find a use case where it is actually helpful
> >> to use such a policy.
> >> Any ideas?
> >>
> >>   Jan
> >>
> >> [1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2
> >>
> >> On 10/27/23 18:33, Alexey Romanenko wrote:
> >>
> >> Ahh, ok, I see.
> >>
> >> Yes, it looks like a bug. So, I'd propose to deprecate the old "processing
> >> time" watermark policy, which we can remove later, and create a new fixed
> >> one.
> >>
> >> PS: It’s recommended to use
> >> "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO" instead of the deprecated
> >> "org.apache.beam.sdk.io.kinesis.KinesisIO" one.
> >>
> >> —
> >> Alexey
> >>
> >> On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:
> >>
> >> No, I'm referring to this [1] policy which has unexpected (and hardly 
> >> avoidable on the user-code side) data loss issues. The problem is that 
> >> assigning timestamps to elements and watermarks is completely decoupled 
> >> and unrelated, which I'd say is a bug.
> >>
> >>   Jan
> >>
> >> [1] 
> >> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--
> >>
> >> On 10/27/23 16:51, Alexey Romanenko wrote:
> >>
> >> Why not just create a custom watermark policy for that? Or do you mean
> >> making it the default policy?
> >>
> >> —
> >> Alexey
> >>
> >> On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:
> >>
> >>
> >> Hi,
> >>
> >> When discussing [1], we found out that the issue is actually caused
> >> by processing time watermarks in KinesisIO. Enabling this watermark
> >> outputs watermarks based on current processing time, _but event timestamps

Re: Processing time watermarks in KinesisIO

2023-10-31 Thread Robert Bradshaw via dev
There are really two cases that make sense:

(1) We read the event timestamps from the kafka records themselves and
have some external knowledge that guarantees (or at least provides a
very good heuristic) about what the timestamps of unread messages
could be in the future to set the watermark. This could possibly
involve knowing that the timestamps in a partition are monotonically
increasing, or somehow have bounded skew.

(2) We use processing time as both the watermark and for setting the
event timestamp on produced messages. From this point on we can safely
reason about the event time.

The current state seems a bit broken if I understand correctly.
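
As a rough illustration of why mixing the two clocks is a problem, here is
a simplified sketch in plain Java. It is not KinesisIO code: the class, the
isLate helper, and the numbers are all hypothetical, and real lateness is
decided per window rather than per element. It compares a record that sat
in the backlog for two hours under the mixed policy (watermark from
processing time, event timestamp from ingestion time) with case (2), where
processing time is used for both.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified lateness model. NOT KinesisIO code; every name here is hypothetical. */
final class WatermarkLatenessSketch {

    /** Treats an element as late once the watermark has passed its timestamp plus the allowed lateness. */
    static boolean isLate(Instant eventTimestamp, Instant watermark, Duration allowedLateness) {
        return eventTimestamp.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-10-31T12:00:00Z");  // processing time when the record is read
        Instant ingested = now.minus(Duration.ofHours(2));     // the record waited in the backlog for two hours
        Duration allowed = Duration.ofMinutes(10);

        // Mixed policy: watermark from processing time, event timestamp from ingestion time.
        System.out.println("mixed policy late: " + isLate(ingested, now, allowed));

        // Case (2): processing time is used for both the watermark and the event timestamp.
        System.out.println("consistent policy late: " + isLate(now, now, allowed));
    }
}
```

Under these assumptions the backlog record is already behind the watermark
the moment it is read, so how late it is depends only on how large the
backlog is.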

On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský  wrote:
>
> I think that instead of deprecating and creating a new version, we could
> leverage the proposed update compatibility flag for this [1]. I still have
> some doubts whether the processing-time watermarking (and event-time
> assignment) makes sense. Do we have a valid use case for that? This is
> actually the removed SYNCHRONIZED_PROCESSING_TIME time domain, which is
> problematic: restarts of Pipelines cause timestamps to change and hence make
> *every* DoFn potentially non-deterministic, which would be an unexpected
> side effect. This makes me wonder if we should remove this policy altogether
> (deprecate or use the update compatibility flag, so that the policy throws
> an exception in the new version).
>
> The crucial point would be to find a use case where it is actually helpful to
> use such a policy.
> Any ideas?
>
>  Jan
>
> [1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2
>
> On 10/27/23 18:33, Alexey Romanenko wrote:
>
> Ahh, ok, I see.
>
> Yes, it looks like a bug. So, I'd propose to deprecate the old "processing
> time" watermark policy, which we can remove later, and create a new fixed one.
>
> PS: It’s recommended to use "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO"
> instead of the deprecated "org.apache.beam.sdk.io.kinesis.KinesisIO" one.
>
> —
> Alexey
>
> On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:
>
> No, I'm referring to this [1] policy which has unexpected (and hardly 
> avoidable on the user-code side) data loss issues. The problem is that 
> assigning timestamps to elements and watermarks is completely decoupled and 
> unrelated, which I'd say is a bug.
>
>  Jan
>
> [1] 
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--
>
> On 10/27/23 16:51, Alexey Romanenko wrote:
>
> Why not just create a custom watermark policy for that? Or do you mean
> making it the default policy?
>
> —
> Alexey
>
> On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:
>
>
> Hi,
>
> When discussing [1], we found out that the issue is actually caused by
> processing time watermarks in KinesisIO. Enabling this watermark outputs
> watermarks based on current processing time, _but event timestamps are
> derived from ingestion timestamp_. This can cause unbounded lateness when
> processing backlog. I think this setup is error-prone and will likely cause
> data loss due to dropped elements. This can be solved in two ways:
>
>  a) deprecate processing time watermarks, or
>
>  b) modify KinesisIO's watermark policy so that it assigns event timestamps
> as well (the processing-time watermark policy would have to derive event
> timestamps from processing-time).
>
> I'd prefer option b), but it might be a breaking change. Moreover, I'm not
> sure I understand the purpose of the processing-time watermark policy; it
> might be essentially ill-defined from the beginning, so it might really be
> better to remove it completely. There is also a related issue [2].
>
> Any thoughts on this?
>
>  Jan
>
> [1] https://github.com/apache/beam/issues/25975
>
> [2] https://github.com/apache/beam/issues/28760
>
>
>


Re: Streaming update compatibility

2023-10-27 Thread Robert Bradshaw via dev
On Fri, Oct 27, 2023 at 7:50 AM Kellen Dye via dev  wrote:
>
> > Auto is hard, because it would involve
> > querying the runner before pipeline construction, and we may not even
> > know what the runner is at this point
>
> At the point where pipeline construction will start, you should have access 
> to the pipeline arguments and be able to determine the runner. What seems to 
> be missing is a place to query the runner pre-construction. If that query 
> could return metadata about the currently running version of the job, then 
> that could be incorporated into graph construction as necessary.

While this is the common case, it is not true in general. For example
it's possible to cache the pipeline proto and submit it to a separate
choice of runner later. We have Jobs API implementations that
forward/proxy the job to other runners, and the Python interactive
runner is another example where the runner is late-binding (e.g. one
tries a sample locally, and if all looks good can execute remotely,
and also in this case the graph that's submitted is often mutated
before running).

Also, in the spirit of the portability story, the pipeline definition
itself should be runner-independent.

> That same hook could be a place to, for example, return the currently-running
> job graph for pre-submission compatibility checks.

I suppose we could add something to the Jobs API to make "looking up a
previous version of this pipeline" runner-agnostic, though that
assumes it's available at construction time. And +1, as Kellen says, we
should define (and be able to check) what pipeline compatibility means
via a graph-to-graph comparison at the Beam level. I'll defer both
of these as future work as part of the "make update a portable Beam
concept" project.


Re: Streaming update compatibility

2023-10-26 Thread Robert Bradshaw via dev
On Thu, Oct 26, 2023 at 3:59 AM Johanna Öjeling  wrote:
>
> Hi,
>
> I like this idea of making it easier to push out improvements, and had a look 
> at the PR.
>
> One question to better understand how it works today:
>
> The upgrades that the runners do, such as those not visible to the user, can
> they be initiated at any time or do they only happen when the user updates
> the running pipeline, e.g. with new user code?

Correct. We're talking about user-initiated changes to their pipeline here.

> And, assuming the former, some reflections that came to mind when reviewing 
> the changes:
>
> Will the update_compatibility_version option be effective both when creating 
> and updating a pipeline? It is grouped with the update options in the Python 
> SDK, but users may want to configure the compatibility already when launching 
> the pipeline.

It will be effective for both, though generally there's little
motivation to not always use the "latest" version when creating a new
pipeline.

> Would it be possible to revert setting a fixed prior version, i.e. 
> (re-)enable upgrades?

The contract would be IF you start with version X (which logically
defaults to the current SDK), THEN all updates also setting this to
version X (even on SDKs > X) should work.

> If yes: in practice, would this motivate another option, or passing a value 
> like "auto" or "latest" to update_compatibility_version?

Unset is interpreted as latest. Auto is hard, because it would involve
querying the runner before pipeline construction, and we may not even
know what the runner is at this point. (Eventually we could do things
like embed both alternatives into the graph and let the runner choose,
but this is more speculative and may not be as scalable.)

> The option is being introduced to the Java and Python SDKs. Should this also 
> be applicable to the Go SDK?

Yes, allowing setting this value should be done for Go (and
typescript, and future SDKs) too. As Robert Burke mentioned, we need
to respect the value in those SDKs that have expansion service
implementations first.

> On Thu, Oct 26, 2023 at 2:25 AM Robert Bradshaw via dev  
> wrote:
>>
>> Dataflow (among other runners) has the ability to "upgrade" running
>> pipelines with new code (e.g. capturing bug fixes, dependency updates,
>> and limited topology changes). Unfortunately some improvements (e.g.
>> new and improved ways of writing to BigQuery, optimized use of side
>> inputs, a change in algorithm, sometimes completely internal and not
>> visible to the user) are not sufficiently backwards compatible. With
>> the motivation to not break users, this causes us to either not make
>> these changes or guard them as a parallel opt-in mode, which is a
>> significant drain on developer productivity and causes new
>> pipelines to run in obsolete modes by default.
>>
>> I created https://github.com/apache/beam/pull/29140 which adds a new
>> pipeline option, update_compatibility_version, that allows the SDK to
>> move forward while letting users with pipelines launched previously
>> manually request the "old" way of doing things to preserve update
>> compatibility. (We should still attempt backwards compatibility when
>> it makes sense, and the old way would remain in code until such a time
>> as it's actually deprecated and removed, but this means we won't be
>> constrained by it, especially when it comes to default settings.)
>>
>> Any objections or other thoughts on this approach?
>>
>> - Robert
>>
>> P.S. Separately I think it'd be valuable to elevate the vague notion
>> of update compatibility to a first-class Beam concept and put it on
>> firm footing, but that's a larger conversation outside the thread of
>> this smaller (and I think still useful in such a future world) change.


Streaming update compatibility

2023-10-25 Thread Robert Bradshaw via dev
Dataflow (among other runners) has the ability to "upgrade" running
pipelines with new code (e.g. capturing bug fixes, dependency updates,
and limited topology changes). Unfortunately some improvements (e.g.
new and improved ways of writing to BigQuery, optimized use of side
inputs, a change in algorithm, sometimes completely internal and not
visible to the user) are not sufficiently backwards compatible. With
the motivation to not break users, this causes us to either not make
these changes or guard them as a parallel opt-in mode, which is a
significant drain on developer productivity and causes new
pipelines to run in obsolete modes by default.

I created https://github.com/apache/beam/pull/29140 which adds a new
pipeline option, update_compatibility_version, that allows the SDK to
move forward while letting users with pipelines launched previously
manually request the "old" way of doing things to preserve update
compatibility. (We should still attempt backwards compatibility when
it makes sense, and the old way would remain in code until such a time
as it's actually deprecated and removed, but this means we won't be
constrained by it, especially when it comes to default settings.)
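
To sketch how a transform might consult such an option, here is a small
plain-Java example. It is only an assumption about the shape of the logic,
not code from the PR: the class, the chooseImplementation helper, and the
"2.52.0" cut-over version are all hypothetical. The idea is that an unset
value means "latest", while a pinned older version keeps the legacy
expansion so an update stays compatible.

```java
import java.util.Arrays;

/** Hypothetical sketch of version-gated behavior; not taken from the Beam code base. */
final class UpdateCompatibilitySketch {

    /** Hypothetical release in which some transform changed its implementation. */
    static final String CHANGE_INTRODUCED_IN = "2.52.0";

    /** Compares dotted numeric versions such as "2.49.0"; negative when a is older than b. */
    static int compareVersions(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    private static int[] parse(String version) {
        return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
    }

    /** Unset (null) is treated as "latest"; an older pinned version keeps the legacy expansion. */
    static String chooseImplementation(String updateCompatibilityVersion) {
        if (updateCompatibilityVersion == null) {
            return "new";
        }
        return compareVersions(updateCompatibilityVersion, CHANGE_INTRODUCED_IN) < 0 ? "legacy" : "new";
    }

    public static void main(String[] args) {
        System.out.println(chooseImplementation(null));      // new pipeline, option unset
        System.out.println(chooseImplementation("2.50.0"));  // pipeline originally launched before the change
    }
}
```

Comparing the version components numerically matters here: a plain string
comparison would order "2.9.0" after "2.52.0".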

Any objections or other thoughts on this approach?

- Robert

P.S. Separately I think it'd be valuable to elevate the vague notion
of update compatibility to a first-class Beam concept and put it on
firm footing, but that's a larger conversation outside the thread of
this smaller (and I think still useful in such a future world) change.


Re: [Discuss] Idea to increase RC voting participation

2023-10-24 Thread Robert Bradshaw via dev
On Tue, Oct 24, 2023 at 10:35 AM Kenneth Knowles  wrote:

> Tangentially related:
>
> Long ago, attaching an issue to a release was a mandatory step as part of
> closing. Now I think it is not. Is it automatically happening? It looks
> like we have 820 with no milestone
> https://github.com/apache/beam/issues?q=is%3Aissue+no%3Amilestone+is%3Aclosed
>

This could (should) be automatically discoverable. A (closed) issue is
associated with commits which are associated with a release.


> On Tue, Oct 24, 2023 at 1:25 PM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
>> +1 for going by the commits since this is what matters at the end of the
>> day. Also, many issues may not get tagged correctly for a given release due
>> to either the contributor not tagging the issue or due to commits for the
>> issue spanning multiple Beam releases.
>>
>> For example,
>>
>> For all commits in a given release RC:
>>   * If we find a Github issue for the commit: add a notice to the Github
>> issue
>>   * Else: add the notice to a generic issue for the release including
>> tags for the commit ID, PR author, and the committer who merged the PR.
>>
>> Thanks,
>> Cham
>>
>>
>>
>>
>> On Mon, Oct 23, 2023 at 11:49 AM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>>
>>> I'd probably vote to include both the issue filer and the contributor.
>>> It is pretty equally straightforward - one way to do this would be using
>>> all issues related to that release's milestone and extracting the issue
>>> author and the issue closer.
>>>
>>> This does leave out the (unfortunately sizable) set of contributions
>>> that don't have an associated issue; if we're worried about that, we could
>>> always fall back to anyone with a commit in the last release who doesn't
>>> have an associated issue (aka what I thought we were initially proposing
>>> and what I think Airflow does today).
>>>
>>> I'm pretty much +1 on any sort of automation here, and it certainly can
>>> come in stages :)
>>>
>>> On Mon, Oct 23, 2023 at 1:50 PM Johanna Öjeling via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Yes that's a good point to include also those who created the issue.
>>>>
>>>> On Mon, Oct 23, 2023, 19:18 Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Mon, Oct 23, 2023 at 7:26 AM Danny McCormick via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> So to summarize, I think there's broad consensus (or at least lazy
>>>>>> consensus) around the following:
>>>>>>
>>>>>> - (1) Updating our release email/guidelines to be more specific about
>>>>>> what we mean by release validation/how to be helpful during this process.
>>>>>> This includes both encouraging validation within each user's own code
>>>>>> base and encouraging people to document/share their process of
>>>>>> validation and link it in the release spreadsheet.
>>>>>> - (2) Doing something like what Airflow does (#29424
>>>>>> <https://github.com/apache/airflow/issues/29424>) and creating an
>>>>>> issue asking people who have contributed to the current release to help
>>>>>> validate their changes.
>>>>>>
>>>>>> I'm also +1 on doing both of these. The first bit (updating our
>>>>>> guidelines) is relatively easy - it should just require updating
>>>>>> https://github.com/apache/beam/blob/master/contributor-docs/release-guide.md#vote-and-validate-the-release-candidate
>>>>>> .
>>>>>>
>>>>>> I took a look at the second piece (copying what Airflow does) to see
>>>>>> if we could just copy their automation, but it looks like it's tied
>>>>>> to airflow breeze
>>>>>> <https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/provider_issue_TEMPLATE.md.jinja2>
>>>>>> (their repo-specific automation tooling), so we'd probably need to build
>>>>>> the automation ourselves. It shouldn't be terrible, basically we'd want a
>>>>>> GitHub Action that compares the current release tag with the last release
>>>>>> tag, grabs all the commits in between, parses them to get the author, and
>>>>>> 

Re: [Discuss] Idea to increase RC voting participation

2023-10-23 Thread Robert Bradshaw via dev
On Mon, Oct 23, 2023 at 7:26 AM Danny McCormick via dev 
wrote:

> So to summarize, I think there's broad consensus (or at least lazy
> consensus) around the following:
>
> - (1) Updating our release email/guidelines to be more specific about what
> we mean by release validation/how to be helpful during this process. This
> includes both encouraging validation within each user's own code base and
> encouraging people to document/share their process of validation and link
> it in the release spreadsheet.
> - (2) Doing something like what Airflow does (#29424
> <https://github.com/apache/airflow/issues/29424>) and creating an issue
> asking people who have contributed to the current release to help validate
> their changes.
>
> I'm also +1 on doing both of these. The first bit (updating our
> guidelines) is relatively easy - it should just require updating
> https://github.com/apache/beam/blob/master/contributor-docs/release-guide.md#vote-and-validate-the-release-candidate
> .
>
> I took a look at the second piece (copying what Airflow does) to see if we
> could just copy their automation, but it looks like it's tied to airflow
> breeze
> <https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/provider_issue_TEMPLATE.md.jinja2>
> (their repo-specific automation tooling), so we'd probably need to build
> the automation ourselves. It shouldn't be terrible, basically we'd want a
> GitHub Action that compares the current release tag with the last release
> tag, grabs all the commits in between, parses them to get the author, and
> creates an issue with that data, but it does represent more effort than
> just updating a markdown file. There might even be an existing Action that
> can help with this, I haven't looked too hard.
>

I was thinking along the lines of a script that would scrape the issues
resolved in a given release and add a comment to them noting that the
change is in release N and explaining (with clear instructions) how it
can be validated. Creating a "validate this release" issue with all
"contributing" participants could be an interesting way to do this as well.
(I think it'd be valuable to get those who filed the issue, not just those
who fixed it, to validate.)
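
A minimal sketch of the planning half of such a script, in plain Java, with
no GitHub calls. Everything in it is hypothetical (the Commit shape, the
"release-validation-issue" target, the message wording, and the sample
data); it only shows the grouping step: commits linked to an issue produce
a comment for that issue, and the rest are collected for a generic
per-release issue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical planning step for RC validation notices; performs no network calls. */
final class RcValidationNoticeSketch {

    /** One commit between two release tags; issueNumber is null when no issue is linked. */
    static final class Commit {
        final String sha;
        final String author;
        final Integer issueNumber;

        Commit(String sha, String author, Integer issueNumber) {
            this.sha = sha;
            this.author = author;
            this.issueNumber = issueNumber;
        }
    }

    /** Placeholder key for the generic per-release issue. */
    static final String RELEASE_ISSUE = "release-validation-issue";

    /** Maps each target ("#<issue>" or the generic release issue) to the notices to post there. */
    static Map<String, List<String>> planNotices(List<Commit> commits, String release) {
        Map<String, List<String>> notices = new LinkedHashMap<>();
        for (Commit c : commits) {
            String target = c.issueNumber != null ? "#" + c.issueNumber : RELEASE_ISSUE;
            String text = "Commit " + c.sha + " by @" + c.author
                + " is in " + release + "; please help validate it.";
            notices.computeIfAbsent(target, k -> new ArrayList<>()).add(text);
        }
        return notices;
    }

    public static void main(String[] args) {
        List<Commit> commits = List.of(
            new Commit("abc123", "alice", 101),   // linked to a (made-up) issue
            new Commit("def456", "bob", null));   // no linked issue
        planNotices(commits, "release N RC1")
            .forEach((target, texts) -> System.out.println(target + " -> " + texts));
    }
}
```

Tagging the issue filer as well as the commit author would need the issue
metadata, which this sketch leaves out.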


> As our next release manager, I'm happy to review PRs for either of these
> if anyone wants to volunteer to help out. If not, I'm happy to update the
> guidelines, but I probably won't have time to add the commit inspection
> tooling (I'm planning on throwing any extra time towards continuing to
> automate release candidate creation which is currently a more impactful
> problem IMO). I would very much like it if both of these things happened
> though :)
>
> Thanks,
> Danny
>
> On Mon, Oct 23, 2023 at 10:05 AM XQ Hu  wrote:
>
>> +1. This is a great idea to try. @Danny McCormick
>>  FYI as our next release manager.
>>
>> On Wed, Oct 18, 2023 at 2:30 PM Johanna Öjeling via dev <
>> dev@beam.apache.org> wrote:
>>
>>> When I have contributed to Apache Airflow, they have tagged all
>>> contributors concerned in a GitHub issue when the RC is available and asked
>>> us to validate it. Example: #29424
>>> <https://github.com/apache/airflow/issues/29424>.
>>>
>>> I found that to be an effective way to notify contributors of the RC and
>>> nudge them to help out. In the issue description there is a reference to
>>> the guidelines on how to test the RC and a note that people are encouraged
>>> to vote on the mailing list (which could admittedly be more highlighted
>>> because I did not pay attention to it until now and was unaware that
>>> contributors had a vote).
>>>
>>> It might be an idea to consider something similar here to increase the
>>> participation?
>>>
>>> On Tue, Oct 17, 2023 at 7:01 PM Jack McCluskey via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> I'm +1 on helping explain what we mean by "validate the RC" since we're
>>>> really just asking users to see if their existing use cases work along with
>>>> our typical slate of tests. I don't know if offloading that work to our
>>>> active validators is the right approach though, documentation/screen share
>>>> of their specific workflow is definitely less useful than having a more
>>>> general outline of how to install the RC and things to look out for when
>>>> testing.
>>>>
>>>> On Tue, Oct 17, 2023 at 12:55 PM Austin Bennett 
>>>> wrote:
>>>>
>>>>> Great effort.  I'm also interested in streamlining releases -- so if
>>>

Re: [QUESTION] Why no auto labels?

2023-10-20 Thread Robert Bradshaw via dev
I'll take another look at the PR. My inclination is still to use uuids
to uniquify. I think that's worth the readability hit (I'm
OK reducing this down to 6-8 hex digits which will still give very low
chances of collisions, though it doesn't solve the problem with the first
application). If someone cares about names more than this they can set
them manually.
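
For reference, a short uuid-derived suffix could look roughly like the
sketch below. It is written in Java only for illustration and is not the
PR's code; the uniquify helper and the "_" separator are hypothetical.
Eight hex digits carry 32 random bits, since the leading characters of a
random (version 4) UUID are all random.

```java
import java.util.UUID;

/** Hypothetical label uniquifier using a truncated random UUID. */
final class LabelSuffixSketch {

    /** Appends the first hexDigits hex characters of a random UUID to the label. */
    static String uniquify(String label, int hexDigits) {
        if (hexDigits < 1 || hexDigits > 32) {
            throw new IllegalArgumentException("hexDigits must be between 1 and 32");
        }
        String hex = UUID.randomUUID().toString().replace("-", "");
        return label + "_" + hex.substring(0, hexDigits);
    }

    public static void main(String[] args) {
        // Two applications of the same transform get different labels with high probability.
        System.out.println(uniquify("B", 8));
        System.out.println(uniquify("B", 8));
    }
}
```

Unlike a counter, the suffix does not depend on how many similarly named
transforms precede it, so inserting a new branch does not rename the
existing ones.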

On Fri, Oct 20, 2023 at 9:30 AM Joey Tran  wrote:
>
> Just want to bump this discussion again. I'm introducing Beam to other
> developers at Schrodinger now and the first (of hopefully many!) developer
> has started migrating our internal workflows to Beam. As I suspected though,
> he's complained about the iteration cycles spent on using the same
> transform without specifying a label and on using multiple assert_that's
> without unique labels.
>
> From the java code[1], it looks like the same naming scheme is used (I think, 
> it's been a decade since I've read java so apologies if I'm misreading) as 
> the PR I posted
>
> [1] 
> https://github.com/apache/beam/blob/e7a6405800a83dd16437b8b1b372e020e010a042/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java#L630
>
> On Fri, Oct 13, 2023 at 1:32 PM Joey Tran  wrote:
>>
>>
>>
>> On Fri, Oct 13, 2023 at 1:18 PM Robert Bradshaw  wrote:
>>>
>>> On Fri, Oct 13, 2023 at 10:08 AM Joey Tran  
>>> wrote:
>>>>
>>>> Are there places on the SDK side that expect unique labels? Or in 
>>>> non-updateable runners?
>>>
>>>
>>> That's a good question. The label eventually ends up here: 
>>> https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L142
>>>  which is technically a violation of the spec if these are not unique 
>>> (though I guess Java already does this?). More importantly, though, the id 
>>> of the transform (the proto contains a map of strings -> transforms) must 
>>> be unique.
>>>
>>
>> Doesn't that suggest not raising an exception won't be a sufficient enough 
>> solution? But I suppose if the Java SDK does this already it's fine..?
>>
>>>
>>> Another option is to make the suffix a uuid rather than a single counter. 
>>> (This would still have issues with the first application possibly getting 
>>> mixed up with a "different" first application unless it was always 
>>> appended.)
>>>
>>
>> Is the marginal benefit (only the first application may be confused instead 
>> of possibly other applications) valuable enough to justify the marginal cost 
>> of the readability hit of adding uuids to the labels?
>>
>>
>>
>>
>>>> On Fri, Oct 13, 2023 at 12:52 PM Robert Bradshaw  
>>>> wrote:
>>>>>
>>>>>
>>>>> Thanks for the PR.
>>>>>
>>>>> I think we should follow Java and allow non-unique labels, but not
>>>>> provide automatic uniquification. In particular, the danger of using a
>>>>> counter is that one can get accidental (and potentially hard to check)
>>>>> off-by-one collisions. As a concrete example, imagine one partitions a
>>>>> dataset into two collections, each followed by a similarly-named
>>>>> transform.
>>>>>
>>>>>      --> B
>>>>>    /
>>>>> A
>>>>>    \
>>>>>      --> B
>>>>>
>>>>> Uniquification would give something like
>>>>>
>>>>>      --> B
>>>>>    /
>>>>> A
>>>>>    \
>>>>>      --> B_2
>>>>>
>>>>> Suppose one then realizes there's a third case to handle, giving
>>>>>
>>>>>      --> B
>>>>>    /
>>>>> A --> B
>>>>>    \
>>>>>      --> B
>>>>>
>>>>> But this would be uniquified to
>>>>>
>>>>>      --> B
>>>>>    /
>>>>> A --> B_2
>>>>>    \
>>>>>      --> B_3
>>>>>
>>>>> where the old B_2 got renamed to B_3 and a new B_2 got put in its place. 
>>>>> This is bad because an updating runner would then attribute old B_2's 
>>>>> state to the new B_2 (and also possibly mis-direct any inflight 
>>>>> messages). At least with the old, intersecting names we can detect this 
>>>>> problem rather than silently give corrupt data.
>>>>>
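
The renaming hazard in the quoted example can be reproduced with a small
stand-alone sketch. It assumes the uniquifier appends a per-label counter in
application order; the helper names (uniquify, names_by_branch) and the branch
ids are illustrative only and are not part of any Beam SDK.

```python
from collections import Counter


def uniquify(labels):
    """Counter-based uniquification: the first use of a label keeps it,
    later uses get _2, _3, ... in application order."""
    seen = Counter()
    unique = []
    for label in labels:
        seen[label] += 1
        unique.append(label if seen[label] == 1 else f"{label}_{seen[label]}")
    return unique


def names_by_branch(branches):
    """Map each branch id to the unique name its transform ends up with."""
    labels = [label for _, label in branches]
    return dict(zip((branch for branch, _ in branches), uniquify(labels)))


# Each entry is (branch_id, label); branch_id stands in for the state a
# runner would associate with that transform.
before = [("top", "B"), ("bottom", "B")]
after = [("top", "B"), ("middle", "B"), ("bottom", "B")]

old_names = names_by_branch(before)
new_names = names_by_branch(after)

# Branches whose unique name changed between the two pipeline versions.
renamed = {b: (old_names[b], new_names[b])
           for b in old_names if old_names[b] != new_names[b]}
# Old names that now point at a different branch (state would be misattributed).
stolen = {name: (old_b, new_b)
          for old_b, name in old_names.items()
          for new_b, new_name in new_names.items()
          if new_name == name and new_b != old_b}
print(renamed)
print(stolen)
```

With plain duplicate labels (no suffixing) the collision would be visible
up front, which is the detection property argued for in the quoted message.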

Re: [YAML] Aggregations

2023-10-20 Thread Robert Bradshaw via dev
On Fri, Oct 20, 2023 at 11:35 AM Kenneth Knowles  wrote:
>
> A couple other bits on having an expression language:
>
>  - You already have Python lambdas at places, right? so that's quite a lot 
> more complex than SQL project/aggregate expressions
>  - It really does save a lot of pain for users (at the cost of implementation 
> complexity) when you need to "SUM(col1*col2)" where otherwise you have to Map 
> first. This could be viewed as desirable as well, of course.
>
> Anyhow I'm pretty much in agreement with all your reasoning as to why *not* 
> to use SQL-like expressions in strings. But it does seem odd when juxtaposed 
> with Python snippets.

Well, we say "here's a Python expression" when we're using a Python
string. But "SUM(col1*col2)" isn't as transparent. (Agree about the
niceties of being able to provide an expression rather than a column.)
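
For reference, a minimal sketch of the "Map first" workaround for something
like SUM(col1*col2) when aggregations only accept a column. This is plain
in-memory Python, not Beam code; the field names (key, col1, col2, product)
are made up for illustration.

```python
from collections import defaultdict

rows = [
    {"key": "a", "col1": 2, "col2": 3},
    {"key": "a", "col1": 1, "col2": 5},
    {"key": "b", "col1": 4, "col2": 1},
]

# Step 1 (Map): materialize the expression as its own field.
mapped = [{**row, "product": row["col1"] * row["col2"]} for row in rows]

# Step 2 (Aggregate): sum the new field per key.
totals = defaultdict(int)
for row in mapped:
    totals[row["key"]] += row["product"]
totals = dict(totals)
print(totals)
```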

> On Thu, Oct 19, 2023 at 4:00 PM Robert Bradshaw via dev  
> wrote:
>>
>> On Thu, Oct 19, 2023 at 12:53 PM Reuven Lax  wrote:
>> >
>> > Is the schema Group transform (in Java) something along these lines?
>>
>> Yes, for sure it is. It (and Python's and Typescript's equivalent) are
>> linked in the original post. The open question is how to best express
>> this in YAML.
>>
>> > On Wed, Oct 18, 2023 at 1:11 PM Robert Bradshaw via dev 
>> >  wrote:
>> >>
>> >> Beam Yaml has good support for IOs and mappings, but one key missing
>> >> feature for even writing a WordCount is the ability to do Aggregations
>> >> [1]. While the traditional Beam primitive is GroupByKey (and
>> >> CombineValues), we're eschewing KVs in favor of more schema'd
>> >> data (which has some precedent in our other languages, see the links
>> >> below). The key components the user needs to specify are (1) the key
>> >> fields on which the grouping will take place, (2) the fields
>> >> (expressions?) involved in the aggregation, and (3) what aggregating
>> >> fn to use.
>> >>
>> >> A straw-man example could be something like
>> >>
>> >> type: Aggregating
>> >> config:
>> >>   key: [field1, field2]
>> >>   aggregating:
>> >>     total_cost:
>> >>       fn: sum
>> >>       value: cost
>> >>     max_cost:
>> >>       fn: max
>> >>       value: cost
>> >>
>> >> This would basically correspond to the SQL expression
>> >>
>> >> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> >> from table GROUP BY field1, field2"
>> >>
>> >> (though I'm not requiring that we use this as an implementation
>> >> strategy). I do not think we need a separate (non aggregating)
>> >> Grouping operation, this can be accomplished by having a concat-style
>> >> combiner.
>> >>
>> >> There are still some open questions here, notably around how to
>> >> specify the aggregation fns themselves. We could of course provide a
>> >> number of built-ins (like SQL does). This gets into the question of
>> >> how and where to document this complete set, but some basics should
>> >> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> >> quantiles); where do we put the parameters? We could go with something
>> >> like
>> >>
>> >> fn:
>> >>   type: ApproximateQuantiles
>> >>   config:
>> >>     n: 10
>> >>
>> >> but others are even configured by functions themselves (e.g. LargestN
>> >> that wants a comparator Fn). Maybe we decide not to support these
>> >> (yet?)
>> >>
>> >> One thing I think we should support, however, is referencing custom
>> >> CombineFns. We have some precedent for this with our Fns from
>> >> MapToFields, where we accept things like inline lambdas and external
>> >> references. Again the topic of how to configure them comes up, as
>> >> these custom Fns are more likely to be parameterized than Map Fns
>> >> (though, to be clear, perhaps it'd be good to allow parameterization of
>> >> MapFns as well). Maybe we allow
>> >>
>> >> language: python  # like MapToFields (and here it'd be harder to mix
>> >>                   # and match per Fn)
>> >> fn:
>> >>   type: ???
>> >>   # should these be nested as config?
>> >>   name: fully.qualified.name
>> >>   path: /path/to/defining/file
>> >>   args: [...]
>> >>   kwargs: {...}
>> >>
>> >> which would invoke the constructor.
>> >>
>> >> I'm also open to other ways of naming/structuring these essential
>> >> parameters if it makes things more clear.
>> >>
>> >> - Robert
>> >>
>> >>
>> >> Java: 
>> >> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
>> >> Python: 
>> >> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
>> >> Typescript: 
>> >> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>> >>
>> >> [1] One can of course use SqlTransform for this, but I'm leaning
>> >> towards offering something more native.


Re: [Discuss] Idea to increase RC voting participation

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 12:18 PM Kenneth Knowles  wrote:

> +1 to more helpful guide on "how to usefully participate in RC validation"
> but also big +1 to Robert, Jack, Johanna.
>
> TL;DR the RC validation is an opportunity for downstream testing.
>
> Robert alluded to the origin of the spreadsheet: I created it long ago to
> validate that the human language on our web page actually works. Maybe
> someone should automate that with an LLM now.
>
> Robert also alluded to clean environment: our gradle scripts and GHA
> scripts and CI environment are heavily enough engineered that they don't
> represent what a user will experience. We could potentially use our starter
> repos for an adequate smoke test here.
>
> Those are both ways that *we* can pretend to be users. But actual users
> checking the RC to make sure they'll have a smooth upgrade is by far the
> most impactful validation.
>
> This thread honestly makes me want to delete the spreadsheet but maybe
> come up with a guide for downstream projects to validate against an RC.
> Maybe that's an extreme reaction...
>

I would very much be in favor of that.

On Wed, Oct 18, 2023 at 2:32 PM Robert Bradshaw via dev 
> wrote:
>
>> +1 That's a great idea. They have incentive to make sure the issue was
>> resolved for them, plus we get to ensure there were no other regressions.
>>
>> On Wed, Oct 18, 2023 at 11:30 AM Johanna Öjeling via dev <
>> dev@beam.apache.org> wrote:
>>
>>> When I have contributed to Apache Airflow, they have tagged all
>>> contributors concerned in a GitHub issue when the RC is available and asked
>>> us to validate it. Example: #29424
>>> <https://github.com/apache/airflow/issues/29424>.
>>>
>>> I found that to be an effective way to notify contributors of the RC and
>>> nudge them to help out. In the issue description there is a reference to
>>> the guidelines on how to test the RC and a note that people are encouraged
>>> to vote on the mailing list (which could admittedly be more highlighted
>>> because I did not pay attention to it until now and was unaware that
>>> contributors had a vote).
>>>
>>> It might be an idea to consider something similar here to increase the
>>> participation?
>>>
>>> On Tue, Oct 17, 2023 at 7:01 PM Jack McCluskey via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> I'm +1 on helping explain what we mean by "validate the RC" since we're
>>>> really just asking users to see if their existing use cases work along with
>>>> our typical slate of tests. I don't know if offloading that work to our
>>>> active validators is the right approach though, documentation/screen share
>>>> of their specific workflow is definitely less useful than having a more
>>>> general outline of how to install the RC and things to look out for when
>>>> testing.
>>>>
>>>> On Tue, Oct 17, 2023 at 12:55 PM Austin Bennett 
>>>> wrote:
>>>>
>>>>> Great effort.  I'm also interested in streamlining releases -- so if
>>>>> there are a lot of manual tests that could be automated, would be great
>>>>> to discover and then look to address.
>>>>>
>>>>> On Tue, Oct 17, 2023 at 8:47 AM Robert Bradshaw via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> +1
>>>>>>
>>>>>> I would also strongly suggest that people try out the release against
>>>>>> their own codebases. This has the benefit of ensuring the release won't
>>>>>> break your own code when it goes out, and stress-tests the new code
>>>>>> against real-world pipelines. (Ideally our own tests are all passing, and
>>>>>> this validation is automated as much as possible (though ensuring it
>>>>>> matches our documentation and works in a clean environment still has
>>>>>> value), but there's a lot of code and uses out there that we don't have
>>>>>> access to during normal Beam development.)
>>>>>>
>>>>>> On Tue, Oct 17, 2023 at 8:21 AM Svetak Sundhar via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I’ve participated in RC testing for a few releases and have observed
>>>>>>> a bit of a knowledge gap in how releases can be tested.

Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 12:53 PM Reuven Lax  wrote:
>
> Is the schema Group transform (in Java) something along these lines?

Yes, for sure it is. It (and Python's and Typescript's equivalent) are
linked in the original post. The open question is how to best express
this in YAML.

> On Wed, Oct 18, 2023 at 1:11 PM Robert Bradshaw via dev  
> wrote:
>>
>> Beam Yaml has good support for IOs and mappings, but one key missing
>> feature for even writing a WordCount is the ability to do Aggregations
>> [1]. While the traditional Beam primitive is GroupByKey (and
>> CombineValues), we're eschewing KVs in favor of more schema'd
>> data (which has some precedent in our other languages, see the links
>> below). The key components the user needs to specify are (1) the key
>> fields on which the grouping will take place, (2) the fields
>> (expressions?) involved in the aggregation, and (3) what aggregating
>> fn to use.
>>
>> A straw-man example could be something like
>>
>> type: Aggregating
>> config:
>>   key: [field1, field2]
>>   aggregating:
>>     total_cost:
>>       fn: sum
>>       value: cost
>>     max_cost:
>>       fn: max
>>       value: cost
>>
>> This would basically correspond to the SQL expression
>>
>> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> from table GROUP BY field1, field2"
>>
>> (though I'm not requiring that we use this as an implementation
>> strategy). I do not think we need a separate (non aggregating)
>> Grouping operation, this can be accomplished by having a concat-style
>> combiner.
>>
>> There are still some open questions here, notably around how to
>> specify the aggregation fns themselves. We could of course provide a
>> number of built-ins (like SQL does). This gets into the question of
>> how and where to document this complete set, but some basics should
>> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> quantiles); where do we put the parameters? We could go with something
>> like
>>
>> fn:
>>   type: ApproximateQuantiles
>>   config:
>>     n: 10
>>
>> but others are even configured by functions themselves (e.g. LargestN
>> that wants a comparator Fn). Maybe we decide not to support these
>> (yet?)
>>
>> One thing I think we should support, however, is referencing custom
>> CombineFns. We have some precedent for this with our Fns from
>> MapToFields, where we accept things like inline lambdas and external
>> references. Again the topic of how to configure them comes up, as
>> these custom Fns are more likely to be parameterized than Map Fns
>> (though, to be clear, perhaps it'd be good to allow parameterization of
>> MapFns as well). Maybe we allow
>>
>> language: python  # like MapToFields (and here it'd be harder to mix
>>                   # and match per Fn)
>> fn:
>>   type: ???
>>   # should these be nested as config?
>>   name: fully.qualified.name
>>   path: /path/to/defining/file
>>   args: [...]
>>   kwargs: {...}
>>
>> which would invoke the constructor.
>>
>> I'm also open to other ways of naming/structuring these essential
>> parameters if it makes things more clear.
>>
>> - Robert
>>
>>
>> Java: 
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
>> Python: 
>> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
>> Typescript: 
>> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>>
>> [1] One can of course use SqlTransform for this, but I'm leaning
>> towards offering something more native.


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 11:12 AM Kenneth Knowles  wrote:
>
> Using SQL expressions in strings is maybe OK given we are all
> relational all the time. Either way you have to define what the
> universe of `fn` is. Here's a compact possibility:
>
> type: Combine
> config:
>   group_by: [field1, field2]
>   aggregates:
>     max_cost: "MAX(cost)"
>     total_cost: "SUM(cost)"
>
> Just a thought to get it closer to SQL concision.

So I'm a bit wary of having to parse these strings (unless a language
parameter is passed in which case we defer to that language's syntax).
It's also messy the other way around, if a tool generates the YAML I'd
rather it not have to generate strings like this (i.e. string literals
should either be identifiers or opaque blobs).

Pandas achieves conciseness by allowing one to just specify, say,
'sum' and implicitly summing over all (numeric) fields, and allowing
more verbose, precise specification as well.

> I also used the word
> "Combine" just to connect it to other Beam writings and whatnot.

+1

> On Thu, Oct 19, 2023 at 1:41 PM Robert Bradshaw via dev
>  wrote:
> >
> > On Thu, Oct 19, 2023 at 10:25 AM Jan Lukavský  wrote:
> > >
> > > On 10/19/23 18:28, Robert Bradshaw via dev wrote:
> > > > On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  
> > > > wrote:
> > > >> Rill is definitely SQL-oriented but I think that's going to be the 
> > > >> most common. Dataframes are explicitly modeled on the relational 
> > > >> approach so that's going to look a lot like SQL,
> > > > I think pretty much any approach that fits here is going to be
> > > > relational, meaning you choose a set of columns to group on, a set of
> > > > columns to aggregate, and how to aggregate. The big open question is
> > > > what syntax to use for the "how."
> > > This might be already answered, if so, pardon my ignorance, but what is
> > > the goal this declarative approach is trying to solve? Is it meant to be
> > > more expressive or equally expressive than SQL? And if more, how much 
> > > more?
> >
> > I'm not sure if you're asking about YAML in general, or the particular
> > case of aggregation, but I can answer both.
> >
> > For the larger Beam YAML project, it's trying to solve the problem
> > that SQL is (and I'll admit this is somewhat subjective here) good at
> > expressing the T part of ETL, but not the other parts. For example,
> > the simple data movement use case of (say) reading from PubSub and
> > dumping into BigQuery is not well expressed in terms of SQL. SQL is
> > also fairly awkward when it comes to defining UDFs and TDFs and
> > non-linear pipelines (especially those with fanout). There are of
> > course other tools in this space (dbt comes to mind, and there's been
> > some investigation on how to make dbt play well with Beam). The other
> > niche it is trying to solve is that installing and learning a full SDK
> > is heavyweight and overkill for creating pipelines that are simply
> > wiring together pre-defined transforms.
> >
> > As for the more narrow case of aggregations, I think being similarly
> > expressive as SQL is fine, though it'd be good to make custom UDAFs
> > more natural. Originally I was thinking that just having SqlTransform
> > might be sufficient, but it feels like a big hammer to reach for every
> > time I just want to sum over one or two columns.


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 11:42 AM Jan Lukavský  wrote:
>
> On 10/19/23 19:41, Robert Bradshaw via dev wrote:
> > On Thu, Oct 19, 2023 at 10:25 AM Jan Lukavský  wrote:
> >> On 10/19/23 18:28, Robert Bradshaw via dev wrote:
> >>> On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:
> >>>> Rill is definitely SQL-oriented but I think that's going to be the most 
> >>>> common. Dataframes are explicitly modeled on the relational approach so 
> >>>> that's going to look a lot like SQL,
> >>> I think pretty much any approach that fits here is going to be
> >>> relational, meaning you choose a set of columns to group on, a set of
> >>> columns to aggregate, and how to aggregate. The big open question is
> >>> what syntax to use for the "how."
> >> This might be already answered, if so, pardon my ignorance, but what is
> >> the goal this declarative approach is trying to solve? Is it meant to be
> >> more expressive or equally expressive than SQL? And if more, how much more?
> > I'm not sure if you're asking about YAML in general, or the particular
> > case of aggregation, but I can answer both.
> >
> > For the larger Beam YAML project, it's trying to solve the problem
> > that SQL is (and I'll admit this is somewhat subjective here) good at
> > expressing the T part of ETL, but not the other parts. For example,
> > the simple data movement use case of (say) reading from PubSub and
> > dumping into BigQuery is not well expressed in terms of SQL. SQL is
> > also fairly awkward when it comes to defining UDFs and TDFs and
> > non-linear pipelines (especially those with fanout). There are of
> > course other tools in this space (dbt comes to mind, and there's been
> > some investigation on how to make dbt play well with Beam). The other
> > niche it is trying to solve is that installing and learning a full SDK
> > is heavyweight and overkill for creating pipelines that are simply
> > wiring together pre-defined transforms.
>
> I think FlinkSQL solves the problem of E and L in SQL via CREATE TABLE
> and INSERT statements. I agree with the fanout part, though using
> CREATE (TEMPORARY) TABLE AS SELECT ... could possibly solve that as well.

Yeah, Beam uses the CREATE TABLE for referring to external data too.
(I don't remember about INSERT statements). This is where (IMHO of
course) SQL starts to get very messy (and non-standard).

> > As for the more narrow case of aggregations, I think being similarly
> > expressive as SQL is fine, though it'd be good to make custom UDAFs
> > more natural. Originally I was thinking that just having SqlTransform
> > might be sufficient, but it feels like a big hammer to reach for every
> > time I just want to sum over one or two columns.
>
> Yes, defining UDFs and UDAFs is painful, that was the motivation of my
> question. It also defines what the syntax for such a UDAF would need to
> look like. It would require breaking UDAFs down into several primitive
> UDFs and then using a functional style to declare them. Most of the time
> it would be probably sufficient to use simplified CombineFn semantics
> with accumulator being limited to a primitive type (long, double,
> string, maybe array?). I suppose declaring a full-blown stateful DoFn
> (timers, generic state, ...) is out of scope.

Other than the possible totally-commutative-associative V* -> V case,
I'm probably fine with referencing existing CombineFns (including ones
defined in files like we do with mapping fns) rather than providing a
full YAML syntax for defining them. But we do need to be able to
parameterize them.
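
To make the commutative-associative V* -> V case concrete, here is a rough
sketch of a combiner built from a single binary operation. The method names
mirror the four-method shape of a Beam CombineFn (create_accumulator,
add_input, merge_accumulators, extract_output), but BinaryCombineFn and run
are hypothetical stand-alone helpers, not existing Beam classes, and run only
imitates how a runner might combine bundles.

```python
import operator


class BinaryCombineFn:
    """Combiner for the V* -> V case, built from one binary operation.

    The operation must be commutative and associative, with `identity`
    as its neutral element. This class does not subclass any Beam type.
    """

    def __init__(self, op, identity):
        self.op = op
        self.identity = identity

    def create_accumulator(self):
        return self.identity

    def add_input(self, accumulator, value):
        return self.op(accumulator, value)

    def merge_accumulators(self, accumulators):
        merged = self.identity
        for acc in accumulators:
            merged = self.op(merged, acc)
        return merged

    def extract_output(self, accumulator):
        return accumulator


def run(fn, bundles):
    """Simulate a runner: combine each bundle separately, then merge."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


sum_fn = BinaryCombineFn(operator.add, 0)
max_fn = BinaryCombineFn(max, float("-inf"))
print(run(sum_fn, [[1, 2], [3], []]))
print(run(max_fn, [[1, 5], [3]]))
```

Because the accumulator has the same type as the values, a declarative spec
for this case would only need to name the operation and its identity.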


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 10:25 AM Jan Lukavský  wrote:
>
> On 10/19/23 18:28, Robert Bradshaw via dev wrote:
> > On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:
> >> Rill is definitely SQL-oriented but I think that's going to be the most 
> >> common. Dataframes are explicitly modeled on the relational approach so 
> >> that's going to look a lot like SQL,
> > I think pretty much any approach that fits here is going to be
> > relational, meaning you choose a set of columns to group on, a set of
> > columns to aggregate, and how to aggregate. The big open question is
> > what syntax to use for the "how."
> This might be already answered, if so, pardon my ignorance, but what is
> the goal this declarative approach is trying to solve? Is it meant to be
> more expressive or equally expressive than SQL? And if more, how much more?

I'm not sure if you're asking about YAML in general, or the particular
case of aggregation, but I can answer both.

For the larger Beam YAML project, it's trying to solve the problem
that SQL is (and I'll admit this is somewhat subjective here) good at
expressing the T part of ETL, but not the other parts. For example,
the simple data movement use case of (say) reading from PubSub and
dumping into BigQuery is not well expressed in terms of SQL. SQL is
also fairly awkward when it comes to defining UDFs and TDFs and
non-linear pipelines (especially those with fanout). There are of
course other tools in this space (dbt comes to mind, and there's been
some investigation on how to make dbt play well with Beam). The other
niche it is trying to solve is that installing and learning a full SDK
is heavyweight and overkill for creating pipelines that are simply
wiring together pre-defined transforms.

As for the more narrow case of aggregations, I think being similarly
expressive as SQL is fine, though it'd be good to make custom UDAFs
more natural. Originally I was thinking that just having SqlTransform
might be sufficient, but it feels like a big hammer to reach for every
time I just want to sum over one or two columns.


Re: [YAML] Aggregations

2023-10-19 Thread Robert Bradshaw via dev
On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:
>
> Rill is definitely SQL-oriented but I think that's going to be the most 
> common. Dataframes are explicitly modeled on the relational approach so 
> that's going to look a lot like SQL,

I think pretty much any approach that fits here is going to be
relational, meaning you choose a set of columns to group on, a set of
columns to aggregate, and how to aggregate. The big open question is
what syntax to use for the "how."

Dataframe aggregation is probably a good example to look at. Here we
have pandas and R in particular as concrete instances. It should also
be easy to support different aggregations over different (or the same)
columns. Pandas can take a list of (or mapping to) functions in its
groupby().agg(). R doesn't seem to make this very easy...
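
For concreteness, a short sketch of the pandas forms referred to above. It
assumes a reasonably recent pandas (named aggregation needs 0.25 or later);
the column names and data are made up for illustration.

```python
import pandas as pd

df = pd.DataFrame({
    "field1": ["a", "a", "b"],
    "field2": ["x", "x", "y"],
    "cost": [1, 4, 2],
    "weight": [10, 20, 30],
})
keys = ["field1", "field2"]

# Concise form: one function applied to every remaining numeric column.
totals = df.groupby(keys).sum(numeric_only=True)

# List form: several functions applied to one column.
cost_stats = df.groupby(keys)["cost"].agg(["sum", "max"])

# Mapping form: different functions per column.
per_column = df.groupby(keys).agg({"cost": ["sum", "max"], "weight": "mean"})

# Named aggregation: output field -> (input column, function).
named = df.groupby(keys, as_index=False).agg(
    total_cost=("cost", "sum"),
    max_cost=("cost", "max"),
)
print(named)
```

The named-aggregation form is the closest match to an "output field, input
column, fn" triple in a declarative spec.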

> which leaves us with S-style formulas (which I like but are pretty niche)

I'm curious, what are these?

>  and I guess pivot tables coming from the spreadsheet world. Does make me 
> wonder what Rails' ORM looks like these days (I last used v4), it had some 
> aggregation support and was pretty declarative...
>
> On Wed, Oct 18, 2023 at 6:06 PM Robert Bradshaw  wrote:
>>
>> On Wed, Oct 18, 2023 at 5:06 PM Byron Ellis  wrote:
>> >
>> > Is it worth taking a look at similar prior art in the space?
>>
>> +1. Pointers welcome.
>>
>> > The first one that comes to mind is Transform, but with the dbt labs 
>> > acquisition that spec is a lot harder to find. Rill is pretty similar 
>> > though.
>>
>> Rill seems to be very SQL-based.
>>
>> > On Wed, Oct 18, 2023 at 1:12 PM Robert Bradshaw via dev 
>> >  wrote:
>> >>
>> >> Beam Yaml has good support for IOs and mappings, but one key missing
>> >> feature for even writing a WordCount is the ability to do Aggregations
>> >> [1]. While the traditional Beam primitive is GroupByKey (and
>> >> CombineValues), we're eschewing KVs in favor of more schema'd
>> >> data (which has some precedent in our other languages, see the links
>> >> below). The key components the user needs to specify are (1) the key
>> >> fields on which the grouping will take place, (2) the fields
>> >> (expressions?) involved in the aggregation, and (3) what aggregating
>> >> fn to use.
>> >>
>> >> A straw-man example could be something like
>> >>
>> >> type: Aggregating
>> >> config:
>> >>   key: [field1, field2]
>> >>   aggregating:
>> >>     total_cost:
>> >>       fn: sum
>> >>       value: cost
>> >>     max_cost:
>> >>       fn: max
>> >>       value: cost
>> >>
>> >> This would basically correspond to the SQL expression
>> >>
>> >> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> >> from table GROUP BY field1, field2"
>> >>
>> >> (though I'm not requiring that we use this as an implementation
>> >> strategy). I do not think we need a separate (non aggregating)
>> >> Grouping operation, this can be accomplished by having a concat-style
>> >> combiner.
>> >>
>> >> There are still some open questions here, notably around how to
>> >> specify the aggregation fns themselves. We could of course provide a
>> >> number of built-ins (like SQL does). This gets into the question of
>> >> how and where to document this complete set, but some basics should
>> >> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> >> quantiles); where do we put the parameters? We could go with something
>> >> like
>> >>
>> >> fn:
>> >>   type: ApproximateQuantiles
>> >>   config:
>> >>     n: 10
>> >>
>> >> but others are even configured by functions themselves (e.g. LargestN
>> >> that wants a comparator Fn). Maybe we decide not to support these
>> >> (yet?)
>> >>
>> >> One thing I think we should support, however, is referencing custom
>> >> CombineFns. We have some precedent for this with our Fns from
>> >> MapToFields, where we accept things like inline lambdas and external
>> >> references. Again the topic of how to configure them comes up, as
>> >> these custom Fns are more likely to be parameterized than Map Fns
>> >> (though, to be clear, perhaps it'd be good to allow parameterization of
>> >> MapFns as well). Maybe we

Re: [YAML] Aggregations

2023-10-18 Thread Robert Bradshaw via dev
On Wed, Oct 18, 2023 at 5:06 PM Byron Ellis  wrote:
>
> Is it worth taking a look at similar prior art in the space?

+1. Pointers welcome.

> The first one that comes to mind is Transform, but with the dbt labs 
> acquisition that spec is a lot harder to find. Rill is pretty similar though.

Rill seems to be very SQL-based.

> On Wed, Oct 18, 2023 at 1:12 PM Robert Bradshaw via dev  
> wrote:
>>
>> Beam Yaml has good support for IOs and mappings, but one key missing
>> feature for even writing a WordCount is the ability to do Aggregations
>> [1]. While the traditional Beam primitive is GroupByKey (and
>> CombineValues), we're eschewing KVs in favor of more schema'd
>> data (which has some precedent in our other languages, see the links
>> below). The key components the user needs to specify are (1) the key
>> fields on which the grouping will take place, (2) the fields
>> (expressions?) involved in the aggregation, and (3) what aggregating
>> fn to use.
>>
>> A straw-man example could be something like
>>
>> type: Aggregating
>> config:
>>   key: [field1, field2]
>>   aggregating:
>>     total_cost:
>>       fn: sum
>>       value: cost
>>     max_cost:
>>       fn: max
>>       value: cost
>>
>> This would basically correspond to the SQL expression
>>
>> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> from table GROUP BY field1, field2"
>>
>> (though I'm not requiring that we use this as an implementation
>> strategy). I do not think we need a separate (non aggregating)
>> Grouping operation, this can be accomplished by having a concat-style
>> combiner.
>>
>> There are still some open questions here, notably around how to
>> specify the aggregation fns themselves. We could of course provide a
>> number of built-ins (like SQL does). This gets into the question of
>> how and where to document this complete set, but some basics should
>> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> quantiles); where do we put the parameters? We could go with something
>> like
>>
>> fn:
>>   type: ApproximateQuantiles
>>   config:
>>     n: 10
>>
>> but others are even configured by functions themselves (e.g. LargestN
>> that wants a comparator Fn). Maybe we decide not to support these
>> (yet?)
>>
>> One thing I think we should support, however, is referencing custom
>> CombineFns. We have some precedent for this with our Fns from
>> MapToFields, where we accept things like inline lambdas and external
>> references. Again the topic of how to configure them comes up, as
>> these custom Fns are more likely to be parameterized than Map Fns
>> (though, to be clear, perhaps it'd be good to allow parameterization of
>> MapFns as well). Maybe we allow
>>
>> language: python  # like MapToFields (and here it'd be harder to mix
>>                   # and match per Fn)
>> fn:
>>   type: ???
>>   # should these be nested as config?
>>   name: fully.qualified.name
>>   path: /path/to/defining/file
>>   args: [...]
>>   kwargs: {...}
>>
>> which would invoke the constructor.
>>
>> I'm also open to other ways of naming/structuring these essential
>> parameters if it makes things more clear.
>>
>> - Robert
>>
>>
>> Java: 
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
>> Python: 
>> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
>> Typescript: 
>> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>>
>> [1] One can of course use SqlTransform for this, but I'm leaning
>> towards offering something more native.


[YAML] Aggregations

2023-10-18 Thread Robert Bradshaw via dev
Beam Yaml has good support for IOs and mappings, but one key missing
feature for even writing a WordCount is the ability to do Aggregations
[1]. While the traditional Beam primitive is GroupByKey (and
CombineValues), we're eschewing KVs in favor of more schema'd
data (which has some precedent in our other languages, see the links
below). The key components the user needs to specify are (1) the key
fields on which the grouping will take place, (2) the fields
(expressions?) involved in the aggregation, and (3) what aggregating
fn to use.

A straw-man example could be something like

type: Aggregating
config:
  key: [field1, field2]
  aggregating:
    total_cost:
      fn: sum
      value: cost
    max_cost:
      fn: max
      value: cost

This would basically correspond to the SQL expression

"SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
from table GROUP BY field1, field2"

(though I'm not requiring that we use this as an implementation
strategy). I do not think we need a separate (non aggregating)
Grouping operation, this can be accomplished by having a concat-style
combiner.
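
To pin down the intended semantics of the straw-man config, here is a rough
in-memory sketch in plain Python. It is not an implementation proposal and
does not use Beam; the BUILTIN_FNS registry, the "concat" name, and the
aggregate helper are all hypothetical. The "concat" entry shows how a
non-aggregating grouping falls out of a concat-style combiner.

```python
from collections import defaultdict

# Hypothetical built-in aggregation fns, keyed by the name used in `fn:`.
BUILTIN_FNS = {
    "sum": sum,
    "max": max,
    "concat": list,  # concat-style combiner: keeps every value (plain grouping)
}


def aggregate(rows, key, aggregating):
    """Group `rows` (dicts) by the `key` fields and apply each aggregation."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in key)].append(row)
    result = []
    for key_values, members in groups.items():
        out = dict(zip(key, key_values))
        for out_field, spec in aggregating.items():
            fn = BUILTIN_FNS[spec["fn"]]
            out[out_field] = fn([m[spec["value"]] for m in members])
        result.append(out)
    return result


# The straw-man config above, as it would look after YAML parsing.
config = {
    "key": ["field1", "field2"],
    "aggregating": {
        "total_cost": {"fn": "sum", "value": "cost"},
        "max_cost": {"fn": "max", "value": "cost"},
        "all_costs": {"fn": "concat", "value": "cost"},
    },
}
rows = [
    {"field1": "a", "field2": "x", "cost": 1},
    {"field1": "a", "field2": "x", "cost": 4},
    {"field1": "b", "field2": "y", "cost": 2},
]
output = aggregate(rows, config["key"], config["aggregating"])
print(output)
```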

There are still some open questions here, notably around how to
specify the aggregation fns themselves. We could of course provide a
number of built-ins (like SQL does). This gets into the question of
how and where to document this complete set, but some basics should
take us pretty far. Many aggregators, however, are parameterized (e.g.
quantiles); where do we put the parameters? We could go with something
like

fn:
  type: ApproximateQuantiles
  config:
    n: 10

but others are even configured by functions themselves (e.g. LargestN
that wants a comparator Fn). Maybe we decide not to support these
(yet?)
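
One way the bare-name and type/config forms could coexist, sketched in plain
Python. The AGGREGATORS registry and resolve_fn are hypothetical names, and
statistics.quantiles (exact cut points) is only a stand-in for a real
approximate-quantiles combiner.

```python
import statistics


def approximate_quantiles(n):
    # Stand-in: statistics.quantiles returns the n - 1 exact cut points.
    return lambda values: statistics.quantiles(values, n=n)


# Hypothetical registry: aggregator type name -> factory taking config kwargs.
AGGREGATORS = {
    "sum": lambda: sum,
    "max": lambda: max,
    "ApproximateQuantiles": approximate_quantiles,
}


def resolve_fn(spec):
    """Accept either a bare name ('sum') or {'type': ..., 'config': {...}}."""
    if isinstance(spec, str):
        return AGGREGATORS[spec]()
    return AGGREGATORS[spec["type"]](**spec.get("config", {}))


deciles = resolve_fn({"type": "ApproximateQuantiles", "config": {"n": 10}})
total = resolve_fn("sum")
print(total([1, 2, 3]))
print(len(deciles(list(range(1, 101)))))
```

Aggregators configured by functions (such as a comparator) would need the
config values themselves to be resolvable fns, which this sketch does not
attempt.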

One thing I think we should support, however, is referencing custom
CombineFns. We have some precedent for this with our Fns from
MapToFields, where we accept things like inline lambdas and external
references. Again the topic of how to configure them comes up, as
these custom Fns are more likely to be parameterized than Map Fns
(though, to be clear, perhaps it'd be good to allow parameterization of
MapFns as well). Maybe we allow

language: python  # like MapToFields (and here it'd be harder to mix
                  # and match per Fn)
fn:
  type: ???
  # should these be nested as config?
  name: fully.qualified.name
  path: /path/to/defining/file
  args: [...]
  kwargs: {...}

which would invoke the constructor.
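
As a rough sketch of what "invoke the constructor" could mean on the Python
side: resolve the dotted name with importlib and call it with the given args
and kwargs. The construct helper is a hypothetical name, fractions.Fraction
is only a stand-in for a user-defined CombineFn class, and the `path` key
(loading from a defining file) is deliberately not handled here.

```python
import importlib


def construct(name, args=(), kwargs=None):
    """Import `pkg.module.ClassName` and call it with the given arguments."""
    module_name, _, attr = name.rpartition(".")
    cls = getattr(importlib.import_module(module_name), attr)
    return cls(*args, **(kwargs or {}))


# Stand-in spec: a real one would name a CombineFn subclass instead.
spec = {"name": "fractions.Fraction", "args": [3], "kwargs": {"denominator": 4}}
instance = construct(spec["name"], spec["args"], spec["kwargs"])
```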

I'm also open to other ways of naming/structuring these essential
parameters if it makes things more clear.

- Robert


Java: 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
Python: 
https://beam.apache.org/documentation/transforms/python/aggregation/groupby
Typescript: 
https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html

[1] One can of course use SqlTransform for this, but I'm leaning
towards offering something more native.


Re: [Discuss] Idea to increase RC voting participation

2023-10-18 Thread Robert Bradshaw via dev
+1 That's a great idea. They have incentive to make sure the issue was
resolved for them, plus we get to ensure there were no other regressions.

On Wed, Oct 18, 2023 at 11:30 AM Johanna Öjeling via dev <
dev@beam.apache.org> wrote:

> When I have contributed to Apache Airflow, they have tagged all
> contributors concerned in a GitHub issue when the RC is available and asked
> us to validate it. Example: #29424
> <https://github.com/apache/airflow/issues/29424>.
>
> I found that to be an effective way to notify contributors of the RC and
> nudge them to help out. In the issue description there is a reference to
> the guidelines on how to test the RC and a note that people are encouraged
> to vote on the mailing list (which could admittedly be more highlighted
> because I did not pay attention to it until now and was unaware that
> contributors had a vote).
>
> It might be an idea to consider something similar here to increase the
> participation?
>
> On Tue, Oct 17, 2023 at 7:01 PM Jack McCluskey via dev <
> dev@beam.apache.org> wrote:
>
>> I'm +1 on helping explain what we mean by "validate the RC" since we're
>> really just asking users to see if their existing use cases work along with
>> our typical slate of tests. I don't know if offloading that work to our
>> active validators is the right approach though, documentation/screen share
>> of their specific workflow is definitely less useful than having a more
>> general outline of how to install the RC and things to look out for when
>> testing.
>>
>> On Tue, Oct 17, 2023 at 12:55 PM Austin Bennett 
>> wrote:
>>
>>> Great effort.  I'm also interested in streamlining releases -- so if
>>> there are a lot of manual tests that could be automated, would be great
>>> to discover and then look to address.
>>>
>>> On Tue, Oct 17, 2023 at 8:47 AM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> +1
>>>>
>>>> I would also strongly suggest that people try out the release against
>>>> their own codebases. This has the benefit of ensuring the release won't
>>>> break your own code when they go out, and stress-tests the new code against
>>>> real-world pipelines. (Ideally our own tests are all passing, and this
>>>> validation is automated as much as possible (though ensuring it matches our
>>>> documentation and works in a clean environment still has value), but
>>>> there's a lot of code and uses out there that we don't have access to
>>>> during normal Beam development.)
>>>>
>>>> On Tue, Oct 17, 2023 at 8:21 AM Svetak Sundhar via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I’ve participated in RC testing for a few releases and have observed a
>>>>> bit of a knowledge gap in how releases can be tested. Given that Beam
>>>>> encourages contributors to vote on RC’s regardless of tenure, and that
>>>>> voting on an RC is a relatively low-effort, high leverage way to influence
>>>>> the release of the library, I propose the following:
>>>>>
>>>>> During the vote for the next release, voters can document the process
>>>>> they followed on a separate document, and add the link on column G
>>>>> here
>>>>> <https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=437054928>.
>>>>> One step further, could be a screencast of running the test, and attaching
>>>>> a link of that.
>>>>>
>>>>> We can keep repeating this through releases until we have
>>>>> documentation for many of the different tests. We can then add these docs
>>>>> into the repo.
>>>>>
>>>>> I’m proposing this because I’ve gathered the following feedback from
>>>>> colleagues that are tangentially involved with Beam: They are interested in
>>>>> participating in release validation, but don’t know how to get started.
>>>>> Happy to hear other suggestions too, if there are any to address the
>>>>> above.
>>>>>
>>>>> Thanks,
>>>>>
>>>>>
>>>>> Svetak Sundhar
>>>>>
>>>>>   Data Engineer
>>>>> s vetaksund...@google.com
>>>>>
>>>>>


Re: [Discuss] Idea to increase RC voting participation

2023-10-17 Thread Robert Bradshaw via dev
+1

I would also strongly suggest that people try out the release against their
own codebases. This has the benefit of ensuring the release won't break
your own code when they go out, and stress-tests the new code against
real-world pipelines. (Ideally our own tests are all passing, and this
validation is automated as much as possible (though ensuring it matches our
documentation and works in a clean environment still has value), but
there's a lot of code and uses out there that we don't have access to
during normal Beam development.)

On Tue, Oct 17, 2023 at 8:21 AM Svetak Sundhar via dev 
wrote:

> Hi all,
>
> I’ve participated in RC testing for a few releases and have observed a bit
> of a knowledge gap in how releases can be tested. Given that Beam
> encourages contributors to vote on RC’s regardless of tenure, and that
> voting on an RC is a relatively low-effort, high leverage way to influence
> the release of the library, I propose the following:
>
> During the vote for the next release, voters can document the process they
> followed on a separate document, and add the link on column G here.
> One step further, could be a screencast of running the test, and attaching
> a link of that.
>
> We can keep repeating this through releases until we have documentation
> for many of the different tests. We can then add these docs into the repo.
>
> I’m proposing this because I’ve gathered the following feedback from
> colleagues that are tangentially involved with Beam: They are interested in
> participating in release validation, but don’t know how to get started.
> Happy to hear other suggestions too, if there are any to address the
> above.
>
> Thanks,
>
>
> Svetak Sundhar
>
>   Data Engineer
> s vetaksund...@google.com
>
>


Re: Enable state cache in Python SDK

2023-10-16 Thread Robert Bradshaw via dev
+1, we should definitely be enabling at least some caching by default
here. Added some comments to the doc.

On Mon, Oct 16, 2023 at 9:27 AM Anand Inguva via dev
 wrote:
>
> Hello,
>
> In Python SDK, the user state and side input caching is disabled by default 
> for all the runners except FnAPI direct runner(intended for testing 
> purposes). I would like to propose that we enable the state cache for the 
> Python SDK similar to other SDKs. I created a doc[1] on why we need to do it, 
> advantages and disadvantages along with future improvements.
>
> Please take a look and let me know what you think.
>
> Thanks,
> Anand
>
> [1] 
> https://docs.google.com/document/d/1gllYsIFqKt4TWAxQmXU_-sw7SLnur2Q69d__N0XBMdE/edit
>
>


Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via dev
On Fri, Oct 13, 2023 at 10:08 AM Joey Tran 
wrote:

> That makes sense. Would you suggest the new option simply suppress the
> RuntimeError and use the non-unique label?
>

Yes. (Or, rather, not raise it.)


> Are there places on the SDK side that expect unique labels? Or in
> non-updateable runners?
>

That's a good question. The label eventually ends up here:
https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L142
which is technically a violation of the spec if these are not unique
(though I guess Java already does this?). More importantly, though, the id
of the transform (the proto contains a map of strings -> transforms) must
be unique.


> Would making `--auto_unique_labels` a mutually exclusive option with
> streaming be a reasonable option? Not sure if stream/batch-specific options
> are a good idea or not... but if there's precedent, then it'd be an easy
> option
>

It's not always possible to even tell at pipeline construction whether the
pipeline will be run in streaming mode. (I suppose one could check later?)
It's generally something we try to avoid though.

Another option is to make the suffix a uuid rather than a single counter.
(This would still have issues with the first application possibly getting
mixed up with a "different" first application unless it was always
appended.)


> On Fri, Oct 13, 2023 at 12:52 PM Robert Bradshaw 
> wrote:
>
>>
>> Thanks for the PR.
>>
>> I think we should follow Java and allow non-unique labels, but not
>> provide automatic uniquification. In particular, the danger of using a
>> counter is that one can get accidental (and potentially hard to check)
>> off-by-one collisions. As a concrete example, imagine one partitions a
>> dataset into two collections, each followed by a similarly-named transform.
>>
>>    --> B
>>   /
>> A
>>  \
>>    --> B
>>
>> Uniquification would give something like
>>
>>    --> B
>>   /
>> A
>>  \
>>    --> B_2
>>
>> Suppose one then realizes there's a third case to handle, giving
>>
>>    --> B
>>   /
>> A --> B
>>  \
>>    --> B
>>
>> But this would be uniquified to
>>
>>    --> B
>>   /
>> A --> B_2
>>  \
>>    --> B_3
>>
>> where the old B_2 got renamed to B_3 and a new B_2 got put in its place.
>> This is bad because an updating runner would then attribute old B_2's state
>> to the new B_2 (and also possibly mis-direct any inflight messages). At
>> least with the old, intersecting names we can detect this problem
>> rather than silently give corrupt data.
>>
>>
>> On Fri, Oct 13, 2023 at 7:15 AM Joey Tran 
>> wrote:
>>
>>> For posterity: https://github.com/apache/beam/pull/28984
>>>
>>> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> I would definitely support a PR making this an option. Changing the
>>>> default would be a rather big change that would require more thought.
>>>>
>>>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran 
>>>> wrote:
>>>>
>>>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
>>>>> Apache Beam at my company and I'm trying to foresee parts of the API they
>>>>> might find inconvenient.
>>>>>
>>>>> If there's a conclusion to make the behavior similar to java, I'm
>>>>> happy to put up a PR
>>>>>
>>>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd
>>>>>> be a very sticky toggle since it'd be easy for PTransforms to accidentally
>>>>>> rely on it.
>>>>>>
>>>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>>>>>> wrote:
>>>>>>
>>>>>>> Huh. This used to be a hard error in Java, but I guess it's
>>>>>>> togglable with an option now. We should probably add the option to toggle
>>>>>>> Python too. (Unclear what the default should be, but this probably ties
>>>>>>> into re-thinking how pipeline update should work.)
>>>>>>>
>>>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>>>>>> wrote:
>>>>>>>
>>>>>

Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via dev
Thanks for the PR.

I think we should follow Java and allow non-unique labels, but not provide
automatic uniquification. In particular, the danger of using a counter is
that one can get accidental (and potentially hard to check) off-by-one
collisions. As a concrete example, imagine one partitions a dataset into
two collections, each followed by a similarly-named transform.

   --> B
  /
A
 \
   --> B

Uniquification would give something like

   --> B
  /
A
 \
   --> B_2

Suppose one then realizes there's a third case to handle, giving

   --> B
  /
A --> B
 \
   --> B

But this would be uniquified to

   --> B
  /
A --> B_2
 \
   --> B_3

where the old B_2 got renamed to B_3 and a new B_2 got put in its place.
This is bad because an updating runner would then attribute old B_2's state
to the new B_2 (and also possibly mis-direct any inflight messages). At
least with the old, intersecting names we can detect this problem
rather than silently give corrupt data.
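
The renaming hazard above can be reproduced with a few lines of plain
Python. This is only an illustrative sketch: uniquify and assign are
hypothetical helpers, the branch names ("upper", "middle", "lower") are made
up, and the counter scheme is assumed to append _2, _3, ... in application
order as in the diagrams.

```python
def uniquify(labels):
    """Append _2, _3, ... to repeated labels, in order of application."""
    seen = {}
    out = []
    for label in labels:
        seen[label] = seen.get(label, 0) + 1
        out.append(label if seen[label] == 1 else f"{label}_{seen[label]}")
    return out


def assign(branches):
    """Map each (hypothetical) branch to the label its transform "B" would get."""
    return dict(zip(branches, uniquify(["B"] * len(branches))))


old = assign(["upper", "lower"])
new = assign(["upper", "middle", "lower"])

# Transforms whose label changed between the two versions of the pipeline.
renamed = {b: (old[b], new[b]) for b in old if old[b] != new[b]}
```

Here the "lower" branch moves from B_2 to B_3 while the new "middle" branch
takes over B_2, so a runner matching state by label would pair the old
lower-branch state with the new middle transform.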


On Fri, Oct 13, 2023 at 7:15 AM Joey Tran  wrote:

> For posterity: https://github.com/apache/beam/pull/28984
>
> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw 
> wrote:
>
>> I would definitely support a PR making this an option. Changing the
>> default would be a rather big change that would require more thought.
>>
>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran 
>> wrote:
>>
>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
>>> Apache Beam at my company and I'm trying to foresee parts of the API they
>>> might find inconvenient.
>>>
>>> If there's a conclusion to make the behavior similar to java, I'm happy
>>> to put up a PR
>>>
>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran 
>>> wrote:
>>>
>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd
>>>> be a very sticky toggle since it'd be easy for PTransforms to accidentally
>>>> rely on it.
>>>>
>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Huh. This used to be a hard error in Java, but I guess it's togglable
>>>>> with an option now. We should probably add the option to toggle Python too.
>>>>> (Unclear what the default should be, but this probably ties into
>>>>> re-thinking how pipeline update should work.)
>>>>>
>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Makes sense that the requirement is the same, but is the label
>>>>>> auto-generation behavior the same? I modified the BeamJava
>>>>>> wordcount example[1] to do the regex filter twice in a row, and unlike the
>>>>>> BeamPython example I posted before, it just warns instead of throwing an
>>>>>> exception.
>>>>>>
>>>>>> Tangentially, is it expected that the Beam playground examples don't
>>>>>> have a way to see the outputs of a run example? I have a vague memory that
>>>>>> there used to be a way to navigate to an output file after it's generated
>>>>>> but not sure if I just dreamt that up. Playing with the examples, I wasn't
>>>>>> positive if my runs were actually succeeding or not based on the stdout
>>>>>> alone.
>>>>>>
>>>>>> [1] https://play.beam.apache.org/?sdk=java=mI7WUeje_r2
>>>>>> <https://play.beam.apache.org/?sdk=java=mI7WUeje_r2>
>>>>>> [2] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
>>>>>>
>>>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>>>> u...@beam.apache.org> wrote:
>>>>>>
>>>>>>> BeamJava and BeamPython have the exact same behavior:
>>>>>>> transform names within must be distinct [1]. This is because we do not
>>>>>>> necessarily know at pipeline construction time if the pipeline will be
>>>>>>> streaming or batch, or if it will be updated in the future, so the decision
>>>>>>> was made to impose this restriction up front. Both will auto-generate a
>>>>>>> name for you if one is not given, but will do so deterministically (not
>>>>>>> depending on some global context) to avoid potential update problems.
>>>>>>>
>>>>>>> [1] Not

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-12 Thread Robert Bradshaw via dev
OK, so how about this for a concrete proposal:

sink:
  type: WriteToParquet
  config:
    path: "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}"
    suffix: ".parquet"

The eventual path would be . The suffix
would be optional, and there could be a default for the specific file
format. A file format could inspect a provided suffix like ".csv.gz" to
infer compression as well.
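
For illustration only, the {}-style template happens to line up with
Python's str.format attribute lookup, so the expansion could be sketched as
below. render_path and infer_compression are hypothetical helpers, the
record is a stand-in object, and nothing here is the actual sink code.

```python
from datetime import datetime
from types import SimpleNamespace


def render_path(path, suffix, record, timestamp):
    """Expand {record.*} and {timestamp.*} placeholders, then append the suffix."""
    return path.format(record=record, timestamp=timestamp) + suffix


def infer_compression(suffix):
    """Guess compression from a suffix such as ".csv.gz" (only gzip is handled here)."""
    return "gzip" if suffix.endswith(".gz") else None


rendered = render_path(
    "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}",
    ".parquet",
    SimpleNamespace(my_col="a"),
    datetime(2023, 10, 12),
)
```

One thing this sketch surfaces: plain {timestamp.month} is not zero-padded,
so January would render as "1"; a real implementation would probably want
explicit date formatting.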

Note that this doesn't have any special indicators for being dynamic other
than the {}'s. Also, my_col would be written as part of the data (but we
could add an extra "elide" config parameter that takes a list of columns to
exclude if desired).

We could call this "prefix" rather than path. (Path is symmetric with
reading, but prefix is a bit more direct.) Anyone want to voice
their opinion here?




On Wed, Oct 11, 2023 at 9:01 AM Chamikara Jayalath 
wrote:

>
>
> On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles  wrote:
>
>> So, top-posting because the threading got to be a lot for me and I think
>> it forked a bit too... I may even be restating something someone said, so
>> apologies for that.
>>
>> Very very good point about *required* parameters where if you don't use
>> them then you will end up with two writers writing to the same file. The
>> easiest example to work with might be if you omitted SHARD_NUM so all
>> shards end up clobbering the same file.
>>
>> I think there's a unifying perspective between prefix/suffix and the need
>> to be sure to include critical sharding variables. Essentially it is my
>> point about it being a "big data fileset". It is perhaps unrealistic but
>> ideally the user names the big data fileset and then the mandatory other
>> pieces are added outside of their control. For example if I name my big
>> data fileset "foo" then that implicitly means that "foo" consists of all
>> the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I
>> re-read I see you basically said the same thing. In some cases the required
>> fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the
>> user can think of it as a textual template, if we can use a library that
>> yields an abstract syntax tree for the expression we can easily check these
>> requirements in a robust way - or we could do it in a non-robust way by
>> string-scraping ourselves.
>>
>
> Yes. I think we are talking about the same thing. Users should not have
> full control over the filename since that could lead to conflicts and data
> loss when data is being written in parallel from multiple workers. Users
> can refer to the big data fileset being written using the glob "/**".
> In addition users have control over the filename  and 
> (file extension, for example) which can be useful for some downstream
> use-cases. Rest of the filename will be filled out by the SDK (window, pane
> etc.) to make sure that the files written by different workers do not
> conflict.
>
> Thanks,
> Cham
>
>
>>
>> We actually are very close to this in FileIO. I think the interpretation
>> of "prefix" is that it is the filename "foo" as above, and "suffix" is
>> really something like ".txt" that you stick on the end of everything for
>> whatever reason.
>>
>> Kenn
>>
>> On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>>
>>>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>>
>>>>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> I suspect some simple pattern templating would solve most use cases.
>>>>>>> We probably would want to support timestamp formatting (e.g. $ $M $D)
>>>>>>> as well.
>>>>>>>
>>>>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I would say:
>>>>>>>>>
>>>>>>>>>   
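
On Kenn's point above about checking that a user-supplied name template
includes the critical sharding variables: a minimal sketch of that check,
assuming the ${NAME} syntax from his example and using the parser in
Python's string.Template rather than hand-rolled string scraping.
template_variables and missing_variables are hypothetical helper names.

```python
from string import Template


def template_variables(template):
    """Return the set of $NAME / ${NAME} placeholders used in the template."""
    names = set()
    for match in Template.pattern.finditer(template):
        name = match.group("named") or match.group("braced")
        if name:
            names.add(name)
    return names


def missing_variables(template, required):
    """List the required variables that the template never references."""
    return sorted(set(required) - template_variables(template))


required = ["SHARD_NUM", "SHARD_TOTAL"]
ok = missing_variables("foo/${SHARD_NUM}-of-${SHARD_TOTAL}", required)
bad = missing_variables("foo/${SHARD_TOTAL}", required)
```

A non-empty result (as for the second template) is the case where all shards
would clobber the same file, so it could be rejected at construction time.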

Re: [Question] Read Parquet Schema from S3 Directory

2023-10-12 Thread Robert Bradshaw via dev
You'll probably need to resolve "s3a:///*.parquet" out into a
concrete non-glob filepattern to inspect it this way. Presumably any
individual shard will do. match and open from
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/FileSystems.html
may be useful.
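
The idea, sketched in plain Python against a local temporary directory
rather than S3 (so glob.glob here is only a stand-in for FileSystems.match,
and first_match is a hypothetical helper): expand the pattern, pick any one
concrete file, and read the schema from that file instead of from the glob.

```python
import glob
import os
import tempfile


def first_match(pattern):
    """Resolve a glob to one concrete path; any single shard will do."""
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise FileNotFoundError(f"no files match {pattern!r}")
    return matches[0]


with tempfile.TemporaryDirectory() as directory:
    # Empty placeholder files standing in for Parquet shards.
    for name in ("part-0.parquet", "part-1.parquet"):
        open(os.path.join(directory, name), "wb").close()

    concrete = first_match(os.path.join(directory, "*.parquet"))
    chosen = os.path.basename(concrete)
```

The concrete path is what would then be handed to the schema reader.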

On Wed, Oct 11, 2023 at 10:29 AM Ramya Prasad via dev 
wrote:

> Hello,
>
> I am a developer trying to use Apache Beam in my Java application, and I'm
> running into an issue with reading multiple Parquet files from a directory
> in S3. I'm able to successfully run this line of code, where tempPath  =
> "s3:///*.parquet":
> PCollection records = pipeline.apply("Read parquet file in
> as Generic Records", ParquetIO.read(schema).from(tempPath));
>
> My problem is reading the schema beforehand. At runtime, I only have the
> name of the S3 bucket, which has all the Parquet files I need underneath
> it. However, I am unable to use that same tempPath above to retrieve my
> schema. Because the path is not pointing to a singular parquet file, the
> ParquetFileReader class from Apache Hadoop throws an error: No such file or
> directory: s3a:///*.parquet.
>
> To read my schema, I'm using this chunk of code:
>
> Configuration configuration = new Configuration();
> configuration.set("fs.s3a.access.key", "");
> configuration.set("fs.s3a.secret.key", "");
> configuration.set("fs.s3a.session.token", "");
> configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
> configuration.set("fs.s3a.server-side-encryption-algorithm", "");
> configuration.set("fs.s3a.proxy.host", "");
> configuration.set("fs.s3a.proxy.port", "");
> configuration.set("fs.s3a.aws.credentials.provider",
>     "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
>
> Path hadoopFilePath = new Path("s3a:///*.parquet");
> ParquetFileReader r =
>     ParquetFileReader.open(HadoopInputFile.fromPath(hadoopFilePath, configuration));
> MessageType messageType = r.getFooter().getFileMetaData().getSchema();
> AvroSchemaConverter converter = new AvroSchemaConverter();
> Schema schema = converter.convert(messageType);
>
> The red line is where the code is failing. Is there maybe a Hadoop
> Configuration I can set to force Hadoop to read recursively?
>
> I realize this is kind of a Beam-adjacent problem, but I've been
> struggling with this for a while, so any help would be appreciated!
>
> Thanks and sincerely,
> Ramya
> --
>
> The information contained in this e-mail may be confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
>
>
>


Re: Proposal for pyproject.toml Support in Apache Beam Python

2023-10-12 Thread Robert Bradshaw via dev
On Thu, Oct 12, 2023 at 2:04 PM Anand Inguva  wrote:

> I am in the process of updating the documentation at
> https://cwiki.apache.org/confluence/display/BEAM/Python+Tips related to
> setup.py/pyproject.toml changes, but yes you can't call setup.py directly
> because it might fail due to the lack of presence of beam python's build
> time dependencies.
>
> With regards to other files (e.g. protos), we will follow similar
> behavior as before (generating protos using `gen_protos.py`).
>

Meaning this will be called automatically when needed (e.g. from pytest)?


> On Thu, Oct 12, 2023 at 4:01 PM Robert Bradshaw 
> wrote:
>
>> Does this change any development practices? E.g. if I clone the repo, I'm
>> assuming I couldn't run "setup.py test" anymore. What about the generated
>> files (like protos, or the yaml definitions copied from other parts of the
>> repo)?
>>
>> On Thu, Oct 12, 2023 at 12:27 PM Anand Inguva via dev <
>> dev@beam.apache.org> wrote:
>>
>>> The PR https://github.com/apache/beam/pull/28385 is merged today. If
>>> there are any observed failures, please comment on the PR and I will follow
>>> up with a forward fix. Thanks.
>>>
>>> On Fri, Sep 1, 2023 at 2:30 PM Anand Inguva 
>>> wrote:
>>>
>>>> Since there is positive feedback from the dev community, I am going
>>>> ahead and implementing this proposal for Python SDK.
>>>>
>>>> @aus...@apache.org   Initially let's move forward
>>>> with the setuptools as backend for building package and as part of the
>>>> future work, we can find a better backend than setuptools.
>>>>
>>>> Thanks for the feedback.
>>>> Anand
>>>>
>>>> On Mon, Aug 28, 2023 at 12:00 PM Austin Bennett 
>>>> wrote:
>>>>
>>>>> I've thought about this a ton, but haven't been in a position to
>>>>> undertake the work.  Thanks for bringing this up, @Anand Inguva
>>>>>  !
>>>>>
>>>>> I'd point us to https://python-poetry.org/  ... [ which is where I'd
>>>>> look to take us, but I'm also not able to do all the work, so my
>>>>> suggestion/preference doesn't matter that much ]
>>>>>
>>>>> https://python-poetry.org/docs/pyproject#the-pyprojecttoml-file <-
>>>>> for info on pyproject.toml file.
>>>>>
>>>>> Notice the use of a 'lock' file is very valuable, ex:
>>>>> https://python-poetry.org/docs/basic-usage/#committing-your-poetrylock-file-to-version-control
>>>>>
>>>>> I haven't come across `build`, that might be great too.  I'd highlight
>>>>> that Poetry is pretty common across industry these days, rock-solid,
>>>>> ecosystem of interoperability, users, etc...   If not familiar, PLEASE have
>>>>> a look at that.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Aug 28, 2023 at 8:04 AM Kerry Donny-Clark via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> +1
>>>>>> Hi Anand,
>>>>>> I appreciate this effort. Managing python dependencies has been a
>>>>>> major pain point for me, and I think this approach would help.
>>>>>> Kerry
>>>>>>
>>>>>> On Mon, Aug 28, 2023 at 10:14 AM Anand Inguva via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hello Beam Dev Team,
>>>>>>>
>>>>>>> I've compiled a design document
>>>>>>> <https://docs.google.com/document/d/17-y48WW25-VGBWZNyTdoN0WUN03k9ZhJjLp9wtyG1Wc/edit#heading=h.wskna8eurvjv>[1]
>>>>>>> proposing the integration of pyproject.toml into Apache Beam's Python build
>>>>>>> process. Your insights and feedback would be invaluable.
>>>>>>>
>>>>>>> What is pyproject.toml?
>>>>>>> pyproject.toml is a configuration file that specifies a project's
>>>>>>> build dependencies and other project-related metadata in a standardized
>>>>>>> format. Before pyproject.toml, Python projects often had multiple
>>>>>>> configuration files (like setup.py, setup.cfg, and requirements.txt).
>>>>>>> pyproject.toml aims to centralize these configurations into one place,
>>>>>>> making project setups more organized and straightforward. One of the
>>>>>>> significant features enabled by pyproject.toml is the ability to perform
>>>>>>> isolated builds. This ensures that build dependencies are separated from
>>>>>>> the project's runtime dependencies, leading to more consistent and
>>>>>>> reproducible builds.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://docs.google.com/document/d/17-y48WW25-VGBWZNyTdoN0WUN03k9ZhJjLp9wtyG1Wc/edit#heading=h.wskna8eurvjv
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anand
>>>>>>>
>>>>>>


Re: Proposal for pyproject.toml Support in Apache Beam Python

2023-10-12 Thread Robert Bradshaw via dev
Does this change any development practices? E.g. if I clone the repo, I'm
assuming I couldn't run "setup.py test" anymore. What about the generated
files (like protos, or the yaml definitions copied from other parts of the
repo)?

On Thu, Oct 12, 2023 at 12:27 PM Anand Inguva via dev 
wrote:

> The PR https://github.com/apache/beam/pull/28385 is merged today. If
> there are any observed failures, please comment on the PR and I will follow
> up with a forward fix. Thanks.
>
> On Fri, Sep 1, 2023 at 2:30 PM Anand Inguva 
> wrote:
>
>> Since there is positive feedback from the dev community, I am going ahead
>> and implementing this proposal for Python SDK.
>>
>> @aus...@apache.org   Initially let's move forward
>> with the setuptools as backend for building package and as part of the
>> future work, we can find a better backend than setuptools.
>>
>> Thanks for the feedback.
>> Anand
>>
>> On Mon, Aug 28, 2023 at 12:00 PM Austin Bennett 
>> wrote:
>>
>>> I've thought about this a ton, but haven't been in a position to
>>> undertake the work.  Thanks for bringing this up, @Anand Inguva
>>>  !
>>>
>>> I'd point us to https://python-poetry.org/  ... [ which is where I'd
>>> look to take us, but I'm also not able to do all the work, so my
>>> suggestion/preference doesn't matter that much ]
>>>
>>> https://python-poetry.org/docs/pyproject#the-pyprojecttoml-file <- for
>>> info on pyproject.toml file.
>>>
>>> Notice the use of a 'lock' file is very valuable, ex:
>>> https://python-poetry.org/docs/basic-usage/#committing-your-poetrylock-file-to-version-control
>>>
>>> I haven't come across `build`, that might be great too.  I'd highlight
>>> that Poetry is pretty common across industry these days, rock-solid,
>>> ecosystem of interoperability, users, etc...   If not familiar, PLEASE have
>>> a look at that.
>>>
>>>
>>>
>>>
>>> On Mon, Aug 28, 2023 at 8:04 AM Kerry Donny-Clark via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> +1
>>>> Hi Anand,
>>>> I appreciate this effort. Managing python dependencies has been a major
>>>> pain point for me, and I think this approach would help.
>>>> Kerry
>>>>
>>>> On Mon, Aug 28, 2023 at 10:14 AM Anand Inguva via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Hello Beam Dev Team,
>>>>>
>>>>> I've compiled a design document [1]
>>>>> proposing the integration of pyproject.toml into Apache Beam's Python build
>>>>> process. Your insights and feedback would be invaluable.
>>>>>
>>>>> What is pyproject.toml?
>>>>> pyproject.toml is a configuration file that specifies a project's
>>>>> build dependencies and other project-related metadata in a standardized
>>>>> format. Before pyproject.toml, Python projects often had multiple
>>>>> configuration files (like setup.py, setup.cfg, and requirements.txt).
>>>>> pyproject.toml aims to centralize these configurations into one place,
>>>>> making project setups more organized and straightforward. One of the
>>>>> significant features enabled by pyproject.toml is the ability to perform
>>>>> isolated builds. This ensures that build dependencies are separated from
>>>>> the project's runtime dependencies, leading to more consistent and
>>>>> reproducible builds.
>>>>>
>>>>> [1]
>>>>> https://docs.google.com/document/d/17-y48WW25-VGBWZNyTdoN0WUN03k9ZhJjLp9wtyG1Wc/edit#heading=h.wskna8eurvjv
>>>>>
>>>>> Thanks,
>>>>> Anand
>>>>>



Re: [QUESTION] Why no auto labels?

2023-10-10 Thread Robert Bradshaw via dev
I would definitely support a PR making this an option. Changing the default
would be a rather big change that would require more thought.

On Tue, Oct 10, 2023 at 4:24 PM Joey Tran  wrote:

> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
> Apache Beam at my company and I'm trying to foresee parts of the API they
> might find inconvenient.
>
> If there's a conclusion to make the behavior similar to java, I'm happy to
> put up a PR
>
> On Thu, Oct 5, 2023, 12:49 PM Joey Tran  wrote:
>
>> Is it really toggleable in Java? I imagine that if it's a toggle it'd be
>> a very sticky toggle since it'd be easy for PTransforms to accidentally
>> rely on it.
>>
>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>> wrote:
>>
>>> Huh. This used to be a hard error in Java, but I guess it's togglable
>>> with an option now. We should probably add the option to toggle Python too.
>>> (Unclear what the default should be, but this probably ties into
>>> re-thinking how pipeline update should work.)
>>>
>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>> wrote:
>>>
>>>> Makes sense that the requirement is the same, but is the label
>>>> auto-generation behavior the same? I modified the BeamJava
>>>> wordcount example[1] to do the regex filter twice in a row, and unlike the
>>>> BeamPython example I posted before, it just warns instead of throwing an
>>>> exception.
>>>>
>>>> Tangentially, is it expected that the Beam playground examples don't
>>>> have a way to see the outputs of a run example? I have a vague memory that
>>>> there used to be a way to navigate to an output file after it's generated
>>>> but not sure if I just dreamt that up. Playing with the examples, I wasn't
>>>> positive if my runs were actually succeeding or not based on the stdout
>>>> alone.
>>>>
>>>> [1] https://play.beam.apache.org/?sdk=java=mI7WUeje_r2
>>>> <https://play.beam.apache.org/?sdk=java=mI7WUeje_r2>
>>>> [2] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
>>>>
>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>> u...@beam.apache.org> wrote:
>>>>
>>>>> BeamJava and BeamPython have the exact same behavior: transform names
>>>>> within must be distinct [1]. This is because we do not necessarily know at
>>>>> pipeline construction time if the pipeline will be streaming or batch, or
>>>>> if it will be updated in the future, so the decision was made to impose
>>>>> this restriction up front. Both will auto-generate a name for you if one is
>>>>> not given, but will do so deterministically (not depending on some global
>>>>> context) to avoid potential update problems.
>>>>>
>>>>> [1] Note that this applies to the fully qualified transform name, so
>>>>> the naming only has to be distinct within a composite transform (or at the
>>>>> top level--the pipeline itself is isomorphic to a single composite
>>>>> transform).
>>>>>
>>>>> On Wed, Oct 4, 2023 at 3:43 AM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Cross posting this thread to dev@ to see if this is intentional
>>>>>> behavior or if it's something worth changing for the python SDK
>>>>>>
>>>>>> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user 
>>>>>> wrote:
>>>>>>
>>>>>>> That suggests the default label is created as that, which indeed
>>>>>>> causes the duplication error.
>>>>>>>
>>>>>>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Not sure what that suggests
>>>>>>>>
>>>>>>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Looks like this is the current behaviour. If you have `t =
>>>>>>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>>>>>>> `Filter(identity_filter)`.
>>>>>>>>>
>>>>>>>>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran <
>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>
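
For readers following the thread, the label behaviour under discussion can be
modelled with a small toy sketch. This is not Beam's implementation: the Scope
class, the auto_unique flag, and the "_2" suffix scheme are hypothetical names
chosen for illustration. It assumes only what the thread states, namely that
labels must be distinct within a composite transform and that any generated
names should be deterministic.

```python
class DuplicateLabelError(ValueError):
    """Raised when two transforms in the same scope share a label."""


class Scope:
    """Toy stand-in for a composite transform (or the pipeline itself)."""

    def __init__(self, name="", auto_unique=False):
        self.name = name
        self.auto_unique = auto_unique  # hypothetical opt-in option
        self._used = set()

    def apply(self, label):
        """Register a label and return its fully qualified name."""
        if label in self._used:
            if not self.auto_unique:
                raise DuplicateLabelError(
                    f"Transform {label!r} already exists in scope {self.name!r}")
            # Deterministic suffix: depends only on labels already seen in
            # this scope, never on global state.
            n = 2
            while f"{label}_{n}" in self._used:
                n += 1
            label = f"{label}_{n}"
        self._used.add(label)
        return f"{self.name}/{label}" if self.name else label
```

With auto_unique left off, applying "Filter(identity_filter)" twice in the same
scope raises, which mirrors the error reported at the start of the thread. With
it on, the second application gets a "_2" suffix instead.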

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath 
wrote:

>
> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw 
> wrote:
>
>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath 
>> wrote:
>>
>>>
>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>>>
>>>> I suspect some simple pattern templating would solve most use cases. We
>>>> probably would want to support timestamp formatting (e.g. $YYYY $M $D) as
>>>> well.
>>>>
>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> I would say:
>>>>>>
>>>>>> sink:
>>>>>>   type: WriteToParquet
>>>>>>   config:
>>>>>>     path: /beam/filesystem/dest
>>>>>>     prefix: 
>>>>>>     suffix: 
>>>>>>
>>>>>> Underlying SDK will add the middle part of the file names to make
>>>>>> sure that files generated by various bundles/windows/shards do not conflict.
>>>>>>
>>>>>
>>>>> What's the relationship between path and prefix? Is path the
>>>>> directory part of the full path, or does prefix precede it?
>>>>>
>>>>
>>> prefix would be the first part of the file name so each shard will be
>>> named.
>>> /--
>>>
>>> This is similar to what we do in existing SDKs. For example, Java FileIO,
>>>
>>>
>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>>
>>
>> Yeah, although there's no distinction between path and prefix.
>>
>
> Ah, for FileIO, path comes from the "to" call.
>
>
> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125
>
>

Ah. I guess there's some inconsistency here, e.g. text files are written to
a filenamePrefix that subsumes both:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-
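
To make the two conventions concrete, here is a minimal sketch in plain Python.
The function names and the zero-padded "NNNNN-of-NNNNN" middle part are
assumptions for illustration only; they are not the FileIO or TextIO APIs.

```python
import posixpath


def shard_name_split(path, prefix, suffix, shard, num_shards):
    """Convention A: path is a directory, prefix starts the file name."""
    middle = f"{shard:05d}-of-{num_shards:05d}"  # assumed shard format
    return posixpath.join(path, f"{prefix}-{middle}{suffix}")


def shard_name_combined(filename_prefix, suffix, shard, num_shards):
    """Convention B: one filename prefix subsumes directory and prefix."""
    middle = f"{shard:05d}-of-{num_shards:05d}"  # assumed shard format
    return f"{filename_prefix}-{middle}{suffix}"
```

Under these assumptions, shard_name_split("/beam/filesystem/dest", "part",
".parquet", 0, 3) and shard_name_combined("/beam/filesystem/dest/part",
".parquet", 0, 3) produce the same file name, so the difference is only in how
the user spells the configuration.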


>
>>
>>>>>
>>>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>>>> customizing the file pattern sounds like a more advanced use case that can
>>>>>> be left for "real" SDKs.
>>>>>>
>>>>>
>>>>> Yea, we don't have to do everything.
>>>>>
>>>>>
>>>>>> For dynamic destinations, I think just making the "path" component
>>>>>> support  a lambda that is parameterized by the input should be adequate
>>>>>> since this allows customers to direct files written to different
>>>>>> destination directories.
>>>>>>
>>>>>> sink:
>>>>>>   type: WriteToParquet
>>>>>>   config:
>>>>>>     path: 
>>>>>>     prefix: 
>>>>>>     suffix: 
>>>>>>
>>>>>> I'm not sure what would be the best way to specify a lambda here
>>>>>> though. Maybe a regex or the name of a Python callable ?
>>>>>>
>>>>>
>>>>> I'd rather not require Python for a pure Java pipeline, but some kind
>>>>> of a pattern template may be sufficient here.
>>>>>
>>>>>
>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>>>>
>>>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>>>>>> the user add it via MapElements is performance. We discussed something
>>>>>>>> along the lines of what you are suggesting (essentially having the user
>>>>>>>> create a KV where the key contained the dynamic information). The problem
>>>>>>>> was t

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 4:03 PM Robert Bradshaw  wrote:

> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>
>> I suspect some simple pattern templating would solve most use cases.
>>
>
> That's what I'm leaning towards as well.
>
>
>> We probably would want to support timestamp formatting (e.g. $YYYY $M $D)
>> as well.
>>
>
> Although we have several timestamps to consider: the element timestamp,
> its window start and/or end, walltime.
>

And users may want some of these to be (part of) the directory structure as
well, not just in a "sharding" suffix.
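
A rough sketch of what this could look like, using Python's string.Template as
a stand-in for whatever templating syntax is eventually chosen. The render_path
function and its "source" argument are hypothetical; the point is only that the
same $YYYY/$M/$D placeholders can be filled from different timestamps and can
appear in the directory part as well as the file name.

```python
from datetime import datetime, timezone
from string import Template


def render_path(template, element_ts, window_start, window_end,
                source="window_start"):
    """Fill $YYYY, $M and $D from one explicitly chosen timestamp."""
    candidates = {
        "element": element_ts,
        "window_start": window_start,
        "window_end": window_end,
        "walltime": datetime.now(timezone.utc),
    }
    ts = candidates[source]
    return Template(template).substitute(
        YYYY=f"{ts.year:04d}", M=f"{ts.month:02d}", D=f"{ts.day:02d}")
```

Forcing the user to name the timestamp source (rather than guessing) is one way
to keep the template unambiguous, at the cost of one more config field.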


> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>> wrote:
>>
>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> I would say:
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>>     path: /beam/filesystem/dest
>>>>     prefix: 
>>>>     suffix: 
>>>>
>>>> Underlying SDK will add the middle part of the file names to make sure
>>>> that files generated by various bundles/windows/shards do not conflict.
>>>>
>>>
>>> What's the relationship between path and prefix? Is path the
>>> directory part of the full path, or does prefix precede it?
>>>
>>>
>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>> customizing the file pattern sounds like a more advanced use case that can
>>>> be left for "real" SDKs.
>>>>
>>>
>>> Yea, we don't have to do everything.
>>>
>>>
>>>> For dynamic destinations, I think just making the "path" component
>>>> support  a lambda that is parameterized by the input should be adequate
>>>> since this allows customers to direct files written to different
>>>> destination directories.
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>>     path: 
>>>>     prefix: 
>>>>     suffix: 
>>>>
>>>> I'm not sure what would be the best way to specify a lambda here
>>>> though. Maybe a regex or the name of a Python callable ?
>>>>
>>>
>>> I'd rather not require Python for a pure Java pipeline, but some kind of
>>> a pattern template may be sufficient here.
>>>
>>>
>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>>
>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>>>> the user add it via MapElements is performance. We discussed something
>>>>>> along the lines of what you are suggesting (essentially having the user
>>>>>> create a KV where the key contained the dynamic information). The problem
>>>>>> was that the size of the generated filepath was often much larger
>>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>>> integer userid, and the filepath prefix would then be
>>>>>> /long/path/to/output/users/. This was especially bad in cases where the
>>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>>
>>>>>
>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>> bringing this up.
>>>>>
>>>>>
>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>>> from the record!), but we should keep in mind that this might mean that
>>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>>
>>>>>
>>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>>> like to avoid mixing U

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:

> I suspect some simple pattern templating would solve most use cases.
>

That's what I'm leaning towards as well.


> We probably would want to support timestamp formatting (e.g. $YYYY $M $D)
> as well.
>

Although we have several timestamps to consider: the element timestamp, its
window start and/or end, walltime.


> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
> wrote:
>
>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
>> wrote:
>>
>>> I would say:
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: /beam/filesystem/dest
>>>     prefix: 
>>>     suffix: 
>>>
>>> Underlying SDK will add the middle part of the file names to make sure
>>> that files generated by various bundles/windows/shards do not conflict.
>>>
>>
>> What's the relationship between path and prefix? Is path the
>> directory part of the full path, or does prefix precede it?
>>
>>
>>> This will satisfy the vast majority of use-cases I believe. Fully
>>> customizing the file pattern sounds like a more advanced use case that can
>>> be left for "real" SDKs.
>>>
>>
>> Yea, we don't have to do everything.
>>
>>
>>> For dynamic destinations, I think just making the "path" component
>>> support  a lambda that is parameterized by the input should be adequate
>>> since this allows customers to direct files written to different
>>> destination directories.
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: 
>>>     prefix: 
>>>     suffix: 
>>>
>>> I'm not sure what would be the best way to specify a lambda here though.
>>> Maybe a regex or the name of a Python callable ?
>>>
>>
>> I'd rather not require Python for a pure Java pipeline, but some kind of
>> a pattern template may be sufficient here.
>>
>>
>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>
>>>>> Just FYI - the reason why names (including prefixes) in
>>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>>> the user add it via MapElements is performance. We discussed something
>>>>> along the lines of what you are suggesting (essentially having the user
>>>>> create a KV where the key contained the dynamic information). The problem
>>>>> was that the size of the generated filepath was often much larger
>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>> integer userid, and the filepath prefix would then be
>>>>> /long/path/to/output/users/. This was especially bad in cases where the
>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>
>>>>
>>>> That is a consideration I hadn't thought much of, thanks for
>>>> bringing this up.
>>>>
>>>>
>>>>> Now there may not be any good way to keep this benefit in a
>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>> from the record!), but we should keep in mind that this might mean that
>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>
>>>>
>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>> like to avoid mixing UDFs (with their associated languages and execution
>>>> environments) if possible. Though I'd like the performance of a
>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>> straight-line Java (and possibly better, if we can leverage the structure
>>>> of schemas everywhere) this is not an absolute requirement for all
>>>> features.
>>>>
>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>> could be sufficient to mitigate the blow-up of pre-computing this in

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath 
wrote:

>
> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax  wrote:
>
>> I suspect some simple pattern templating would solve most use cases. We
>> probably would want to support timestamp formatting (e.g. $YYYY $M $D) as
>> well.
>>
>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw 
>> wrote:
>>
>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> I would say:
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>>     path: /beam/filesystem/dest
>>>>     prefix: 
>>>>     suffix: 
>>>>
>>>> Underlying SDK will add the middle part of the file names to make sure
>>>> that files generated by various bundles/windows/shards do not conflict.
>>>>
>>>
>>> What's the relationship between path and prefix? Is path the
>>> directory part of the full path, or does prefix precede it?
>>>
>>
> prefix would be the first part of the file name so each shard will be
> named.
> /--
>
> This is similar to what we do in existing SDKs. For example, Java FileIO,
>
>
> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>

Yeah, although there's no distinction between path and prefix.


>>>
>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>> customizing the file pattern sounds like a more advanced use case that can
>>>> be left for "real" SDKs.
>>>>
>>>
>>> Yea, we don't have to do everything.
>>>
>>>
>>>> For dynamic destinations, I think just making the "path" component
>>>> support  a lambda that is parameterized by the input should be adequate
>>>> since this allows customers to direct files written to different
>>>> destination directories.
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>>     path: 
>>>>     prefix: 
>>>>     suffix: 
>>>>
>>>> I'm not sure what would be the best way to specify a lambda here
>>>> though. Maybe a regex or the name of a Python callable ?
>>>>
>>>
>>> I'd rather not require Python for a pure Java pipeline, but some kind of
>>> a pattern template may be sufficient here.
>>>
>>>
>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>>
>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>>>> the user add it via MapElements is performance. We discussed something
>>>>>> along the lines of what you are suggesting (essentially having the user
>>>>>> create a KV where the key contained the dynamic information). The problem
>>>>>> was that the size of the generated filepath was often much larger
>>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>>> integer userid, and the filepath prefix would then be
>>>>>> /long/path/to/output/users/. This was especially bad in cases where the
>>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>>
>>>>>
>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>> bringing this up.
>>>>>
>>>>>
>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>>> from the record!), but we should keep in mind that this might mean that
>>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>>
>>>>>
>>>>> Yep, it's not as strai

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
wrote:

> I would say:
>
> sink:
>   type: WriteToParquet
>   config:
>     path: /beam/filesystem/dest
>     prefix: 
>     suffix: 
>
> Underlying SDK will add the middle part of the file names to make sure
> that files generated by various bundles/windows/shards do not conflict.
>

What's the relationship between path and prefix? Is path the directory part
of the full path, or does prefix precede it?


> This will satisfy the vast majority of use-cases I believe. Fully
> customizing the file pattern sounds like a more advanced use case that can
> be left for "real" SDKs.
>

Yea, we don't have to do everything.


> For dynamic destinations, I think just making the "path" component
> support  a lambda that is parameterized by the input should be adequate
> since this allows customers to direct files written to different
> destination directories.
>
> sink:
>   type: WriteToParquet
>   config:
>     path: 
>     prefix: 
>     suffix: 
>
> I'm not sure what would be the best way to specify a lambda here though.
> Maybe a regex or the name of a Python callable ?
>

I'd rather not require Python for a pure Java pipeline, but some kind of a
pattern template may be sufficient here.


> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>
>>> Just FYI - the reason why names (including prefixes) in
>>> DynamicDestinations were parameterized via a lambda instead of just having
>>> the user add it via MapElements is performance. We discussed something
>>> along the lines of what you are suggesting (essentially having the user
>>> create a KV where the key contained the dynamic information). The problem
>>> was that the size of the generated filepath was often much larger
>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>> desire to avoid record blowup. e.g. the record might contain a single
>>> integer userid, and the filepath prefix would then be
>>> /long/path/to/output/users/. This was especially bad in cases where the
>>> data had to be shuffled, and the existing dynamic destinations method
>>> allowed extracting the filepath only _after_  the shuffle.
>>>
>>
>> That is a consideration I hadn't thought much of, thanks for
>> bringing this up.
>>
>>
>>> Now there may not be any good way to keep this benefit in a
>>> declarative approach such as YAML (or at least a good easy way - we could
>>> always allow the user to pass in a SQL expression to extract the filename
>>> from the record!), but we should keep in mind that this might mean that
>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>
>>
>> Yep, it's not as straightforward to do in a declarative way. I would like
>> to avoid mixing UDFs (with their associated languages and execution
>> environments) if possible. Though I'd like the performance of a
>> "straightforward" YAML pipeline to be that which one can get writing
>> straight-line Java (and possibly better, if we can leverage the structure
>> of schemas everywhere) this is not an absolute requirement for all
>> features.
>>
>> I wonder if separating out a constant prefix vs. the dynamic stuff could
>> be sufficient to mitigate the blow-up of pre-computing this in most cases
>> (especially in the context of a larger pipeline). Alternatively, rather
>> than just a sharding pattern, one could have a full filepattern that
>> includes format parameters for dynamically computed bits as well as the
>> shard number, windowing info, etc. (There are pros and cons to this.)
>>
>>
>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Currently the various file writing configurations take a single
>>>> parameter, path, which indicates where the (sharded) output should be
>>>> placed. In other words, one can write something like
>>>>
>>>>   pipeline:
>>>>     ...
>>>>     sink:
>>>>       type: WriteToParquet
>>>>       config:
>>>>         path: /beam/filesystem/dest
>>>>
>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>
>>>> Of course, in practice file writing is often much more complicated than
>>>> this (especially when it come

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 7:21 AM Kenneth Knowles  wrote:

> Another perspective:
>
> We should focus on the fact that FileIO writes what I would call a "big
> file-based dataset" to a filesystem. The primary characteristic of a "big
> file-based dataset" is that it is sharded and that the shards should not
> have any individual distinctiveness. The dataset should be read and written
> as a whole, and the shards are an implementation detail for performance.
>
> This impacts dynamic destinations in two ways that I can think of right
> away:
>
>  - It is critical by definition to be able to refer to a whole "big
> file-based dataset" as a whole thing. The most obvious way would be for it
> to exist as a folder or folder-like grouping of files, and you glob
> everything underneath that. But the hard requirement is that there is
> *some* way to refer to the dataset as a single entity (via glob,
> basically).
>
>  - When using "dynamic destinations" style functionality, each of the
> dynamic destinations is a "big file-based dataset". The purpose is to route
> a datum to the correct one of these, NOT to route the datum to a particular
> file (which would be just some anonymous shard in the dataset).
>
> So having really fine-grained control over filenames is likely to result
> in anti-patterns of malformed datasets that cannot be easily globbed or, in
> the converse case, well-formed datasets that have suboptimal sharding
> because it was manually managed.
>
> I know that "reality" is not this simple, because people have accumulated
> files they have to work with as-is, where they probably didn't plan for
> this way of thinking when they were gathering the files. We need to give
> good options for everyone, but the golden path should be the simple and
> good case.
>

Yeah. I think both are valid perspectives (especially when trying to fit in
with the often messy setups that exist today).


> On Tue, Oct 10, 2023 at 10:09 AM Kenneth Knowles  wrote:
>
>> Since I've been in GHA files lately...
>>
>> I think they have a very useful pattern which we could borrow from or
>> learn from, where setting up the variables happens separately, like
>> https://github.com/apache/beam/blob/57821c191d322f9f21c01a34c55e0c40eda44f1e/.github/workflows/build_release_candidate.yml#L270
>>
>> If we called the section "vars" and then the config could use the vars in
>> the destination. I'm making this example deliberately a little gross:
>>
>>  - vars:
>>      - USER_REGION: $.user.metadata.region
>>      - USER_GROUP: $.user.groups[0].name
>>  - config:
>>      - path: gs://output-bucket-${vars.USER_REGION}/files/${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}
>>
>> I think it strikes a good balance between arbitrary lambdas and just a
>> prefix/suffix control, giving a really easy place where we can say "the
>> whole value of this YAML field is a path expression into the structured
>> data"
>>
>
Would we want to impose any constraints here? E.g. what about
"gs://output-bucket-${fileio.SHARD_NUM}/foo/file-${vars.USER_GROUP}.txt"?
What if the shard number (or windowing information) is omitted? Would we
automatically (optionally?) exclude vars from the written data? I like the
idea of providing string templates as the "function" but things may be less
error-prone if we can automatically insert things.
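
One possible answer, sketched in Python purely as an assumption about how such
constraints might be enforced: reject fileio placeholders outside the file
name, and append the shard placeholder automatically when it is omitted. The
normalize_template function is hypothetical, and the placeholder names are
taken from Kenn's example.

```python
import re

# Matches ${name} or ${dotted.name} placeholders.
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][\w.]*)\}")
SHARD = "fileio.SHARD_NUM"


def normalize_template(template):
    """Validate a path template and make sure it names the shard."""
    directory, _, _filename = template.rpartition("/")
    # Constraint: shard/window info may not select the bucket or directory.
    for name in PLACEHOLDER.findall(directory):
        if name.startswith("fileio."):
            raise ValueError(f"${{{name}}} may only appear in the file name")
    # Automatic insertion: add the shard number if the user left it out.
    if SHARD not in PLACEHOLDER.findall(template):
        template += "-${" + SHARD + "}"
    return template
```

With this sketch, the template "gs://output-bucket-${fileio.SHARD_NUM}/foo/..."
is rejected, while "gs://out/files/${vars.USER_GROUP}" is silently extended with
"-${fileio.SHARD_NUM}". Whether silent insertion is friendlier than an error is
exactly the open question.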


> Kenn
>>
>> On Mon, Oct 9, 2023 at 6:09 PM Chamikara Jayalath via dev <
>> dev@beam.apache.org> wrote:
>>
>>> I would say:
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: /beam/filesystem/dest
>>>     prefix: 
>>>     suffix: 
>>>
>>> Underlying SDK will add the middle part of the file names to make sure
>>> that files generated by various bundles/windows/shards do not conflict.
>>>
>>> This will satisfy the vast majority of use-cases I believe. Fully
>>> customizing the file pattern sounds like a more advanced use case that can
>>> be left for "real" SDKs.
>>>
>>> For dynamic destinations, I think just making the "path" component
>>> support  a lambda that is parameterized by the input should be adequate
>>> since this allows customers to direct files written to different
>>> destination directories.
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: 
>>>     prefix: 
>>>     suffix: 
>>>
>>> I'm not sure 

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Robert Bradshaw via dev
On Tue, Oct 10, 2023 at 7:22 AM Byron Ellis via dev 
wrote:

> FWIW dbt (which is also YAML and has this problem for other reasons) does
> something like this. It also chooses to assume that everything is a string
> but allows users to use the Jinja templating language to make those strings
> dynamic where needed.
>

Are these only for values that are filled in at runtime (i.e. Jinja is a
pre-processor used before the YAML file is passed to dbt) or can they be
plugged in (possibly on a per-record basis) from the data itself? (FWIW, I
think we also want to allow some kind of templating like this to allow for
parameterized composite PTransforms to be defined in YAML, and additionally
we'll need it for YAML-defined templates (not to be confused with the YAML
template, which is a single flex template whose single parameter is the
YAML file itself).)


> Syntactically I think that's a bit nicer to look at than the shell script
> style and saves having to remember the difference between $() and ${}
>

+1


> On Tue, Oct 10, 2023 at 7:10 AM Kenneth Knowles  wrote:
>
>> Since I've been in GHA files lately...
>>
>> I think they have a very useful pattern which we could borrow from or
>> learn from, where setting up the variables happens separately, like
>> https://github.com/apache/beam/blob/57821c191d322f9f21c01a34c55e0c40eda44f1e/.github/workflows/build_release_candidate.yml#L270
>>
>> If we called the section "vars" and then the config could use the vars in
>> the destination. I'm making this example deliberately a little gross:
>>
>>  - vars:
>>      - USER_REGION: $.user.metadata.region
>>      - USER_GROUP: $.user.groups[0].name
>>  - config:
>>      - path: gs://output-bucket-${vars.USER_REGION}/files/${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}
>>
>> I think it strikes a good balance between arbitrary lambdas and just a
>> prefix/suffix control, giving a really easy place where we can say "the
>> whole value of this YAML field is a path expression into the structured
>> data"
>>
>> Kenn
>>
>> On Mon, Oct 9, 2023 at 6:09 PM Chamikara Jayalath via dev <
>> dev@beam.apache.org> wrote:
>>
>>> I would say:
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: /beam/filesystem/dest
>>>     prefix: 
>>>     suffix: 
>>>
>>> Underlying SDK will add the middle part of the file names to make sure
>>> that files generated by various bundles/windows/shards do not conflict.
>>>
>>> This will satisfy the vast majority of use-cases I believe. Fully
>>> customizing the file pattern sounds like a more advanced use case that can
>>> be left for "real" SDKs.
>>>
>>> For dynamic destinations, I think just making the "path" component
>>> support  a lambda that is parameterized by the input should be adequate
>>> since this allows customers to direct files written to different
>>> destination directories.
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: 
>>>     prefix: 
>>>     suffix: 
>>>
>>> I'm not sure what would be the best way to specify a lambda here though.
>>> Maybe a regex or the name of a Python callable ?
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>>
>>>>> Just FYI - the reason why names (including prefixes) in
>>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>>> the user add it via MapElements is performance. We discussed something
>>>>> along the lines of what you are suggesting (essentially having the user
>>>>> create a KV where the key contained the dynamic information). The problem
>>>>> was that the size of the generated filepath was often much larger
>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>> integer userid, and the filepath prefix would then be
>>>>> /long/path/to/output/users/. This was especially bad in cases where the
>>>>&

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-09 Thread Robert Bradshaw via dev
On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:

> Just FYI - the reason why names (including prefixes) in
> DynamicDestinations were parameterized via a lambda instead of just having
> the user add it via MapElements is performance. We discussed something
> along the lines of what you are suggesting (essentially having the user
> create a KV where the key contained the dynamic information). The problem
> was that the size of the generated filepath was often much larger
> (sometimes by 2 OOM) than the information in the record, and there was a
> desire to avoid record blowup. e.g. the record might contain a single
> integer userid, and the filepath prefix would then be
> /long/path/to/output/users/. This was especially bad in cases where the
> data had to be shuffled, and the existing dynamic destinations method
> allowed extracting the filepath only _after_  the shuffle.
>

That is a consideration I hadn't thought much of, thanks for bringing this
up.


> Now there may not be any good way to keep this benefit in a
> declarative approach such as YAML (or at least a good easy way - we could
> always allow the user to pass in a SQL expression to extract the filename
> from the record!), but we should keep in mind that this might mean that
> YAML-generated pipelines will be less efficient for certain use cases.
>

Yep, it's not as straightforward to do in a declarative way. I would like
to avoid mixing UDFs (with their associated languages and execution
environments) if possible. Though I'd like the performance of a
"straightforward" YAML pipeline to be that which one can get writing
straight-line Java (and possibly better, if we can leverage the structure
of schemas everywhere) this is not an absolute requirement for all
features.

I wonder if separating out a constant prefix vs. the dynamic stuff could be
sufficient to mitigate the blow-up of pre-computing this in most cases
(especially in the context of a larger pipeline). Alternatively, rather
than just a sharding pattern, one could have a full filepattern that
includes format parameters for dynamically computed bits as well as the
shard number, windowing info, etc. (There are pros and cons to this.)
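
To make the "full filepattern" idea concrete, here is a minimal sketch in
plain Python. The template syntax, the field names (shard, num_shards,
window) and the helper expand_filepattern are all hypothetical, chosen only
for illustration; none of this is an existing Beam YAML option. The point is
that the long constant prefix lives in the pattern, so only the small dynamic
value has to travel with each record.

```python
# Hypothetical sketch: the template syntax and field names are illustrative
# only and do not correspond to an existing Beam option.
def expand_filepattern(pattern, record, shard, num_shards, window=""):
    """Fill a full filepattern from record fields plus shard/window info."""
    return pattern.format(
        shard=shard, num_shards=num_shards, window=window, **record)


# The constant prefix is stated once in the pattern, not once per record.
CONSTANT_PREFIX = "/long/path/to/output/users/"
pattern = (
    CONSTANT_PREFIX + "{userid}/{window}-{shard:05d}-of-{num_shards:05d}.json")

record = {"userid": 42}  # only the small dynamic part is carried in the data
path = expand_filepattern(
    pattern, record, shard=3, num_shards=10, window="2023-10-09T13")
print(path)
# /long/path/to/output/users/42/2023-10-09T13-00003-of-00010.json
```

One trade-off of this shape is that the pattern language itself becomes part
of the config surface and has to be specified and validated.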


> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Currently the various file writing configurations take a single
>> parameter, path, which indicates where the (sharded) output should be
>> placed. In other words, one can write something like
>>
>>   pipeline:
>>     ...
>>     sink:
>>       type: WriteToParquet
>>       config:
>>         path: /beam/filesystem/dest
>>
>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>
>> Of course, in practice file writing is often much more complicated than
>> this (especially when it comes to Streaming). For reference, I've included
>> links to our existing offerings in the various SDKs below. I'd like to
>> start a discussion about what else should go in the "config" parameter and
>> how it should be expressed in YAML.
>>
>> The primary concern is around naming. This can generally be split into
>> (1) the prefix, which must be provided by the users, (2) the sharding
>> information, which includes both shard counts (e.g. the -X-of-N suffix) but also
>> windowing information (for streaming pipelines) which we may want to allow
>> the user to customize the formatting of, and (3) a suffix like .json or
>> .avro that is useful for both humans and tooling and can often be inferred
>> but should allow customization as well.
>>
>> An interesting case is that of dynamic destinations, where the prefix (or
>> other parameters) may themselves be functions of the records themselves. (I
>> am excluding the case where the format itself is variable--such cases are
>> probably better handled by explicitly partitioning the data and doing
>> multiple writes, as this introduces significant complexities and the set of
>> possible formats is generally finite and known ahead of time.) I propose
>> that we leverage the fact that we have structured data to be able to pull
>> out these dynamic parameters. For example, if we have an input data set
>> with a string column my_col we could allow something like
>>
>>   config:
>>     path: {dynamic: my_col}
>>
>> which would pull this information out at runtime. (With the MapToFields
>> transform, it is very easy to compute/append additional fields to existing
>> records.) Generally this field would then be stripped from the written
>> data, which would only see the subset of non-dynamically referenced columns
>> (though this could be configurable: we could add an attribute l

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-09 Thread Robert Bradshaw via dev
On Mon, Oct 9, 2023 at 1:11 PM Robert Burke wrote:

> I'll note that the file "Writes" in the Go SDK are currently an unscalable
> antipattern, because of this exact question.
>
> Aside from carefully examining other SDKs it's not clear how one authors
> a reliable, automatically shardable, window- and pane-aware file write in an
> arbitrary SDK, simply by referring to common Beam constructs.
>
> Closely examining how other SDKs do it is time consuming and an
> antipattern, and doesn't lend itself to educating arbitrary beam end users
> on good patterns and why they work, because they tend not to have that sort
> of commentary (for all the complexity you mention.)
>

Yeah, I agree there's implementation complexity here. Hopefully it only has
to be done once per SDK (and all other file formats can just fill in the
"write this batch of records to a filestream" part), and with
multi-language not even all SDKs need to go there (initially at least).


> But it's just as likely I missed a document somewhere. It has been a while
> since I last searched for this, let alone have time to do the deep dives
> required to produce it.
>
> Robert Burke
> Beam Go Busybody
>
>>
>>
>>


[YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-09 Thread Robert Bradshaw via dev
Currently the various file writing configurations take a single parameter,
path, which indicates where the (sharded) output should be placed. In other
words, one can write something like

  pipeline:
    ...
    sink:
      type: WriteToParquet
      config:
        path: /beam/filesystem/dest

and one gets files like "/beam/filesystem/dest-X-of-N"

Of course, in practice file writing is often much more complicated than
this (especially when it comes to Streaming). For reference, I've included
links to our existing offerings in the various SDKs below. I'd like to
start a discussion about what else should go in the "config" parameter and
how it should be expressed in YAML.

The primary concern is around naming. This can generally be split into (1)
the prefix, which must be provided by the users, (2) the sharding
information, which includes both shard counts (e.g. the -X-of-N suffix) but also
windowing information (for streaming pipelines) which we may want to allow
the user to customize the formatting of, and (3) a suffix like .json or
.avro that is useful for both humans and tooling and can often be inferred
but should allow customization as well.
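
As a rough illustration of how these three pieces compose, here is a small
plain-Python sketch. The function name shard_filename, its parameters, and
the zero-padded five-digit shard fields are assumptions made for the example
(the padding is modeled on the "-SSSSS-of-NNNNN" style of shard template);
the YAML options themselves are exactly what is up for discussion.

```python
def shard_filename(prefix, shard, num_shards, window=None, suffix=""):
    """Compose (1) prefix, (2) window + shard info, and (3) suffix."""
    parts = [prefix]
    if window is not None:
        # Streaming pipelines typically fold windowing info into the name.
        parts.append(f"-{window}")
    parts.append(f"-{shard:05d}-of-{num_shards:05d}")
    return "".join(parts) + suffix


batch_name = shard_filename("/beam/filesystem/dest", 0, 3, suffix=".json")
stream_name = shard_filename(
    "/beam/filesystem/dest", 1, 2, window="2023-10-09T13:00", suffix=".avro")
print(batch_name)   # /beam/filesystem/dest-00000-of-00003.json
print(stream_name)  # /beam/filesystem/dest-2023-10-09T13:00-00001-of-00002.avro
```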

An interesting case is that of dynamic destinations, where the prefix (or
other parameters) may themselves be functions of the records themselves. (I
am excluding the case where the format itself is variable--such cases are
probably better handled by explicitly partitioning the data and doing
multiple writes, as this introduces significant complexities and the set of
possible formats is generally finite and known ahead of time.) I propose
that we leverage the fact that we have structured data to be able to pull
out these dynamic parameters. For example, if we have an input data set
with a string column my_col we could allow something like

  config:
    path: {dynamic: my_col}

which would pull this information out at runtime. (With the MapToFields
transform, it is very easy to compute/append additional fields to existing
records.) Generally this field would then be stripped from the written
data, which would only see the subset of non-dynamically referenced columns
(though this could be configurable: we could add an attribute like
{dynamic: my_col, keep: true} or require the set of columns to be actually
written (or elided) to be enumerated in the config or allow/require the
actual data to be written to be in a designated field of the "full" input
records as arranged by a preceding transform). It'd be great to get
input/impressions from a wide range of people here on what would be the
most natural. Often just writing out snippets of various alternatives can
be quite informative (though I'm avoiding putting them here for the moment
to avoid biasing ideas right off the bat).
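
Purely to pin down the semantics being proposed (not the syntax), here is a
plain-Python sketch of what "pull the destination out of a column and strip
it from the written data" could mean. The helper name
partition_by_dynamic_field and its keep flag are hypothetical, and a real
sink would stream records to files rather than build an in-memory dict.

```python
from collections import defaultdict


def partition_by_dynamic_field(records, field, keep=False):
    """Group records by the value of `field`, which names the destination.

    By default the dynamic field is stripped from what gets written.
    """
    outputs = defaultdict(list)
    for record in records:
        destination = record[field]
        if keep:
            written = dict(record)
        else:
            written = {k: v for k, v in record.items() if k != field}
        outputs[destination].append(written)
    return dict(outputs)


records = [
    {"my_col": "/out/a", "value": 1},
    {"my_col": "/out/b", "value": 2},
    {"my_col": "/out/a", "value": 3},
]
by_path = partition_by_dynamic_field(records, "my_col")
print(by_path)
# {'/out/a': [{'value': 1}, {'value': 3}], '/out/b': [{'value': 2}]}
```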

For streaming pipelines it is often essential to write data out in a
time-partitioned manner. The typical way to do this is to add the windowing
information into the shard specification itself, and a (set of) file(s) is
written on each window closing. Beam YAML already supports any transform
being given a "windowing" configuration which will cause a WindowInto
transform to be applied to its input(s) before application which can sit
naturally on a sink. We may want to consider if non-windowed writes make
sense as well (though how this interacts with the watermark and underlying
implementations is a large open question, so this is a larger change that
might make sense to defer).

Note that I am explicitly excluding "coders" here. All data in YAML should
be schema'd, and writers should know how to write this structured data. We
may want to allow a "schema" field to allow a user to specify the desired
schema in a manner compatible with the sink format itself (e.g. avro, json,
whatever) that could be used both for validation and possibly resolving
ambiguities (e.g. if the sink has an enum format that is not expressed in
the schema of the input PCollection).

Some other configuration options are that some formats (especially
text-based ones) allow for specification of an external compression type
(which may be inferable from the suffix), whether to write a single shard
if the input collection is empty or no shards at all (an occasional user
request that's supported for some Beam sinks now), whether to allow fixed
sharding (generally discouraged, as it disables things like automatic
sharding based on input size, let alone dynamic work rebalancing, though
sometimes this is useful if the input is known to be small and a single
output is desired regardless of the restriction in parallelism), or other
sharding parameters (e.g. limiting the number of total elements or
(approximately) total number of bytes per output shard). Some of these
options may not be available/implemented for all formats--consideration
should be given as to how to handle this inconsistency (runtime errors for
unsupported combinations or simply not allowing them on any until all are
supported).

A final consideration: we do not anticipate 

Re: [QUESTION] Why no auto labels?

2023-10-05 Thread Robert Bradshaw via dev
Huh. This used to be a hard error in Java, but I guess it's togglable
with an option now. We should probably add the option to toggle Python too.
(Unclear what the default should be, but this probably ties into
re-thinking how pipeline update should work.)

On Thu, Oct 5, 2023 at 4:58 AM Joey Tran  wrote:

> Makes sense that the requirement is the same, but is the label
> auto-generation behavior the same? I modified the BeamJava
> wordcount example[1] to do the regex filter twice in a row, and unlike the
> BeamPython example I posted before, it just warns instead of throwing an
> exception.
>
> Tangentially, is it expected that the Beam playground examples don't have
> a way to see the outputs of a run example? I have a vague memory that there
> used to be a way to navigate to an output file after it's generated but not
> sure if I just dreamt that up. Playing with the examples, I wasn't positive
> if my runs were actually succeeding or not based on the stdout alone.
>
> [1] https://play.beam.apache.org/?sdk=java=mI7WUeje_r2
> <https://play.beam.apache.org/?sdk=java=mI7WUeje_r2>
> [2] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
>
> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
> u...@beam.apache.org> wrote:
>
>> BeamJava and BeamPython have the exact same behavior: transform names
>> within must be distinct [1]. This is because we do not necessarily know at
>> pipeline construction time if the pipeline will be streaming or batch, or
>> if it will be updated in the future, so the decision was made to impose
>> this restriction up front. Both will auto-generate a name for you if one is
>> not given, but will do so deterministically (not depending on some global
>> context) to avoid potential update problems.
>>
>> [1] Note that this applies to the fully qualified transform name, so the
>> naming only has to be distinct within a composite transform (or at the top
>> level--the pipeline itself is isomorphic to a single composite transform).
>>
>> On Wed, Oct 4, 2023 at 3:43 AM Joey Tran 
>> wrote:
>>
>>> Cross posting this thread to dev@ to see if this is intentional
>>> behavior or if it's something worth changing for the python SDK
>>>
>>> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user 
>>> wrote:
>>>
>>>> That suggests the default label is created as that, which indeed causes
>>>> the duplication error.
>>>>
>>>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>>>> wrote:
>>>>
>>>>> Not sure what that suggests
>>>>>
>>>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>>>> wrote:
>>>>>
>>>>>> Looks like this is the current behaviour. If you have `t =
>>>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>>>> `Filter(identity_filter)`.
>>>>>>
>>>>>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran 
>>>>>> wrote:
>>>>>>
>>>>>>> You don't have to specify the names if the callable you pass in is
>>>>>>> /different/ for two `beam.Map`s, but if the callable is the same you must
>>>>>>> specify a label. For example, the below will raise an exception:
>>>>>>>
>>>>>>> ```
>>>>>>> | beam.Filter(identity_filter)
>>>>>>> | beam.Filter(identity_filter)
>>>>>>> ```
>>>>>>>
>>>>>>> Here's an example on playground that shows the error message you get
>>>>>>> [1]. I marked every line I added with a "# ++".
>>>>>>>
>>>>>>> It's a contrived example, but using a map or filter at the same
>>>>>>> pipeline level probably comes up often, at least in my inexperience. For
>>>>>>> example, you might have a pipeline that partitions a pcoll into three
>>>>>>> different pcolls, runs some transforms on them, and then runs the same type
>>>>>>> of filter on each of them.
>>>>>>>
>>>>>>> The case that happens most often for me is using the `assert_that`
>>>>>>> [2] testing transform. In this case, I think often users will really have
>>>>>>> no need for a disambiguating label as they're often just writing unit tests
>>>>>>> that tes

Re: [VOTE] Release 2.51.0, release candidate #1

2023-10-04 Thread Robert Bradshaw via dev
+1 (binding)

Verified artifacts and signatures and tested a simple python pipeline in a
fresh environment with a wheel.

On Wed, Oct 4, 2023 at 8:05 AM Ritesh Ghorse via dev 
wrote:

> +1 (non-binding) validated Go SDK quickstart and Python Streaming
> quickstart on Dataflow runner.
>
> Thanks!
>
> On Tue, Oct 3, 2023 at 5:40 PM XQ Hu via dev  wrote:
>
>> +1 (non-binding). Tested the simple dataflow ML starter job with
>> https://github.com/google/dataflow-ml-starter/actions/runs/6397130175/job/17364408813
>> .
>>
>> On Tue, Oct 3, 2023 at 2:29 PM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>>
>>> All Beam Python versions 2.50 and greater run exclusively on Dataflow
>>> runner v2, so we don't need to test v1 anymore. I'll delete those rows from
>>> the spreadsheet
>>>
>>> On Tue, Oct 3, 2023 at 2:25 PM Svetak Sundhar 
>>> wrote:
>>>
>>>> +1 Non Binding
>>>>
>>>> Tested Python Direct Runner and Dataflow Runner as well.
>>>>
>>>> On the spreadsheet, I came across "Dataflow v1 (until 2.49.0,
>>>> inclusive)", and do not fully understand what this means.
>>>>
>>>> Does this mean
>>>> (1) we shouldn't be testing on Dataflow runner v1 for releases after
>>>> 2.49 or
>>>> (2) make sure we test on runner v1 for this release?
>>>>
>>>> Thanks in advance for the clarification,
>>>>
>>>> Svetak Sundhar
>>>>
>>>>   Data Engineer
>>>> s vetaksund...@google.com
>>>>
>>>> On Tue, Oct 3, 2023 at 2:14 PM Danny McCormick via dev <
>>>> dev@beam.apache.org> wrote:

> +1 (non-binding)
>
> Tested python/ML execution with
> https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_huggingface.ipynb
> (interactive runner) and
> https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/automatic_model_refresh.ipynb
> (Dataflow runner).
>
> Thanks,
> Danny
>
> On Tue, Oct 3, 2023 at 1:58 PM Kenneth Knowles 
> wrote:
>
>> Hi everyone,
>>
>> Please review and vote on the release candidate #1 for the version
>> 2.51.0, as follows:
>>
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>> Reviewers are encouraged to test their own use cases with the release
>> candidate, and vote +1 if no issues are found. Only PMC member votes will
>> count towards the final vote, but votes from all community members is
>> encouraged and helpful for finding regressions; you can either test your
>> own use cases or use cases from the validation sheet [10].
>>
>> The complete staging area is available for your review, which
>> includes:
>>
>>- GitHub Release notes [1],
>>- the official Apache source release to be deployed to
>>dist.apache.org [2], which is signed with the key with
>>fingerprint  [3],
>>- all artifacts to be deployed to the Maven Central Repository
>>[4],
>>- source code tag "v1.2.3-RC3" [5],
>>- website pull request listing the release [6], the blog post
>>[6], and publishing the API reference manual [7].
>>- Java artifacts were built with Gradle GRADLE_VERSION and
>>OpenJDK/Oracle JDK JDK_VERSION.
>>- Python artifacts are deployed along with the source release to
>>the dist.apache.org [2] and PyPI[8].
>>- Go artifacts and documentation are available at pkg.go.dev [9]
>>- Validation sheet with a tab for 1.2.3 release to help with
>>validation [10].
>>- Docker images published to Docker Hub [11].
>>- PR to run tests against release branch [12].
>>
>> The vote will be open for at least 72 hours. It is adopted by
>> majority approval, with at least 3 PMC affirmative votes.
>>
>> For guidelines on how to try the release in your projects, check out
>> our blog post at https://beam.apache.org/blog/validate-beam-release/.
>>
>> Thanks,
>> Kenn
>>
>> [1] https://github.com/apache/beam/milestone/15
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.51.0
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4]
>> https://repository.apache.org/content/repositories/orgapachebeam-1356/
>> [5] https://github.com/apache/beam/tree/v2.51.0-RC1
>> [6] https://github.com/apache/beam/pull/28800
>> [7] https://github.com/apache/beam-site/pull/649
>> [8] https://pypi.org/project/apache-beam/2.51.0rc1/
>> [9]
>> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.51.0-RC1/go/pkg/beam
>> [10]
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=437054928
>> [11] https://hub.docker.com/search?q=apache%2Fbeam=image
>> [12] https://github.com/apache/beam/pull/28663
>>
>


Re: [QUESTION] Why no auto labels?

2023-10-04 Thread Robert Bradshaw via dev
BeamJava and BeamPython have the exact same behavior: transform names
within must be distinct [1]. This is because we do not necessarily know at
pipeline construction time if the pipeline will be streaming or batch, or
if it will be updated in the future, so the decision was made to impose
this restriction up front. Both will auto-generate a name for you if one is
not given, but will do so deterministically (not depending on some global
context) to avoid potential update problems.

[1] Note that this applies to the fully qualified transform name, so the
naming only has to be distinct within a composite transform (or at the top
level--the pipeline itself is isomorphic to a single composite transform).
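
To illustrate the rule, here is a toy model in plain Python. It is not Beam
code: the Scope class and its apply method are made up for the example, and
only the shape of the default label (e.g. Filter(identity_filter)) is taken
from this thread. It shows a default label derived deterministically from the
transform and callable, a collision when the same unlabeled transform is
applied twice, and how an explicit label or a separate composite scope avoids
it.

```python
class Scope:
    """Toy model of one composite transform's namespace (not Beam code)."""

    def __init__(self, name=""):
        self.name = name
        self.labels = set()

    def apply(self, kind, fn, label=None):
        # The default label depends only on the transform kind and the
        # callable's name, never on global counters or application order.
        label = label or f"{kind}({fn.__name__})"
        if label in self.labels:
            raise RuntimeError(
                f'A transform with label "{label}" already exists')
        self.labels.add(label)
        # Fully qualified name: scope name plus local label.
        return f"{self.name}/{label}" if self.name else label


def identity_filter(x):
    return True


top = Scope()
first = top.apply("Filter", identity_filter)
try:
    top.apply("Filter", identity_filter)  # same default label: rejected
    duplicate_rejected = False
except RuntimeError:
    duplicate_rejected = True

# An explicit label disambiguates; a nested scope has its own namespace.
second = top.apply("Filter", identity_filter, label="FilterAgain")
nested = Scope("MyComposite").apply("Filter", identity_filter)
```

Because the default name carries no counter, inserting or reordering other
transforms does not change it, which is the property that matters for update.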

On Wed, Oct 4, 2023 at 3:43 AM Joey Tran  wrote:

> Cross posting this thread to dev@ to see if this is intentional behavior
> or if it's something worth changing for the python SDK
>
> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user  wrote:
>
>> That suggests the default label is created as that, which indeed causes
>> the duplication error.
>>
>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>> wrote:
>>
>>> Not sure what that suggests
>>>
>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>> wrote:
>>>
>>>> Looks like this is the current behaviour. If you have `t =
>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>> `Filter(identity_filter)`.
>>>>
>>>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran
>>>> wrote:

> You don't have to specify the names if the callable you pass in is
> /different/ for two `beam.Map`s, but if the callable is the same you must
> specify a label. For example, the below will raise an exception:
>
> ```
> | beam.Filter(identity_filter)
> | beam.Filter(identity_filter)
> ```
>
> Here's an example on playground that shows the error message you get
> [1]. I marked every line I added with a "# ++".
>
> It's a contrived example, but using a map or filter at the same
> pipeline level probably comes up often, at least in my inexperience. For
> example, you might have a pipeline that partitions a pcoll into three
> different pcolls, runs some transforms on them, and then runs the same type
> of filter on each of them.
>
> The case that happens most often for me is using the `assert_that` [2]
> testing transform. In this case, I think often users will really have no
> need for a disambiguating label as they're often just writing unit tests
> that test a few different properties of their workflow.
>
> [1] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
> [2]
> https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.testing.util.html#apache_beam.testing.util.assert_that
>
> On Mon, Oct 2, 2023 at 9:08 AM Bruno Volpato via user <
> u...@beam.apache.org> wrote:
>
>> If I understand the question correctly, you don't have to specify
>> those names.
>>
>> As Reuven pointed out, it is probably a good idea so you have a
>> stable / deterministic graph.
>> But in the Python SDK, you can simply use pcollection | map_fn,
>> instead of pcollection | 'Map' >> map_fn.
>>
>> See an example here
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/group_with_coder.py#L100-L116
>>
>>
>> On Sun, Oct 1, 2023 at 9:08 PM Joey Tran 
>> wrote:
>>
>>> Hmm, I'm not sure what you mean by "updating pipelines in place".
>>> Can you elaborate?
>>>
>>> I forgot to mention my question is posed from the context of a
>>> python SDK user, and afaict, there doesn't seem to be an obvious way to
>>> autogenerate names/labels. Hearing that the java SDK supports it makes me
>>> wonder if the python SDK could support it as well though... (If so, I'd be
>>> happy to implement it). Currently, it's fairly tedious to have to name
>>> every instance of a transform that you might reuse in a pipeline, e.g. when
>>> reapplying the same Map on different pcollections.
>>>
>>> On Sun, Oct 1, 2023 at 8:12 PM Reuven Lax via user <
>>> u...@beam.apache.org> wrote:
>>>
>>>> Are you talking about transform names? The main reason was because
>>>> for runners that support updating pipelines in place, the only way to do so
>>>> safely is if the runner can perfectly identify which transforms in the new
>>>> graph match the ones in the old graph. There's no good way to auto generate
>>>> names that will stay stable across updates - even small changes to the
>>>> pipeline might change the order of nodes in the graph, which could result
>>>> in a corrupted update.
>>>>
>>>> However, if you don't care about update, Beam can auto generate
>>>> these names for you! When you call PCollection.apply (if using BeamJava),

Re: Runner Bundling Strategies

2023-09-27 Thread Robert Bradshaw via dev
>>>>>> 1. Yes, a watermark can update in the middle of a bundle.
>>>>>> 2. The records in the bundle themselves will prevent the watermark
>>>>>> from updating as they are still in flight until after finish bundle.
>>>>>> Therefore simply caching the records should always be watermark safe,
>>>>>> regardless of the runner. You will only run into problems if you try and
>>>>>> move timestamps "backwards" - which is why Beam strongly discourages this.
>>>>>>
>>>>>> This is not aligned with FlinkRunner's implementation. And I
>>>>>> actually think it is not aligned conceptually. As mentioned, Flink does
>>>>>> not have the concept of bundles at all. It achieves fault tolerance via
>>>>>> checkpointing, essentially checkpoint barrier flowing from sources to
>>>>>> sinks, safely snapshotting state of each operator on the way. Bundles are
>>>>>> implemented as a somewhat arbitrary set of elements between two consecutive
>>>>>> checkpoints (there can be multiple bundles between checkpoints). A bundle
>>>>>> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
>>>>>> after the checkpoint barrier passes over the elements in the bundle (every
>>>>>> bundle is finished at the very latest exactly before a checkpoint). But
>>>>>> watermark propagation and bundle finalization is completely unrelated. This
>>>>>> might be a bug in the runner, but requiring checkpoint for watermark
>>>>>> propagation will introduce insane delays between processing time and
>>>>>> watermarks, every executable stage will delay watermark propagation until a
>>>>>> checkpoint (which is typically the order of seconds). This delay would add
>>>>>> up after each stage.
>>>>>>
>>>>>
>>>>> It's not bundles that hold up processing, rather it is elements, and
>>>>> elements are not considered "processed" until FinishBundle.
>>>>>
>>>>> You are right about Flink. In many cases this is fine - if Flink rolls
>>>>> back to the last checkpoint, the watermark will also roll back, and
>>>>> everything stays consistent. So in general, one does not need to wait for
>>>>> checkpoints for watermark propagation.
>>>>>
>>>>> Where things get a bit weirder with Flink is whenever one has external
>>>>> side effects. In theory, one should wait for checkpoints before letting a
>>>>> Sink flush, otherwise one could end up with incorrect outputs (especially
>>>>> with a sink like TextIO). Flink itself recognizes this, and that's why they
>>>>> provide TwoPhaseCommitSinkFunction
>>>>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
>>>>> which waits for a checkpoint. In Beam, this is the reason we introduced
>>>>> RequiresStableInput. Of course in practice many Flink users don't do this -
>>>>> in which case they are prioritizing latency over data correctness.
>>>>>
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský 
>>>>>> wrote:
>>>>>>
>>>>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>>>
>>>>>>> There was a thread [1], where the conclusion seemed to be that
>>>>>>> updating watermark is possible even in the middle of a bundle. Actually,
>>>>>>> handling watermarks is runner-dependent (e.g. Flink does not store
>>>>>>> watermarks in checkpoints, they are always recomputed from scratch on
>>>>>>> restore).
>>>>>>>
>>>>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>>>>
>>>>>>> On Fri, Sep 22, 202

Re: Runner Bundling Strategies

2023-09-26 Thread Robert Bradshaw via dev
>>>> up after each stage.
>>>>
>>>
>>> It's not bundles that hold up processing, rather it is elements, and
>>> elements are not considered "processed" until FinishBundle.
>>>
>>> You are right about Flink. In many cases this is fine - if Flink rolls
>>> back to the last checkpoint, the watermark will also roll back, and
>>> everything stays consistent. So in general, one does not need to wait for
>>> checkpoints for watermark propagation.
>>>
>>> Where things get a bit weirder with Flink is whenever one has external
>>> side effects. In theory, one should wait for checkpoints before letting a
>>> Sink flush, otherwise one could end up with incorrect outputs (especially
>>> with a sink like TextIO). Flink itself recognizes this, and that's why they
>>> provide TwoPhaseCommitSinkFunction
>>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
>>>  which
>>> waits for a checkpoint. In Beam, this is the reason we introduced
>>> RequiresStableInput. Of course in practice many Flink users don't do this -
>>> in which case they are prioritizing latency over data correctness.
>>>
>>>>
>>>> Reuven
>>>>
>>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>>>>
>>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>
>>>>> There was a thread [1], where the conclusion seemed to be that
>>>>> updating watermark is possible even in the middle of a bundle. Actually,
>>>>> handling watermarks is runner-dependent (e.g. Flink does not store
>>>>> watermarks in checkpoints, they are always recomputed from scratch on
>>>>> restore).
>>>>>
>>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>>
>>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>>>>>
>>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>>
>>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> I've actually wondered about this specifically for streaming... if
>>>>>>> you're writing a pipeline there it seems like you're often going to want to
>>>>>>> put high fixed cost things like database connections even outside of the
>>>>>>> bundle setup. You really only want to do that once in the lifetime of the
>>>>>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>>>>>> other than an arbitrarily (and probably small in streaming to avoid
>>>>>>> latency) group of elements might be more useful? I suppose this depends
>>>>>>> heavily on the object lifecycle in the sdk worker though.
>>>>>>>
>>>>>>
>>>>>> +1. This is the difference between @Setup and @StartBundle. The
>>>>>> start/finish bundle operations should be used for bracketing element
>>>>>> processing that must be committed as a unit for correct failure recovery
>>>>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>>>>> in FinishBundle). On the other hand, things like open database connections
>>>>>> can and likely should be shared across bundles.
>>>>>>
>>>>>> This is correct, but the caching between @StartBundle and
>>>>>> @FinishBundle has some problems. First, users need to manually set
>>>>>> watermark hold for min(timestamp in bundle), otherwise watermark might
>>>>>> overtake the buffered elements.
>>>>>>
>>>>>
>>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>
>>>>>
>>>>>> Users have no option other than using timer.withOutputTimestamp
>>>>>> for that, as we don't have 

[YAML] Declarative beam (aka YAML) coordination

2023-09-25 Thread Robert Bradshaw via dev
Given the interest in the YAML work by multiple parties, we put together
https://s.apache.org/beam-yaml-contribute to more easily coordinate on this
effort. Nothing that surprising--we're going to continue using the standard
lists, github, etc.--but it should help for folks who want to get started.

Feel free to respond here on the list or comment on the doc if you have any
questions.


Re: Runner Bundling Strategies

2023-09-22 Thread Robert Bradshaw via dev
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:

> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>
> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
> wrote:
>
>> I've actually wondered about this specifically for streaming... if you're
>> writing a pipeline there it seems like you're often going to want to put
>> high fixed cost things like database connections even outside of the bundle
>> setup. You really only want to do that once in the lifetime of the worker
>> itself, not the bundle. Seems like having that boundary be somewhere other
>> than an arbitrarily (and probably small in streaming to avoid latency)
>> group of elements might be more useful? I suppose this depends heavily on
>> the object lifecycle in the sdk worker though.
>>
>
> +1. This is the difference between @Setup and @StartBundle. The
> start/finish bundle operations should be used for bracketing element
> processing that must be committed as a unit for correct failure recovery
> (e.g. if elements are cached in ProcessElement, they should all be emitted
> in FinishBundle). On the other hand, things like open database connections
> can and likely should be shared across bundles.
>
> This is correct, but the caching between @StartBundle and @FinishBundle
> has some problems. First, users need to manually set watermark hold for
> min(timestamp in bundle), otherwise watermark might overtake the buffered
> elements.
>

Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be discarded.


> Users have no option other than using timer.withOutputTimestamp for
> that, as we don't have a user-facing API to set watermark hold otherwise,
> thus the in-bundle caching implies stateful DoFn. The question might then
> be, why not use "classical" stateful caching involving state, as there is
> full control over the caching in user code. This made me wonder whether it
> would be useful to add the information about caching to the API (e.g. in
> Java @StartBundle(caching=true)), which could maybe solve the above issues
> (the runner would know to set the hold, and it could work with "stateless" DoFns)?
>

Really, this is one of the areas that the streaming/batch abstraction
leaks. In batch it was a common pattern to have local DoFn instance state
that persisted from start to finish bundle, and these were also used as
convenient entry points for other operations (like opening
database connections) 'cause bundles were often "as large as possible."
With the advent of streaming it makes sense to put this in
explicitly managed runner state to allow for cross-bundle amortization and
there's more value in distinguishing between @Setup and @StartBundle.

(Were I to do things over I'd probably encourage an API that discouraged
non-configuration instance state on DoFns altogether, e.g. in the notion of
Python context managers (and an equivalent API could probably be put
together with AutoCloseables in Java) one would have something like

ParDo(X)

which would logically (though not necessarily physically) lead to an
execution like

with X.bundle_processor() as bundle_processor:
  for bundle in bundles:
    with bundle_processor.element_processor() as process:
      for element in bundle:
        process(element)

where the traditional setup/start_bundle/finish_bundle/teardown logic would
live in the __enter__ and __exit__ methods (made even easier with
coroutines.) For convenience one could of course provide a raw bundle
processor or element processor to ParDo if the enter/exit contexts are
trivial. But this is getting somewhat off-topic...)
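
For readers who want to see the shape of that idea, here is a minimal runnable sketch using contextlib. It is only an illustration of the hypothetical API described above: BufferingTransform, bundle_processor, element_processor, events and emitted are made-up names, not anything that exists in Beam. Setup/teardown bracket the outer context, and start_bundle/finish_bundle bracket the inner one.

```python
from contextlib import contextmanager

events = []   # order of lifecycle steps, recorded for illustration only
emitted = []  # elements "output" when a bundle finishes


class BundleProcessor:
    """Hypothetical per-worker object; not a real Beam class."""

    @contextmanager
    def element_processor(self):
        events.append("start_bundle")
        buffered = []
        # Hand the caller a per-element "process" callable.
        yield buffered.append
        # Reached only if the bundle body raised no exception.
        emitted.extend(buffered)
        events.append("finish_bundle")


class BufferingTransform:
    """Hypothetical transform holding configuration only, no mutable state."""

    @contextmanager
    def bundle_processor(self):
        events.append("setup")
        try:
            yield BundleProcessor()
        finally:
            # Teardown runs whether or not a bundle failed.
            events.append("teardown")


X = BufferingTransform()
bundles = [[1, 2], [3]]

with X.bundle_processor() as bundle_processor:
    for bundle in bundles:
        with bundle_processor.element_processor() as process:
            for element in bundle:
                process(element)
```

Because the code after the inner yield is skipped when the bundle body raises, a failed bundle emits nothing, which is roughly the "commit as a unit" behavior one would want from finish_bundle.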


>
>>
>> Best,
>> B
>>
>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles  wrote:
>>
>>> (I notice that you replied only to yourself, but there has been a whole
>>> thread of discussion on this - are you subscribed to dev@beam?
>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>
>>> It sounds like you want what everyone wants: to have the biggest bundles
>>> possible.
>>>
>>> So for bounded data, basically you make even splits of the data and each
>>> split is one bundle. And then dynamic splitting to redistribute work to
>>> eliminate stragglers, if your engine has that capability.
>>>
>>> For unbounded data, you more-or-less bundle as much as you can without
>>> waiting too long, like Jan described.
>>>
>>> Users know to put their high fixed costs in @StartBundle and then it is
>>> the runner's job to put as many calls to @ProcessElement as possible to
>>> amortize.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran 
>&g

Re: User-facing website vs. contributor-facing website

2023-09-22 Thread Robert Bradshaw via dev
On Fri, Sep 22, 2023 at 8:05 AM Danny McCormick via dev 
wrote:

> > I do feel strongly that https://beam.apache.org/contribute/ should
> remain on the main site, as it's aimed at users (who hopefully want to step
> up and contribute)
>
> To be clear, I don't think anyone is suggesting getting rid of the
> section, my comments were about replacing the side panel links with links
> to the wiki (or now markdown or wherever we put our docs) instead of
> hosting those things as part of our site.
>
> > Related, I stumbled across this the other day:
> https://github.com/apache/beam-site which appears to be unused which
> could probably even have different review and committer sets if we wanted?
>
> That actually holds our published release docs, just not on master -
> https://github.com/apache/beam-site/tree/release-docs.
>

Yeah. Basically we're using it as hosting for our voluminous auto-generated
docs.


> A separate repo is always an option regardless, though I don't see a ton
> of advantages and it moves us further from the core codebase.
>

I don't see any advantage, and plenty of downsides, to a separate repo.
What is the issue we're trying to solve here?


> > I feel like that's actually pretty easy with Github actions? I think
> maybe there's even one that exists for Github Pages and probably any other
> static site generator thingy we could care to name.
>
> Do you know of any actions that do this?
> https://github.com/kamranahmedse/github-pages-blog-action is kinda close,
> but not obviously better than a folder of markdown docs (no side nav
> AFAIK). I'm not sure if actions are really helpful here anyways.
>
> Building our own is definitely doable, but maybe not trivial (feel free to
> fact check that) and does introduce a second website framework (hugo vs
> jekyll).
>

Yeah, -1 on introducing yet another framework. Mostly, we need to
prioritize a place to push content that's easy to keep up to date.


> On Fri, Sep 22, 2023 at 10:42 AM Byron Ellis via dev 
> wrote:
>
>> I feel like that's actually pretty easy with Github actions? I think
>> maybe there's even one that exists for Github Pages and probably any other
>> static site generator thingy we could care to name. Related, I stumbled
>> across this the other day: https://github.com/apache/beam-site which
>> appears to be unused which could probably even have different review and
>> committer sets if we wanted?
>>
>> On Thu, Sep 21, 2023 at 3:19 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> TBH, I'm not a huge fan of the wikis either. My ideal flow would be
>>> something like g3doc, and markdown files in github do a reasonable enough
>>> job emulating that. (I don't think the overhead of having to do a PR for
>>> small edits like typos is onerous, as those are super easy reviews to do as
>>> well...) For anything in-depth, a pointer to an "actual" doc with better
>>> collaborative editing tools is generally in order anyway.
>>>
>>> I do feel strongly that https://beam.apache.org/contribute/ should
>>> remain on the main site, as it's aimed at users (who hopefully want to step
>>> up and contribute). The top level should probably mostly be a pointer to
>>> this, but I think it's valuable (for the audience that reaches it from
>>> github) to be a bit tailored to that audience (e.g. assume they just
>>> forked/downloaded the repository and want to edit-build-push. Generally a
>>> more advanced user than would find the page on the website.)
>>>
>>> The release guide? Meh. Wherever those doing releases find it most
>>> convenient. If that was me I'd probably put a markdown file right in the
>>> release directory next to the relevant scripts... (If not jump to literate
>>> programming right there :).
>>>
>>> On Thu, Sep 21, 2023 at 1:20 PM Kenneth Knowles  wrote:
>>>
>>>>
>>>>
>>>> On Thu, Sep 21, 2023 at 3:55 PM Danny McCormick <
>>>> dannymccorm...@google.com> wrote:
>>>>
>>>>>  > - reviewed
>>>>>
>>>>> Generally, I'm actually probably -0 on this one - it depends on
>>>>> context, but things that are for other developers only are usually better
>>>>> off without this requirement IMO since you get more contributions and more
>>>>> useful/unpolished things. Unfortunately, I'm not sure if confluence
>>>>> actually meets the bar for easy to update though because getting an
>>>>> account/initial setup is a pain. So I'm -0 since I don't know of a tool
>>>

Re: Runner Bundling Strategies

2023-09-22 Thread Robert Bradshaw via dev
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
wrote:

> I've actually wondered about this specifically for streaming... if you're
> writing a pipeline there it seems like you're often going to want to put
> high fixed cost things like database connections even outside of the bundle
> setup. You really only want to do that once in the lifetime of the worker
> itself, not the bundle. Seems like having that boundary be somewhere other
> than an arbitrarily (and probably small in streaming to avoid latency)
> group of elements might be more useful? I suppose this depends heavily on
> the object lifecycle in the sdk worker though.
>

+1. This is the difference between @Setup and @StartBundle. The
start/finish bundle operations should be used for bracketing element
processing that must be committed as a unit for correct failure recovery
(e.g. if elements are cached in ProcessElement, they should all be emitted
in FinishBundle). On the other hand, things like open database connections
can and likely should be shared across bundles.
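
To make the distinction concrete, here is a small sketch in plain Python. It deliberately does not import Beam: the class only mirrors the method names of a Python DoFn (setup, start_bundle, process, finish_bundle, teardown), while FakeConnection and run_bundles are hypothetical stand-ins for an expensive resource and a runner's worker loop. In real Beam code finish_bundle would have to emit windowed values rather than write to a fake connection.

```python
class FakeConnection:
    """Hypothetical stand-in for an expensive resource such as a DB connection."""
    opened = 0  # how many connections were ever opened

    def __init__(self):
        FakeConnection.opened += 1
        self.rows = []

    def write_all(self, rows):
        self.rows.extend(rows)

    def close(self):
        pass


class BufferingWriteFn:
    """Mirrors the lifecycle method names of a Beam Python DoFn."""

    def setup(self):
        # Once per instance: high fixed cost, shared across bundles.
        self.conn = FakeConnection()

    def start_bundle(self):
        # Once per bundle: state that must be committed as a unit.
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)

    def finish_bundle(self):
        # Everything cached during the bundle is flushed before it commits.
        self.conn.write_all(self.buffer)
        self.buffer = []

    def teardown(self):
        self.conn.close()


def run_bundles(fn, bundles):
    """Hypothetical driver standing in for a runner's worker loop."""
    fn.setup()
    try:
        for bundle in bundles:
            fn.start_bundle()
            for element in bundle:
                fn.process(element)
            fn.finish_bundle()
    finally:
        fn.teardown()
    return fn


fn = run_bundles(BufferingWriteFn(), [[1, 2], [3], [4, 5, 6]])
```

Three bundles are processed here, but the connection is opened once, while the buffer is reset and flushed per bundle.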


>
> Best,
> B
>
> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles  wrote:
>
>> (I notice that you replied only to yourself, but there has been a whole
>> thread of discussion on this - are you subscribed to dev@beam?
>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>
>> It sounds like you want what everyone wants: to have the biggest bundles
>> possible.
>>
>> So for bounded data, basically you make even splits of the data and each
>> split is one bundle. And then dynamic splitting to redistribute work to
>> eliminate stragglers, if your engine has that capability.
>>
>> For unbounded data, you more-or-less bundle as much as you can without
>> waiting too long, like Jan described.
>>
>> Users know to put their high fixed costs in @StartBundle and then it is
>> the runner's job to put as many calls to @ProcessElement as possible to
>> amortize.
>>
>> Kenn
>>
>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran 
>> wrote:
>>
>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>> greatest strategy for high *fixed* cost transforms", e.g. a transform
>>> that takes 5 minutes to get set up and then maybe a microsecond per input
>>>
>>> I suppose one solution is to move the responsibility for handling this
>>> kind of situation to the user and expect users to use a bundling transform
>>> (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what
>>> other runners expect? Just want to make sure I'm not missing some smart
>>> generic bundling strategy that might handle this for users.
>>>
>>> [1]
>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>
>>>
>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran 
>>> wrote:
>>>
>>>> Writing a runner and the first strategy for determining bundling size
>>>> was to just start with a bundle size of one and double it until we reach a
>>>> size that we expect to take some targets per-bundle runtime (e.g. maybe 10
>>>> minutes). I realize that this isn't the greatest strategy for high sized
>>>> cost transforms. I'm curious what kind of strategies other runners take?

>>>


Re: Runner Bundling Strategies

2023-09-21 Thread Robert Bradshaw via dev
Dataflow uses a work-stealing protocol. The FnAPI has a protocol to ask the
worker to stop at a certain element that has already been sent.

On Thu, Sep 21, 2023 at 4:24 PM Joey Tran  wrote:

> Writing a runner and the first strategy for determining bundling size was
> to just start with a bundle size of one and double it until we reach a size
> that we expect to take some targets per-bundle runtime (e.g. maybe 10
> minutes). I realize that this isn't the greatest strategy for high sized
> cost transforms. I'm curious what kind of strategies other runners take?
>
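
As a rough illustration of the doubling strategy quoted above, and of why it struggles with high fixed-cost transforms, here is a small simulation. It assumes a linear cost model (fixed cost plus per-element cost) and doubles based on the observed runtime of the previous bundle; both are assumptions made for this sketch, and next_bundle_size and simulate are hypothetical names, not part of any runner.

```python
def next_bundle_size(current_size, observed_seconds, target_seconds):
    """Double the bundle size while the last bundle ran under the target."""
    if observed_seconds < target_seconds:
        return current_size * 2
    return current_size


def simulate(fixed_cost, per_element_cost, target_seconds, total_elements):
    """Return the bundle sizes used under a hypothetical linear cost model."""
    sizes = []
    size, remaining = 1, total_elements
    while remaining > 0:
        n = min(size, remaining)
        sizes.append(n)
        remaining -= n
        runtime = fixed_cost + per_element_cost * n
        size = next_bundle_size(size, runtime, target_seconds)
    return sizes


# 5 minutes of setup per bundle, about a microsecond per element,
# and a 10 minute per-bundle target.
sizes = simulate(fixed_cost=300.0, per_element_cost=1e-6,
                 target_seconds=600.0, total_elements=1000)
setup_seconds = 300.0 * len(sizes)
```

With these numbers the 1000 elements are spread over ten bundles, so the fixed cost is paid ten times even though the per-element work is negligible.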


Re: User-facing website vs. contributor-facing website

2023-09-21 Thread Robert Bradshaw via dev
TBH, I'm not a huge fan of the wikis either. My ideal flow would be
something like g3doc, and markdown files in github do a reasonable enough
job emulating that. (I don't think the overhead of having to do a PR for
small edits like typos is onerous, as those are super easy reviews to do as
well...) For anything in-depth, a pointer to an "actual" doc with better
collaborative editing tools is generally in order anyway.

I do feel strongly that https://beam.apache.org/contribute/ should remain
on the main site, as it's aimed at users (who hopefully want to step up and
contribute). The top level should probably mostly be a pointer to this, but
I think it's valuable (for the audience that reaches it from github) to be
a bit tailored to that audience (e.g. assume they just forked/downloaded
the repository and want to edit-build-push. Generally a more advanced user
than would find the page on the website.)

The release guide? Meh. Wherever those doing releases find it most
convenient. If that was me I'd probably put a markdown file right in the
release directory next to the relevant scripts... (If not jump to literate
programming right there :).

On Thu, Sep 21, 2023 at 1:20 PM Kenneth Knowles  wrote:

>
>
> On Thu, Sep 21, 2023 at 3:55 PM Danny McCormick 
> wrote:
>
>>  > - reviewed
>>
>> Generally, I'm actually probably -0 on this one - it depends on context,
>> but things that are for other developers only are usually better off
>> without this requirement IMO since you get more contributions and more
>> useful/unpolished things. Unfortunately, I'm not sure if confluence
>> actually meets the bar for easy to update though because getting an
>> account/initial setup is a pain. So I'm -0 since I don't know of a tool
>> that both allows people to easily edit and avoids spam, but if such a tool
>> exists I'd strongly prefer that.
>>
>> >  - discoverable/orientable aka top/side nav
>>
>> I'm -1 on this requirement. A structured in-repo `docs` folder and/or a
>> dedicated developer documentation repo have worked well on teams I've been
>> on in the past and it avoids having to maintain additional infrastructure
>> for a website. It also brings folks closer to the code, making edits more
>> likely. These look nice, but I don't know how much value they actually add.
>>
>> > I did a quick search to see if there was a standard answer to having
>> top and side nav for a docs/ folder of markdown in your github repo. I
>> guess that is GitHub Pages? TBH I have used them happily in the distant
>> past but somehow I thought they had been deprecated or something.
>>
>> I'm probably -1 on pages because at that point we've got 2 different
>> website setups, one using hugo (https://beam.apache.org/) and one using
>> Jekyll (pages); at that point, we might as well just move things totally
>> back into the website and just have it live under a separate section of the
>> site.
>>
>> My vote if we're moving away from confluence (which seems fine) would be
>> either a dedicated `docs` or `developer-docs` folder or a separate markdown
>> only repo.
>>
>
> I could go for this. I'm pretty -1 on a soup of files without any
> information architecture or scattered throughout random folders. But I'm
> probably -2 on the confluence wiki if such a thing is possible and it would
> also remove a piece from our infra, so... I think I'd call it an upgrade to
> have a folder full of docs. If we then make taxonomic subfolders that hide
> all the information I'll be sad again.
>
> Ideally the developer-docs/ folder could be read as text, lightly rendered
> like GH does, or fully rendered with navs. Yes, I am describing g3doc
> (which is talked about publicly so I can name it, but I don't know what
> the publicly-available equivalent is). None of the
> website-building not-human-readable stuff from jekyll and hugo.
>
> Kenn
>
>
>>
>> On Thu, Sep 21, 2023 at 3:30 PM Kenneth Knowles  wrote:
>>
>>> OK so this did turn into a discussion all about the tech/hosting :-). It
>>> has been 5 years and we have experience of the wiki now so maybe that is
>>> fair anyhow. And perhaps the preference of where to put information cannot
>>> be separated from it.
>>>
>>> Top posting because there was so much in common across the responses and
>>> I agree mostly too so I'll merge & paraphrase.
>>>
>>> > Focusing the main website primarily toward users is good
>>>
>>> Seems everyone still agrees with this
>>>
>>> > The wiki is not reviewed and our docs we care about should be
>>>
>>> Agree.
>>>
>>> > Wiki syntax is an old thing that is not quite markdown and should just
>>> be markdown
>>>
>>> Agree.
>>>
>>> > Wiki is yet another place to go, hard to navigate, not discoverable.
>>>
>>> Agree.
>>>
>>> So the "neverending argument" is so far unanimous on this particular
>>> thread :-)
>>>
>>> ---
>>>
>>> My personal preferences are:
>>>
>>>  - markdown
>>>  - reviewed
>>>  - organized...
>>>  - ...independently of code folders
>>>  - discoverable/orientable 
