It's great that this is coming together, and I'm glad for the messages along
the way that explain the process, choices, etc.!



On Fri, Aug 25, 2023, 2:04 PM Byron Ellis via user <user@beam.apache.org>
wrote:

> Okay, after a brief detour through "get this working in the Flink Portable
> Runner" I think I have something pretty workable.
>
> PInput and POutput can actually be structs rather than protocols, which
> simplifies things quite a bit. It also allows us to use them with property
> wrappers for a SwiftUI-like experience if we want when defining DoFns
> (which is what I was originally intending to use them for). That also means
> the function signature you use for closures would match full-fledged DoFn
> definitions for the most part, which is satisfying.
>
>
>
> On Thu, Aug 24, 2023 at 5:55 PM Byron Ellis <byronel...@google.com> wrote:
>
>> Okay, I tried a couple of different things.
>>
>> Implicitly passing the timestamp and window during iteration did not go
>> well. While physically possible, it introduces an invisible side effect into
>> loop iteration that confused even me when I tried to use it, and I was the
>> one who implemented it. Also, I'm pretty sure there'd end up being some sort
>> of race condition nightmare continuing down that path.
>>
>> What I decided to do instead was the following:
>>
>> 1. Rename the existing "pardo" functions to "pstream" and require that
>> they always emit a window and timestamp along with their value. This
>> eliminates the side effect but lets us keep iteration in a bundle where
>> that might be convenient. For example, in my cheesy GCS implementation it
>> means that I can keep an OAuth token around for the lifetime of the bundle
>> as a local variable, which is convenient. It's a bit more typing for users
>> of pstream, but the expectation here is that if you're using pstream
>> functions, You Know What You Are Doing, and most people won't be using them
>> directly.
>>
>> 2. Introduce a new set of pardo functions (I didn't do all of them yet,
>> but enough to test the functionality and decide I liked it) which take a
>> function signature of (any PInput<InputType>,any POutput<OutputType>).
>> PInput takes the (InputType,Date,Window) tuple and converts it into a
>> struct with friendlier names. Not strictly necessary, but makes the code
>> nicer to read, I think. POutput introduces emit functions that optionally
>> allow you to specify a timestamp and a window. If you omit either one, it
>> will take the timestamp and/or window of the input.
>>
>> Using that was pretty pleasant, so I think we should continue down that
>> path. If you'd like to see it in use, I reimplemented map() and flatMap() in
>> terms of this new pardo functionality.
>>
>> Code has been pushed to the branch/PR if you're interested in taking a
>> look.
>>
>> On Thu, Aug 24, 2023 at 2:15 PM Byron Ellis <byronel...@google.com>
>> wrote:
>>
>>> Gotcha, I think there's a fairly easy solution to link input and output
>>> streams... Let me try it out. It might even be possible to have both
>>> element-wise and stream-wise closure pardos. It's definitely possible to
>>> have that at the DoFn level (called SerializableFn in the SDK because I want
>>> to use @DoFn as a macro).
>>>
>>> On Thu, Aug 24, 2023 at 1:09 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I would like to figure out a way to get the stream-y interface to
>>>>>> work, as I think it's more natural overall.
>>>>>>
>>>>>> One hypothesis is that if any elements are carried over loop
>>>>>> iterations, there will likely be some that are carried over beyond the
>>>>>> loop (after all, the callee doesn't know when the loop is supposed to
>>>>>> end). We could reject "plain" elements that are emitted after this point,
>>>>>> requiring one to emit timestamp-windowed values.
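A minimal sketch of that hypothesis, assuming the input stream marks which element is "current" while it is being processed and clears that marker when iteration ends. Metadata, GuardedOutput, and GuardedStream are hypothetical names. A plain emit copies the current element's metadata and throws once the loop is over; the explicit overload is always allowed because the caller supplies the metadata.

```swift
import Foundation

// Hypothetical per-element metadata (a real SDK would also carry windows, panes, ...).
struct Metadata: Equatable {
    let timestamp: Date
}

enum EmitError: Error {
    case noCurrentElement  // a plain value was emitted after iteration finished
}

final class GuardedOutput<Of> {
    private(set) var emitted: [(Of, Metadata)] = []
    var current: Metadata?  // set by the stream while an element is being processed

    // Plain emit: metadata is copied from the current input element.
    func emit(_ value: Of) throws {
        guard let metadata = current else { throw EmitError.noCurrentElement }
        emitted.append((value, metadata))
    }

    // Explicit emit: always allowed, since the caller supplies the metadata.
    func emit(_ value: Of, metadata: Metadata) {
        emitted.append((value, metadata))
    }
}

// Input stream that tracks the current element and clears it when exhausted.
struct GuardedStream<I, O>: Sequence, IteratorProtocol {
    var elements: IndexingIterator<[(I, Metadata)]>
    let output: GuardedOutput<O>

    mutating func next() -> I? {
        guard let element = elements.next() else {
            output.current = nil  // loop finished: later plain emits are rejected
            return nil
        }
        output.current = element.1
        return element.0
    }
}
```

This only catches emits that happen after the loop. As noted elsewhere in this thread, buffering inside the loop can still silently attach the wrong element's metadata, which this check cannot detect.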
>>>>>>
>>>>>
>>>>> Are you assuming that the same stream (or overlapping sets of data)
>>>>> is pushed to multiple workers? I thought that the data streamed here is
>>>>> the data belonging to the current bundle (hence already assigned to the
>>>>> current worker), so any output from the current bundle invocation would
>>>>> be a valid output of that bundle.
>>>>>
>>>>>>
>>>> Yes, the content of the stream is exactly the contents of the bundle.
>>>> The question is how to do the input_element:output_element correlation for
>>>> automatically propagating metadata.
>>>>
>>>>
>>>>>> Related to this, we could enforce that the only (user-accessible) way
>>>>>> to get such a timestamped value is to start with one, e.g. a
>>>>>> WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same
>>>>>> metadata but a new value. Thus a user wanting to do anything "fancy" would
>>>>>> have to explicitly request iteration over these windowed values rather
>>>>>> than over the raw elements. (This is also forward compatible with
>>>>>> expanding the metadata that can get attached, e.g. pane infos, and makes
>>>>>> the right thing the easiest/most natural.)
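A small sketch of that rule, assuming a generic WindowedValue struct; the field names and the String stand-in for windows are hypothetical. In a real SDK the memberwise initializer would stay internal to the SDK module, so withValue would be the only user-facing way to produce a new windowed value.

```swift
import Foundation

// Hypothetical windowed value; pane info and other metadata would sit beside these fields.
struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let windows: [String]  // stand-in for real window objects

    // Same metadata, new value (possibly of a different type).
    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, windows: windows)
    }
}
```

Adding a metadata field later (for example pane info) only touches withValue, so user code written against it keeps propagating the new field automatically.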
>>>>>>
>>>>>> On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ah, that is a good point: being element-wise would make managing
>>>>>>> windows and timestamps easier for the user. Fortunately it's a fairly
>>>>>>> easy change to make and maybe even less typing for the user. I was
>>>>>>> originally thinking side inputs and metrics would happen outside the
>>>>>>> loop, but I think you want a class and not a closure at that point for
>>>>>>> sanity.
>>>>>>>
>>>>>>> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <
>>>>>>> rober...@google.com> wrote:
>>>>>>>
>>>>>>>> Ah, I see.
>>>>>>>>
>>>>>>>> Yeah, I've thought about using an iterable for the whole bundle
>>>>>>>> rather than start/finish bundle callbacks, but one of the questions is
>>>>>>>> how that would impact implicit passing of the timestamp (and other)
>>>>>>>> metadata from input elements to output elements. You can of course
>>>>>>>> attach the metadata to any output that happens in the loop body, but
>>>>>>>> it's very easy to implicitly break the 1:1 relationship here (e.g. by
>>>>>>>> doing buffering or otherwise modifying local state), and this would be
>>>>>>>> hard to detect. (I suppose trying to output after the loop finishes
>>>>>>>> could require something more explicit.)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Oh, I also forgot to mention that I included element-wise
>>>>>>>>> collection operations like "map" that eliminate the need for pardo in
>>>>>>>>> many cases. The groupBy command is actually a map + groupByKey under
>>>>>>>>> the hood. That was to be more consistent with Swift's collection
>>>>>>>>> protocol (and is also why PCollection and PCollectionStream are
>>>>>>>>> different types: PCollection implements map and friends as pipeline
>>>>>>>>> construction operations, whereas PCollectionStream is an actual
>>>>>>>>> stream).
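An in-memory sketch of "groupBy is a map + groupByKey", with plain arrays standing in for PCollections. The groupByKey and groupBy functions here are hypothetical free functions for illustration, not the SDK's pipeline-construction operations.

```swift
// Hypothetical in-memory groupByKey: gathers values per key, keeping first-seen key order.
func groupByKey<K: Hashable, V>(_ pairs: [(K, V)]) -> [(K, [V])] {
    var order: [K] = []
    var groups: [K: [V]] = [:]
    for (key, value) in pairs {
        if groups[key] == nil { order.append(key) }
        groups[key, default: []].append(value)
    }
    return order.map { ($0, groups[$0]!) }
}

// groupBy: a map that attaches a key to each element, followed by groupByKey.
func groupBy<K: Hashable, V>(_ values: [V], key: (V) -> K) -> [(K, [V])] {
    groupByKey(values.map { (key($0), $0) })
}
```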
>>>>>>>>>
>>>>>>>>> I just happened to push some "IO primitives" that use map rather
>>>>>>>>> than pardo in a couple of places to do a true wordcount using good ol'
>>>>>>>>> Shakespeare and very, very primitive GCS IO.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> B
>>>>>>>>>
>>>>>>>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite
>>>>>>>>>> a bit before settling on where I ended up. Ultimately I decided to go
>>>>>>>>>> with something that felt more Swift-y than anything else, which means
>>>>>>>>>> that rather than dealing with a single element like you do in the
>>>>>>>>>> other SDKs, you're dealing with a stream of elements (which of course
>>>>>>>>>> will often be of size 1). That's a really natural paradigm in the
>>>>>>>>>> Swift world, especially with the async/await structures. So when you
>>>>>>>>>> see something like:
>>>>>>>>>>
>>>>>>>>>> pardo(name:"Read Files") { filenames,output,errors in
>>>>>>>>>>
>>>>>>>>>>   for try await (filename,_,_) in filenames {
>>>>>>>>>>     ...
>>>>>>>>>>     output.emit(data)
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> filenames is the input stream, and output and errors are both
>>>>>>>>>> output streams. In theory you can have as many output streams as you
>>>>>>>>>> like, though at the moment there's a compiler bug in the new parameter
>>>>>>>>>> pack feature that limits it to "as many as I felt like supporting".
>>>>>>>>>> (Presumably this will get fixed before the official 5.9 release, which
>>>>>>>>>> will probably be in the October timeframe if history is any guide.)
>>>>>>>>>>
>>>>>>>>>> If you had a parameter you wanted to send, that would look like
>>>>>>>>>> pardo("Parameter") { param,filenames,output,error in ... } where
>>>>>>>>>> "param" would take on the value of "Parameter". All of this is
>>>>>>>>>> type-checked at compile time, BTW.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The (filename,_,_) is a tuple destructuring construct like you have
>>>>>>>>>> in ES6 and other languages, where "_" is Swift for "ignore". In this
>>>>>>>>>> case PCollectionStreams have an element signature of (Of,Date,Window),
>>>>>>>>>> so you can optionally extract the timestamp and the window if you want
>>>>>>>>>> to manipulate them somehow.
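For readers new to Swift, a self-contained illustration of the destructuring described above. The elements mimic the (Of, Date, Window) shape, with a String standing in for the window as an assumption for illustration; a synchronous array replaces the async PCollectionStream.

```swift
import Foundation

// Hypothetical elements shaped like (value, timestamp, window); a String stands in for Window.
let elements: [(String, Date, String)] = [
    ("a.txt", Date(timeIntervalSince1970: 0), "global"),
    ("b.txt", Date(timeIntervalSince1970: 60), "global"),
]

// Keep only the value: "_" discards the timestamp and the window.
var names: [String] = []
for (filename, _, _) in elements {
    names.append(filename)
}

// Bind the timestamp when you need it, here to track the latest one seen.
var latest = Date.distantPast
for (_, timestamp, _) in elements where timestamp > latest {
    latest = timestamp
}
```

The same tuple patterns work unchanged in a `for try await` loop over an AsyncSequence, as in the pardo snippet in this message.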
>>>>>>>>>>
>>>>>>>>>> That said, it would also be natural to provide element-wise
>>>>>>>>>> pardos; that would probably mean having explicit type signatures in
>>>>>>>>>> the closure. I had that at one point, but it felt less natural the more
>>>>>>>>>> I used it. I'm also slowly working towards adding a more "traditional"
>>>>>>>>>> DoFn implementation approach where you implement the DoFn as an object
>>>>>>>>>> type. In that case it would be very, very easy to support both by
>>>>>>>>>> having a default stream implementation call the equivalent of
>>>>>>>>>> processElement. To make that performant I need to implement an @DoFn
>>>>>>>>>> macro, and I just haven't gotten to it yet.
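A sketch of "a default stream implementation that calls the equivalent of processElement", using a Swift protocol extension. ElementwiseDoFn, processBundle, and SplitWords are hypothetical names, and a synchronous Sequence stands in for the SDK's async PCollectionStream; this is not the SDK's DoFn API.

```swift
// Hypothetical object-style DoFn: implementers only write processElement.
protocol ElementwiseDoFn {
    associatedtype Input
    associatedtype Output
    func processElement(_ element: Input, emit: (Output) -> Void)
}

extension ElementwiseDoFn {
    // Default stream-wise entry point: loop over the bundle, one call per element.
    func processBundle<S: Sequence>(_ bundle: S, emit: (Output) -> Void) where S.Element == Input {
        for element in bundle {
            processElement(element, emit: emit)
        }
    }
}

// Example DoFn: Input and Output are inferred as String from the method signature.
struct SplitWords: ElementwiseDoFn {
    func processElement(_ element: String, emit: (String) -> Void) {
        for word in element.split(separator: " ") {
            emit(String(word))
        }
    }
}
```

A DoFn that needs bundle-level state (like the OAuth token example earlier in the thread) could instead provide its own processBundle, overriding the default.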
>>>>>>>>>>
>>>>>>>>>> It's a bit more work, and I've been prioritizing implementing
>>>>>>>>>> composite and external transforms for the reasons you suggest. :-)
>>>>>>>>>> I've got the basics of a composite transform (there's an equivalent
>>>>>>>>>> wordcount example) and am hooking it into the pipeline generation,
>>>>>>>>>> which should also give me everything I need to successfully hook in
>>>>>>>>>> external transforms as well. That will give me the jump on IOs, as you
>>>>>>>>>> say. I can also treat the pipeline itself as a composite transform,
>>>>>>>>>> which lets me get rid of the Pipeline { pipeline in ... } and instead
>>>>>>>>>> have things attach themselves to the pipeline implicitly.
>>>>>>>>>>
>>>>>>>>>> That said, there are some interesting IO possibilities that would
>>>>>>>>>> be Swift native. In particular, I've been looking at the native Swift
>>>>>>>>>> binding for DuckDB (which is C++ based). DuckDB is SQL based but not
>>>>>>>>>> distributed in the same way as, say, Beam SQL... but it would allow
>>>>>>>>>> for SQL statements on individual files with projection pushdown
>>>>>>>>>> supported for things like Parquet, which could have some cool and
>>>>>>>>>> performant data lake applications. I'll probably do a couple of the
>>>>>>>>>> simpler IOs as well; there's a pretty good Swift AWS SDK binding that
>>>>>>>>>> would give me S3, and there's a Cloud auth library as well that makes
>>>>>>>>>> it pretty easy to work with GCS.
>>>>>>>>>>
>>>>>>>>>> In any case, I'm updating the branch as I find a minute here and
>>>>>>>>>> there.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> B
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <
>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Neat.
>>>>>>>>>>>
>>>>>>>>>>> Nothing like writing an SDK to actually understand how the
>>>>>>>>>>> FnAPI works :). I like the use of groupBy. I have to admit I'm a bit
>>>>>>>>>>> mystified by the syntax for parDo (I don't know Swift at all, which is
>>>>>>>>>>> probably tripping me up). The addition of external (cross-language)
>>>>>>>>>>> transforms could let you steal everything (e.g. IOs) pretty quickly
>>>>>>>>>>> from other SDKs.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <
>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For everyone who is interested, here's the draft PR:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/pull/28062
>>>>>>>>>>>>
>>>>>>>>>>>> I haven't had a chance to test it on my M1 machine yet, though
>>>>>>>>>>>> (there's a good chance there are a few places that need to properly
>>>>>>>>>>>> address endianness, specifically timestamps in windowed values and
>>>>>>>>>>>> lengths in iterable coders, as those both use big-endian
>>>>>>>>>>>> representations).
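For reference, a sketch of writing and reading fixed-width integers in big-endian order independently of the host's byte order; appendBigEndian and readBigEndian are hypothetical helper names. Both Intel (x86-64) and Apple silicon (arm64) Macs are little-endian, so an explicit conversion like this is needed on either. Using an Int64 for a timestamp and an Int32 for an iterable length prefix in the usage below is an illustrative assumption, not a statement of Beam's exact wire format.

```swift
import Foundation

// Append `value` to `data` as big-endian bytes, whatever the host byte order is.
func appendBigEndian<T: FixedWidthInteger>(_ value: T, to data: inout Data) {
    var bigEndianValue = value.bigEndian
    withUnsafeBytes(of: &bigEndianValue) { data.append(contentsOf: $0) }
}

// Rebuild an integer from exactly MemoryLayout<T>.size big-endian bytes.
func readBigEndian<T: FixedWidthInteger>(_ type: T.Type, from bytes: [UInt8]) -> T {
    precondition(bytes.count == MemoryLayout<T>.size, "wrong number of bytes")
    return bytes.reduce(T.zero) { (accumulated: T, byte: UInt8) -> T in
        (accumulated << 8) | T(byte)
    }
}
```

For example, appending Int32(3) produces the bytes 0, 0, 0, 3 on every platform, so a round trip through these helpers behaves the same on Intel and ARM hardware.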
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <
>>>>>>>>>>>> byronel...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Cham,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Definitely happy to open a draft PR so folks can comment; there's
>>>>>>>>>>>>> not as much code as it looks like, since most of the LOC is just
>>>>>>>>>>>>> generated protobuf. As for the support, I definitely want to add
>>>>>>>>>>>>> external transforms and may actually add that support before adding
>>>>>>>>>>>>> the ability to make composites in the language itself. With the way
>>>>>>>>>>>>> the SDK is laid out, adding composites to the pipeline graph is a
>>>>>>>>>>>>> separate operation from defining a composite.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <
>>>>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Byron. This sounds great. I wonder if there is
>>>>>>>>>>>>>> interest in a Swift SDK from folks currently subscribed to the
>>>>>>>>>>>>>> +user <user@beam.apache.org> list.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <
>>>>>>>>>>>>>> d...@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A couple of months ago I decided that I wanted to really
>>>>>>>>>>>>>>> understand how the Beam FnApi works and how it interacts with the
>>>>>>>>>>>>>>> Portable Runner. For me, at least, that usually means I need to
>>>>>>>>>>>>>>> write some code so I can see things happening in a debugger, and to
>>>>>>>>>>>>>>> really prove to myself I understood what was going on, I decided I
>>>>>>>>>>>>>>> couldn't use an existing SDK language to do it, since there would be
>>>>>>>>>>>>>>> the temptation to read some code and convince myself that I
>>>>>>>>>>>>>>> actually understood what was going on.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One thing led to another, and it turns out that to get a
>>>>>>>>>>>>>>> minimal FnApi integration going you end up writing a fair bit of
>>>>>>>>>>>>>>> an SDK. So I decided to take things to a point where I had an SDK
>>>>>>>>>>>>>>> that could execute a word count example via a portable runner
>>>>>>>>>>>>>>> backend. I've now reached that point and would like to submit my
>>>>>>>>>>>>>>> prototype SDK to the list for feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's currently living in a branch on my fork here:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> At the moment it runs via the most recent Xcode beta using
>>>>>>>>>>>>>>> Swift 5.9 on Intel Macs, but should also work using beta builds of
>>>>>>>>>>>>>>> 5.9 for Linux running on Intel hardware. I haven't had a chance to
>>>>>>>>>>>>>>> try it on ARM hardware and make sure all of the endian checks are
>>>>>>>>>>>>>>> complete. The "IntegrationTests.swift" file contains a word count
>>>>>>>>>>>>>>> example that reads some local files (as well as a missing file to
>>>>>>>>>>>>>>> exercise DLQ functionality) and outputs counts through two separate
>>>>>>>>>>>>>>> group by operations to get it past the "map reduce" size of
>>>>>>>>>>>>>>> pipeline. I've tested it against the Python Portable Runner. Since
>>>>>>>>>>>>>>> my goal was to learn FnApi, there is no Direct Runner at this time.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I've shown it to a couple of folks already and incorporated
>>>>>>>>>>>>>>> some of that feedback (for example, pardo was originally called
>>>>>>>>>>>>>>> dofn when defining pipelines). In general I've tried to make the API
>>>>>>>>>>>>>>> as "Swift-y" as possible, hence the heavy reliance on closures, and
>>>>>>>>>>>>>>> while there aren't yet composite PTransforms, there are the
>>>>>>>>>>>>>>> beginnings of what would be needed for a SwiftUI-like declarative
>>>>>>>>>>>>>>> API for creating them.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There are of course a ton of missing bits still to be
>>>>>>>>>>>>>>> implemented, like counters, metrics, windowing, state, timers, 
>>>>>>>>>>>>>>> etc.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This should be fine, and we can get the code documented
>>>>>>>>>>>>>> without these features. I think support for composites and adding an
>>>>>>>>>>>>>> external transform (see Java
>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>,
>>>>>>>>>>>>>> Python
>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>,
>>>>>>>>>>>>>> Go
>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>,
>>>>>>>>>>>>>> TypeScript
>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>)
>>>>>>>>>>>>>> to add support for multi-lang will bring in a lot of features (for
>>>>>>>>>>>>>> example, I/O connectors) for free.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any and all feedback is welcome, and I'm happy to submit a PR if
>>>>>>>>>>>>>>> folks are interested, though the "Swift Way" would be to have it
>>>>>>>>>>>>>>> in its own repo so that it can easily be used from the Swift
>>>>>>>>>>>>>>> Package Manager.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> +1 for creating a PR (maybe as a draft initially). Also
>>>>>>>>>>>>>> it'll be easier to comment on a PR :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Cham
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> B
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
