On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com>
wrote:

> I would like to figure out a way to get the stream-y interface to work, as
> I think it's more natural overall.
>
> One hypothesis is that if any elements are carried over loop iterations,
> there will likely be some that are carried over beyond the loop (after all
> the callee doesn't know when the loop is supposed to end). We could reject
> "plain" elements that are emitted after this point, requiring one to emit
> timestamp-windowed-values.
>

Are you assuming that the same stream (or overlapping sets of data) is
pushed to multiple workers? I thought that the data streamed here
is the data that belongs to the current bundle (hence already assigned to
the current worker), so any output from the current bundle invocation would
be a valid output of that bundle.


> Related to this, we could enforce that the only (user-accessible) way to
> get such a timestamped value is to start with one, e.g. a
> WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same
> metadata but a new value. Thus a user wanting to do anything "fancy" would
> have to explicitly request iteration over these windowed values rather than
> over the raw elements. (This is also forward compatible with expanding the
> metadata that can get attached, e.g. pane infos, and makes the right thing
> the easiest/most natural.)
>
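
A minimal sketch of the withValue idea above, in Swift. The `Window` and `WindowedValue` types here are hypothetical stand-ins written for illustration, not the SDK's actual types; in a real library the initializer would presumably be kept non-public so that `withValue` is the only user-accessible way to obtain an output value.

```swift
import Foundation

// Hypothetical window type; the SDK's real window type is not shown in the thread.
struct Window: Equatable {
    let start: Date
    let end: Date
}

// Hypothetical windowed value: a payload plus the metadata that travels with it.
struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let window: Window

    // Produces a new value that inherits this value's timestamp and window.
    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, window: window)
    }
}

let epoch = Date(timeIntervalSince1970: 0)
let input = WindowedValue(
    value: "hello world",
    timestamp: epoch,
    window: Window(start: epoch, end: epoch.addingTimeInterval(60))
)

// The output carries a new payload but the same metadata as the input.
let output = input.withValue(input.value.count)
```

Because the metadata is copied rather than supplied by the caller, adding further fields later (pane info, for example) would not change the call site.
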
> On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com>
> wrote:
>
>> Ah, that is a good point—being element-wise would make managing windows
>> and time stamps easier for the user. Fortunately it’s a fairly easy change
>> to make and maybe even less typing for the user. I was originally thinking
>> side inputs and metrics would happen outside the loop, but I think you want
>> a class and not a closure at that point for sanity.
>>
>> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Ah, I see.
>>>
>>> Yeah, I've thought about using an iterable for the whole bundle rather
>>> than start/finish bundle callbacks, but one of the questions is how that
>>> would impact implicit passing of the timestamp (and other) metadata from
>>> input elements to output elements. (You can of course attach the metadata
>>> to any output that happens in the loop body, but it's very easy to
>>> implicitly break the 1:1 relationship here, e.g. by doing buffering or
>>> otherwise modifying local state, and this would be hard to detect. I
>>> suppose trying to output after the loop finishes could require
>>> something more explicit.)
>>>
>>>
>>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com>
>>> wrote:
>>>
>>>> Oh, I also forgot to mention that I included element-wise collection
>>>> operations like "map" that eliminate the need for pardo in many cases. The
>>>> groupBy command is actually a map + groupByKey under the hood. That was to
>>>> be more consistent with Swift's collection protocol (and is also why
>>>> PCollection and PCollectionStream are different types... PCollection
>>>> implements map and friends as pipeline construction operations whereas
>>>> PCollectionStream is an actual stream).
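
The "map + groupByKey" decomposition can be sketched on plain Swift arrays. The `groupByKey` and `groupBy` functions below are hypothetical in-memory helpers, not the SDK's PCollection operations, which build pipeline stages instead of computing results directly.

```swift
// Hypothetical in-memory groupByKey: collects the values for each key.
func groupByKey<K: Hashable, V>(_ pairs: [(K, V)]) -> [K: [V]] {
    var groups: [K: [V]] = [:]
    for (key, value) in pairs {
        groups[key, default: []].append(value)
    }
    return groups
}

// groupBy expressed as a map to (key, value) pairs followed by groupByKey.
func groupBy<T, K: Hashable, V>(_ elements: [T], _ toPair: (T) -> (K, V)) -> [K: [V]] {
    groupByKey(elements.map(toPair))
}

// Word count: pair each word with 1, group, then sum each group.
let words = ["to", "be", "or", "not", "to", "be"]
let counts = groupBy(words) { word in (word, 1) }
    .mapValues { ones in ones.reduce(0, +) }
```
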
>>>>
>>>> I just happened to push some "IO primitives" that use map rather than
>>>> pardo in a couple of places to do a true wordcount using good ol'
>>>> Shakespeare and very very primitive GCS IO.
>>>>
>>>> Best,
>>>> B
>>>>
>>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com>
>>>> wrote:
>>>>
>>>>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit
>>>>> before settling on where I ended up. Ultimately I decided to go with
>>>>> something that felt more Swift-y than anything else, which means that
>>>>> rather than dealing with a single element like you do in the other SDKs
>>>>> you're dealing with a stream of elements (which of course will often be
>>>>> of size 1). That's a really natural paradigm in the Swift world,
>>>>> especially with the async / await structures. So when you see something
>>>>> like:
>>>>>
>>>>> pardo(name:"Read Files") { filenames,output,errors in
>>>>>   for try await (filename,_,_) in filenames {
>>>>>     ...
>>>>>     output.emit(data)
>>>>>   }
>>>>> }
>>>>>
>>>>> filenames is the input stream and then output and errors are both
>>>>> output streams. In theory you can have as many output streams as you like,
>>>>> though at the moment there's a compiler bug in the new type pack feature
>>>>> that limits it to "as many as I felt like supporting". Presumably this
>>>>> will get fixed before the official 5.9 release, which will probably be in
>>>>> the October timeframe if history is any guide.
>>>>>
>>>>> If you had parameterization you wanted to send, that would look like
>>>>> pardo("Parameter") { param,filenames,output,error in ... } where "param"
>>>>> would take on the value of "Parameter." All of this is being typechecked
>>>>> at compile time BTW.
>>>>>
>>>>>
>>>>> The (filename,_,_) is a tuple destructuring construct like you have in
>>>>> ES6 and other languages, where "_" is Swift for "ignore." In this case
>>>>> PCollectionStreams have an element signature of (Of,Date,Window), so you
>>>>> can optionally extract the timestamp and the window if you want to
>>>>> manipulate them somehow.
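
A self-contained sketch of iterating a stream whose elements are (value, timestamp, window) tuples. It uses the standard library's `AsyncStream` as a stand-in for PCollectionStream, so `Window`, `StreamElement`, `makeStream`, and `lineLengths` are all hypothetical names. `AsyncStream` does not throw, which is why the loop is `for await` and not `for try await`.

```swift
import Foundation

// Hypothetical window type; the SDK's own Window is not shown in the thread.
struct Window: Sendable {
    let start: Date
    let end: Date
}

// Element shape described above: (value, timestamp, window).
typealias StreamElement<Of> = (Of, Date, Window)

// Builds a finite in-memory stream that stands in for one bundle's input.
func makeStream<Of: Sendable>(_ elements: [StreamElement<Of>]) -> AsyncStream<StreamElement<Of>> {
    AsyncStream<StreamElement<Of>> { continuation in
        for element in elements {
            continuation.yield(element)
        }
        continuation.finish()
    }
}

// Destructures each element, ignores the window with "_", and keeps the
// input timestamp attached to the derived value.
func lineLengths(_ lines: AsyncStream<StreamElement<String>>) async -> [(Int, Date)] {
    var out: [(Int, Date)] = []
    for await (line, timestamp, _) in lines {
        out.append((line.count, timestamp))
    }
    return out
}

let epoch = Date(timeIntervalSince1970: 0)
let window = Window(start: epoch, end: epoch.addingTimeInterval(60))
let input = makeStream([
    ("ab", epoch, window),
    ("cde", epoch.addingTimeInterval(1), window),
])
let results = await lineLengths(input)
```
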
>>>>>
>>>>> That said, it would also be natural to provide elementwise pardos;
>>>>> that would probably mean having explicit type signatures in the closure. I
>>>>> had that at one point, but it felt less natural the more I used it. I'm
>>>>> also slowly working towards adding a more "traditional" DoFn
>>>>> implementation approach where you implement the DoFn as an object type. In
>>>>> that case it would be very very easy to support both by having a default
>>>>> stream implementation call the equivalent of processElement. To make that
>>>>> performant I need to implement an @DoFn macro and I just haven't gotten to
>>>>> it yet.
>>>>>
>>>>> It's a bit more work and I've been prioritizing implementing composite
>>>>> and external transforms for the reasons you suggest. :-) I've got the
>>>>> basics of a composite transform (there's an equivalent wordcount example)
>>>>> and am hooking it into the pipeline generation, which should also give me
>>>>> everything I need to successfully hook in external transforms as well.
>>>>> That will give me the jump on IOs as you say. I can also treat the
>>>>> pipeline itself as a composite transform, which lets me get rid of the
>>>>> Pipeline { pipeline in ... } and just instead have things attach
>>>>> themselves to the pipeline implicitly.
>>>>>
>>>>> That said, there are some interesting IO possibilities that would be
>>>>> Swift native. In particular, I've been looking at the native Swift
>>>>> binding for DuckDB (which is C++ based). DuckDB is SQL based but not
>>>>> distributed in the same way as, say, Beam SQL... but it would allow for
>>>>> SQL statements on individual files with projection pushdown supported for
>>>>> things like Parquet, which could have some cool and performant data lake
>>>>> applications. I'll probably do a couple of the simpler IOs as well:
>>>>> there's a Swift AWS SDK binding that's pretty good that would give me S3
>>>>> and there's a Cloud auth library as well that makes it pretty easy to
>>>>> work with GCS.
>>>>>
>>>>> In any case, I'm updating the branch as I find a minute here and
>>>>> there.
>>>>>
>>>>> Best,
>>>>> B
>>>>>
>>>>> On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Neat.
>>>>>>
>>>>>> Nothing like writing an SDK to actually understand how the FnAPI
>>>>>> works :). I like the use of groupBy. I have to admit I'm a bit mystified
>>>>>> by the syntax for parDo (I don't know Swift at all, which is probably
>>>>>> tripping me up). The addition of external (cross-language) transforms
>>>>>> could let you steal everything (e.g. IOs) pretty quickly from other SDKs.
>>>>>>
>>>>>> On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>>> For everyone who is interested, here's the draft PR:
>>>>>>>
>>>>>>> https://github.com/apache/beam/pull/28062
>>>>>>>
>>>>>>> I haven't had a chance to test it on my M1 machine yet, though
>>>>>>> there's a good chance there are a few places that need to properly
>>>>>>> address endianness, specifically timestamps in windowed values and
>>>>>>> length in iterable coders, as those both use specifically big-endian
>>>>>>> representations.
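
One host-independent way to produce and read a big-endian integer in Swift is sketched below. The 64-bit width and the function names are assumptions made for illustration; the thread does not show the coder code or state the field widths.

```swift
// Encodes a 64-bit integer as 8 bytes, most significant byte first,
// regardless of the host's native byte order.
func bigEndianBytes(_ value: Int64) -> [UInt8] {
    withUnsafeBytes(of: value.bigEndian) { Array($0) }
}

// Decodes 8 big-endian bytes by shifting them in from most to least
// significant, which also does not depend on the host's byte order.
func int64FromBigEndian(_ bytes: [UInt8]) -> Int64 {
    precondition(bytes.count == 8, "expected exactly 8 bytes")
    var bits: UInt64 = 0
    for byte in bytes {
        bits = (bits << 8) | UInt64(byte)
    }
    return Int64(bitPattern: bits)
}

let encoded = bigEndianBytes(1)
let roundTripped = int64FromBigEndian(bigEndianBytes(-1234567890123))
```

Going through `bigEndian` on the way out and explicit shifts on the way in avoids reinterpreting raw memory in host order, which is where byte-order bugs usually hide.
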
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <byronel...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Cham,
>>>>>>>>
>>>>>>>> Definitely happy to open a draft PR so folks can comment; there's
>>>>>>>> not as much code as it looks like since most of the LOC is just
>>>>>>>> generated protobuf. As for the support, I definitely want to add
>>>>>>>> external transforms and may actually add that support before adding
>>>>>>>> the ability to make composites in the language itself. With the way
>>>>>>>> the SDK is laid out, adding composites to the pipeline graph is a
>>>>>>>> separate operation from defining a composite.
>>>>>>>>
>>>>>>>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Byron. This sounds great. I wonder if there is interest in
>>>>>>>>> Swift SDK from folks currently subscribed to the +user
>>>>>>>>> <user@beam.apache.org> list.
>>>>>>>>>
>>>>>>>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <
>>>>>>>>> d...@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hello everyone,
>>>>>>>>>>
>>>>>>>>>> A couple of months ago I decided that I wanted to really
>>>>>>>>>> understand how the Beam FnApi works and how it interacts with the
>>>>>>>>>> Portable Runner. For me at least that usually means I need to write
>>>>>>>>>> some code so I can see things happening in a debugger. To really
>>>>>>>>>> prove to myself I understood what was going on, I decided I couldn't
>>>>>>>>>> use an existing SDK language to do it, since there would be the
>>>>>>>>>> temptation to read some code and convince myself that I actually
>>>>>>>>>> understood what was going on.
>>>>>>>>>>
>>>>>>>>>> One thing led to another and it turns out that to get a minimal
>>>>>>>>>> FnApi integration going you end up writing a fair bit of an SDK. So I
>>>>>>>>>> decided to take things to a point where I had an SDK that could
>>>>>>>>>> execute a word count example via a portable runner backend. I've now
>>>>>>>>>> reached that point and would like to submit my prototype SDK to the
>>>>>>>>>> list for feedback.
>>>>>>>>>>
>>>>>>>>>> It's currently living in a branch on my fork here:
>>>>>>>>>>
>>>>>>>>>> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>>>>>>>>>>
>>>>>>>>>> At the moment it runs via the most recent Xcode Beta using Swift
>>>>>>>>>> 5.9 on Intel Macs, but should also work using beta builds of 5.9 for
>>>>>>>>>> Linux running on Intel hardware. I haven't had a chance to try it on
>>>>>>>>>> ARM hardware and make sure all of the endian checks are complete. The
>>>>>>>>>> "IntegrationTests.swift" file contains a word count example that
>>>>>>>>>> reads some local files (as well as a missing file to exercise DLQ
>>>>>>>>>> functionality) and outputs counts through two separate group by
>>>>>>>>>> operations to get it past the "map reduce" size of pipeline. I've
>>>>>>>>>> tested it against the Python Portable Runner. Since my goal was to
>>>>>>>>>> learn FnApi there is no Direct Runner at this time.
>>>>>>>>>>
>>>>>>>>>> I've shown it to a couple of folks and incorporated some of that
>>>>>>>>>> feedback already (for example pardo was originally called dofn when
>>>>>>>>>> defining pipelines). In general I've tried to make the API as
>>>>>>>>>> "Swift-y" as possible, hence the heavy reliance on closures, and
>>>>>>>>>> while there aren't yet composite PTransforms there's the beginnings
>>>>>>>>>> of what would be needed for a SwiftUI-like declarative API for
>>>>>>>>>> creating them.
>>>>>>>>>>
>>>>>>>>>> There are of course a ton of missing bits still to be
>>>>>>>>>> implemented, like counters, metrics, windowing, state, timers, etc.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This should be fine and we can get the code documented without
>>>>>>>>> these features. I think support for composites and adding an external
>>>>>>>>> transform (see, Java
>>>>>>>>> <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>,
>>>>>>>>> Python
>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>,
>>>>>>>>> Go
>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>,
>>>>>>>>> TypeScript
>>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>)
>>>>>>>>> to add support for multi-lang will bring in a lot of features (for
>>>>>>>>> example, I/O connectors) for free.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Any and all feedback welcome and happy to submit a PR if folks
>>>>>>>>>> are interested, though the "Swift Way" would be to have it in its
>>>>>>>>>> own repo so that it can easily be used from the Swift Package Manager.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> +1 for creating a PR (maybe as a draft initially). Also it'll be
>>>>>>>>> easier to comment on a PR :)
>>>>>>>>>
>>>>>>>>> - Cham
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> B
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
