It's great that this is coming together, and I'm glad for the messages along the way explaining the process, the choices, and so on!
On Fri, Aug 25, 2023, 2:04 PM Byron Ellis via user <user@beam.apache.org> wrote:

Okay, after a brief detour through "get this working in the Flink Portable Runner" I think I have something pretty workable.

PInput and POutput can actually be structs rather than protocols, which simplifies things quite a bit. It also allows us to use them with property wrappers for a SwiftUI-like experience, if we want, when defining DoFns (which is what I was originally intending to use them for). That also means the function signature you use for closures would match full-fledged DoFn definitions for the most part, which is satisfying.

On Thu, Aug 24, 2023 at 5:55 PM Byron Ellis <byronel...@google.com> wrote:

Okay, I tried a couple of different things.

Implicitly passing the timestamp and window during iteration did not go well. While physically possible, it introduces an invisible side effect into loop iteration, which confused me when I tried to use it, and I'm the one who implemented it. Also, I'm pretty sure there'd end up being some sort of race condition nightmare continuing down that path.

What I decided to do instead was the following:

1. Rename the existing "pardo" functions to "pstream" and require that they always emit a window and timestamp along with their value. This eliminates the side effect but lets us keep iteration in a bundle where that might be convenient. For example, in my cheesy GCS implementation it means that I can keep an OAuth token around for the lifetime of the bundle as a local variable, which is convenient. It's a bit more typing for users of pstream, but the expectation here is that if you're using pstream functions You Know What You Are Doing, and most people won't be using it directly.

2.
Introduce a new set of pardo functions (I didn't do all of them yet, but enough to test the functionality and decide I liked it) which take a function signature of (any PInput<InputType>, any POutput<OutputType>). PInput takes the (InputType, Date, Window) tuple and converts it into a struct with friendlier names. Not strictly necessary, but it makes the code nicer to read, I think. POutput introduces emit functions that optionally allow you to specify a timestamp and a window; if you omit either one, it will take the timestamp and/or window of the input.

Using that was pretty pleasant, so I think we should continue down that path. If you'd like to see it in use, I reimplemented map() and flatMap() in terms of this new pardo functionality.

Code has been pushed to the branch/PR if you're interested in taking a look.

On Thu, Aug 24, 2023 at 2:15 PM Byron Ellis <byronel...@google.com> wrote:

Gotcha, I think there's a fairly easy solution to link input and output streams... Let me try it out... it might even be possible to have both element-wise and stream-wise closure pardos. It's definitely possible to have that at the DoFn level (called SerializableFn in the SDK because I want to use @DoFn as a macro).

On Thu, Aug 24, 2023 at 1:09 PM Robert Bradshaw <rober...@google.com> wrote:

On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <chamik...@google.com> wrote:

On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com> wrote:

[Robert, 12:27 PM:] I would like to figure out a way to get the stream-y interface to work, as I think it's more natural overall.

One hypothesis is that if any elements are carried over loop iterations, there will likely be some that are carried over beyond the loop (after all, the callee doesn't know when the loop is supposed to end).
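Byron's struct-based PInput/POutput shape, with emit() falling back to the input element's timestamp and window, can be sketched in plain Swift. All names here (Window, PInput, POutput, pardo) are assumptions based on the thread, not the actual SDK types, and the driver runs over an in-memory array rather than a real bundle:

```swift
import Foundation

// Hypothetical stand-ins, not the real SDK types.
struct Window: Hashable { let name: String }

// PInput turns the raw (value, timestamp, window) tuple into named fields.
struct PInput<T> {
    let value: T
    let timestamp: Date
    let window: Window
}

// POutput's emit() lets you omit the timestamp and/or window, in which
// case the input element's values are used.
final class POutput<T> {
    let defaultTimestamp: Date
    let defaultWindow: Window
    private(set) var emitted: [(T, Date, Window)] = []

    init(defaultTimestamp: Date, defaultWindow: Window) {
        self.defaultTimestamp = defaultTimestamp
        self.defaultWindow = defaultWindow
    }

    func emit(_ value: T, timestamp: Date? = nil, window: Window? = nil) {
        emitted.append((value, timestamp ?? defaultTimestamp, window ?? defaultWindow))
    }
}

// A toy element-wise "pardo" driver over an in-memory bundle.
func pardo<I, O>(_ bundle: [(I, Date, Window)],
                 _ fn: (PInput<I>, POutput<O>) -> Void) -> [(O, Date, Window)] {
    var results: [(O, Date, Window)] = []
    for (value, timestamp, window) in bundle {
        let input = PInput(value: value, timestamp: timestamp, window: window)
        let output = POutput<O>(defaultTimestamp: timestamp, defaultWindow: window)
        fn(input, output)
        results.append(contentsOf: output.emitted)
    }
    return results
}
```

Under this shape, map() is just a pardo that emits exactly one value per input and never overrides the metadata, which matches the thread's note that map() and flatMap() were reimplemented on top of the new pardo.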
We could reject "plain" elements that are emitted after this point, requiring one to emit timestamp-windowed values.

[Chamikara:] Are you assuming that the same stream (or overlapping sets of data) is pushed to multiple workers? I thought that the data streamed here is the data that belongs to the current bundle (hence already assigned to the current worker), so any output from the current bundle invocation would be a valid output of that bundle.

[Robert:] Yes, the content of the stream is exactly the contents of the bundle. The question is how to do the input_element:output_element correlation for automatically propagating metadata.

[Robert, earlier:] Related to this, we could enforce that the only (user-accessible) way to get such a timestamped value is to start with one, e.g. a WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same metadata but a new value. Thus a user wanting to do anything "fancy" would have to explicitly request iteration over these windowed values rather than over the raw elements. (This is also forward compatible with expanding the metadata that can get attached, e.g. pane infos, and makes the right thing the easiest/most natural.)

On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com> wrote:

Ah, that is a good point: being element-wise would make managing windows and timestamps easier for the user. Fortunately it's a fairly easy change to make, and maybe even less typing for the user. I was originally thinking side inputs and metrics would happen outside the loop, but I think you want a class and not a closure at that point, for sanity.

On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com> wrote:

Ah, I see.
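Robert's WindowedValue<T>.withValue(O) suggestion can be sketched directly. This is illustrative only (the names follow the thread, not actual SDK code, and the window type is a placeholder): if withValue is the sole user-facing way to build a new windowed value, metadata can only ever be propagated, never fabricated:

```swift
import Foundation

// Placeholder window type for this sketch.
struct Window: Hashable { let name: String }

struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let window: Window

    // Same timestamp and window, new payload: the metadata of the input
    // element is always carried over to the output element.
    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, window: window)
    }
}
```

The design point is that a user who wants to change the timestamp or window must opt into iterating over windowed values explicitly, so the easy path preserves the 1:1 metadata correspondence.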
Yeah, I've thought about using an iterable for the whole bundle rather than start/finish bundle callbacks, but one of the questions is how that would impact implicit passing of the timestamp (and other) metadata from input elements to output elements. You can of course attach the metadata to any output that happens in the loop body, but it's very easy to implicitly break the 1:1 relationship here (e.g. by doing buffering or otherwise modifying local state) and this would be hard to detect. (I suppose trying to output after the loop finishes could require something more explicit.)

On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com> wrote:

Oh, I also forgot to mention that I included element-wise collection operations like "map" that eliminate the need for pardo in many cases. The groupBy command is actually a map + groupByKey under the hood. That was to be more consistent with Swift's collection protocol (and is also why PCollection and PCollectionStream are different types... PCollection implements map and friends as pipeline construction operations whereas PCollectionStream is an actual stream).

I just happened to push some "IO primitives" that use map rather than pardo in a couple of places to do a true wordcount using good ol' Shakespeare and very, very primitive GCS IO.

Best,
B

On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com> wrote:

Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit before settling on where I ended up.
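The earlier point that groupBy is a map + groupByKey under the hood can be illustrated in plain Swift, with no Beam types involved; this toy function just makes the two-step decomposition visible:

```swift
// groupBy decomposed into a key-extracting map followed by a group-by-key,
// using only the standard library.
func groupBy<T, K: Hashable>(_ elements: [T], key: (T) -> K) -> [K: [T]] {
    // The "map" step: pair each element with its key.
    let keyed = elements.map { (key: key($0), value: $0) }
    // The "groupByKey" step: collect the values that share a key.
    var groups: [K: [T]] = [:]
    for pair in keyed {
        groups[pair.key, default: []].append(pair.value)
    }
    return groups
}
```

At pipeline scale the "map" step is a pipeline-construction operation on PCollection and the grouping is done by the runner, but the logical shape is the same.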
Ultimately I decided to go with something that felt more Swift-y than anything else, which means that rather than dealing with a single element like you do in the other SDKs you're dealing with a stream of elements (which of course will often be of size 1). That's a really natural paradigm in the Swift world, especially with the async/await structures. So when you see something like:

    pardo(name: "Read Files") { filenames, output, errors in
        for try await (filename, _, _) in filenames {
            ...
            output.emit(data)
        }
    }

filenames is the input stream, and then output and errors are both output streams. In theory you can have as many output streams as you like, though at the moment there's a compiler bug in the new type pack feature that limits it to "as many as I felt like supporting". Presumably this will get fixed before the official 5.9 release, which will probably be in the October timeframe if history is any guide.

If you had parameterization you wanted to send, that would look like pardo("Parameter") { param, filenames, output, error in ... } where "param" would take on the value of "Parameter". All of this is being typechecked at compile time, BTW.

The (filename, _, _) is a tuple-destructuring construct like you have in ES6 and other languages, where "_" is Swift for "ignore". In this case PCollectionStreams have an element signature of (Of, Date, Window), so you can optionally extract the timestamp and the window if you want to manipulate them somehow.
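The separate output and errors streams in the "Read Files" example above (the DLQ pattern) can be mimicked with a toy helper in plain Swift. This is not SDK code; pardo, the closure shape, and ParseError are all illustrative stand-ins, and arrays stand in for the async streams:

```swift
// Toy multi-output "pardo": one main output and one error output,
// mirroring the filenames/output/errors closure shape above.
struct ParseError: Error { let input: String }

func pardo<I, O>(_ inputs: [I],
                 _ body: (I, (O) -> Void, (Error) -> Void) -> Void)
    -> (output: [O], errors: [Error]) {
    var output: [O] = []
    var errors: [Error] = []
    for element in inputs {
        // The body decides, per element, which stream receives a value.
        body(element, { output.append($0) }, { errors.append($0) })
    }
    return (output, errors)
}
```

A bad element lands on the error stream instead of failing the whole bundle, which is what the missing-file case in the word count example exercises.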
That said, it would also be natural to provide element-wise pardos; that would probably mean having explicit type signatures in the closure. I had that at one point, but it felt less natural the more I used it. I'm also slowly working towards adding a more "traditional" DoFn implementation approach where you implement the DoFn as an object type. In that case it would be very, very easy to support both by having a default stream implementation call the equivalent of processElement. To make that performant I need to implement a @DoFn macro, and I just haven't gotten to it yet.

It's a bit more work, and I've been prioritizing implementing composite and external transforms for the reasons you suggest. :-) I've got the basics of a composite transform (there's an equivalent wordcount example) and am hooking it into the pipeline generation, which should also give me everything I need to successfully hook in external transforms as well. That will give me the jump on IOs, as you say. I can also treat the pipeline itself as a composite transform, which lets me get rid of the Pipeline { pipeline in ... } and instead have things attach themselves to the pipeline implicitly.

That said, there are some interesting IO possibilities that would be Swift-native. In particular, I've been looking at the native Swift binding for DuckDB (which is C++ based). DuckDB is SQL-based but not distributed in the same way as, say, Beam SQL...
...but it would allow for SQL statements on individual files, with projection pushdown supported for things like Parquet, which could have some cool and performant data lake applications. I'll probably do a couple of the simpler IOs as well: there's a Swift AWS SDK binding that's pretty good that would give me S3, and there's a Cloud auth library as well that makes it pretty easy to work with GCS.

In any case, I'm updating the branch as I find a minute here and there.

Best,
B

On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <rober...@google.com> wrote:

Neat.

Nothing like writing an SDK to actually understand how the FnAPI works :). I like the use of groupBy. I have to admit I'm a bit mystified by the syntax for parDo (I don't know Swift at all, which is probably tripping me up). The addition of external (cross-language) transforms could let you steal everything (e.g. IOs) pretty quickly from other SDKs.

On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <user@beam.apache.org> wrote:

For everyone who is interested, here's the draft PR:

https://github.com/apache/beam/pull/28062

I haven't had a chance to test it on my M1 machine yet, though; there's a good chance there are a few places that need to properly address endianness.
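The "traditional" DoFn object type mentioned a few messages up, where a default stream implementation just calls the equivalent of processElement for each element, might look roughly like this. Protocol and method names are assumptions for illustration, not the SDK's actual API, and an array stands in for the element stream:

```swift
// Sketch: an object-style DoFn whose stream handling has a default
// implementation defined in terms of processElement.
protocol DoFnLike {
    associatedtype Input
    associatedtype Output
    func processElement(_ element: Input, emit: (Output) -> Void)
}

extension DoFnLike {
    // Default "stream" implementation: loop over the bundle and delegate
    // to processElement, so element-wise DoFns get stream support for free.
    func processBundle(_ elements: [Input]) -> [Output] {
        var out: [Output] = []
        for element in elements {
            processElement(element) { out.append($0) }
        }
        return out
    }
}
```

A conforming type only writes the per-element logic; the thread notes that making this performant in the real SDK is what the planned @DoFn macro is for.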
Specifically, timestamps in windowed values and lengths in iterable coders, as those both use big-endian representations.

On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <byronel...@google.com> wrote:

Thanks, Cham.

Definitely happy to open a draft PR so folks can comment; there's not as much code as it looks like, since most of the LOC is just generated protobuf. As for the support, I definitely want to add external transforms and may actually add that support before adding the ability to make composites in the language itself. With the way the SDK is laid out, adding composites to the pipeline graph is a separate operation from defining a composite.

On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <chamik...@google.com> wrote:

Thanks, Byron. This sounds great. I wonder if there is interest in a Swift SDK from folks currently subscribed to the user <user@beam.apache.org> list.

On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <d...@beam.apache.org> wrote:

Hello everyone,

A couple of months ago I decided that I wanted to really understand how the Beam FnApi works and how it interacts with the Portable Runner.
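The endianness concern mentioned above comes from the wire format: per the thread, windowed-value timestamps and iterable-coder lengths are big-endian on the wire, so a little-endian host has to byte-swap. A minimal Swift sketch of the round trip (helper names are mine, not the SDK's):

```swift
import Foundation

// Encode a 64-bit value big-endian regardless of host byte order,
// as the coders described above require.
func encodeBigEndian(_ value: Int64) -> [UInt8] {
    withUnsafeBytes(of: value.bigEndian) { Array($0) }
}

// Decode by accumulating bytes most-significant first, then reinterpret
// the bit pattern as a signed value.
func decodeBigEndian(_ bytes: [UInt8]) -> Int64 {
    precondition(bytes.count == 8, "expected exactly 8 bytes")
    let magnitude = bytes.reduce(UInt64(0)) { ($0 << 8) | UInt64($1) }
    return Int64(bitPattern: magnitude)
}
```

On a big-endian host `value.bigEndian` is a no-op, which is exactly why code that skips the swap can pass tests on one architecture and fail on another.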
For me, at least, that usually means I need to write some code so I can see things happening in a debugger, and to really prove to myself I understood what was going on I decided I couldn't use an existing SDK language to do it, since there would be the temptation to read some code and convince myself that I actually understood what was going on.

One thing led to another, and it turns out that to get a minimal FnApi integration going you end up writing a fair bit of an SDK. So I decided to take things to a point where I had an SDK that could execute a word count example via a portable runner backend. I've now reached that point and would like to submit my prototype SDK to the list for feedback.

It's currently living in a branch on my fork here:

https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift

At the moment it runs via the most recent Xcode beta using Swift 5.9 on Intel Macs, but it should also work using beta builds of 5.9 for Linux running on Intel hardware. I haven't had a chance to try it on ARM hardware and make sure all of the endian checks are complete. The "IntegrationTests.swift" file contains a word count example that reads some local files (as well as a missing file, to exercise DLQ functionality) and outputs counts through two separate group-by operations to get it past the "map reduce" size of pipeline.
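The logical core of that word count example has a simple single-machine analogue in plain Swift. This drastically simplified sketch (standard library only, no Beam pipeline, one in-memory grouping rather than two distributed group-bys) shows only the shape of the computation:

```swift
// Word count, single-machine: split lines into words, pair each word
// with a count of 1, and sum per key.
func wordCount(_ lines: [String]) -> [String: Int] {
    let pairs = lines.flatMap { line in
        line.lowercased().split(separator: " ").map { (String($0), 1) }
    }
    // Summing colliding keys is the in-memory equivalent of the
    // group-and-sum the pipeline performs at scale.
    return Dictionary(pairs, uniquingKeysWith: +)
}
```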
I've tested it against the Python Portable Runner. Since my goal was to learn the FnApi, there is no Direct Runner at this time.

I've shown it to a couple of folks already and incorporated some of that feedback (for example, pardo was originally called dofn when defining pipelines). In general I've tried to make the API as "Swift-y" as possible, hence the heavy reliance on closures, and while there aren't yet composite PTransforms there are the beginnings of what would be needed for a SwiftUI-like declarative API for creating them.

There are of course a ton of missing bits still to be implemented, like counters, metrics, windowing, state, timers, etc.

[Chamikara:] This should be fine, and we can get the code documented without these features. I think support for composites and adding an external transform (see Java <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>, Python <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>, Go <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>, TypeScript <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>) to add support for multi-lang will bring in a lot of features (for example, I/O connectors) for free.
[Byron:] Any and all feedback welcome, and happy to submit a PR if folks are interested, though the "Swift way" would be to have it in its own repo so that it can easily be used from the Swift Package Manager.

[Chamikara:] +1 for creating a PR (maybe as a draft initially). Also, it'll be easier to comment on a PR :)

- Cham

[1]
[2]
[3]

Best,
B