On Thu, Feb 6, 2025 at 8:39 AM Joey Tran <joey.t...@schrodinger.com> wrote:
> Thanks all for the conceptual sanity check. I went ahead and tried
> implementing a data-based window type. I've gotten most of the way there,
> but it was surprisingly difficult. Not really because of any complications
> due to the window not being based on timestamps (which is what I was
> worried about), but actually just because of all the runner API logic for
> turning coders/windowing strategies into runner-safe coders/windowing
> strategies.
>
> To define my new window type, I first...
> - Defined a new window type `class DataBasedWindow(BoundedWindow)`
> - Created a new window coder for it
> - Created a new WindowFn to assign data to their DataBasedWindows
>
> I thought this would be enough, but then I ran into the following issues
> when running a unit test with the Python DirectRunner (really just the
> fn_api_runner):
> - The general trigger driver tried to read `window.end`, but the window
> was just plain bytes
> - I thought fn_api_runner wasn't properly decoding windows, but it turns
> out it does decode properly - the windowed-value coder it uses just uses
> a bytes coder for the window
> - I figured that the translations phase was turning the WindowFn and
> window coder into "safe" WindowFns and coders
> - I registered URNs for the new WindowFn and window coder, but a bytes
> coder was _still_ being used for the window
> - I realized that TransformContext doesn't actually consider just any
> coder whose URN is registered as safe - only ones defined in
> `common_urns.coders`. So I had to hackily add it to both that and the
> python_sdk capabilities set
>
> Did I accidentally veer off the expected path for implementing a new
> window type, or are new window types just not something that many users
> do with the Python SDK? Based on the last point, it doesn't actually even
> seem possible to.

Well, you're definitely off the beaten path, but this should still be supported. E.g.
https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1009
was made to handle this scenario. I'm curious how your case is different from
https://github.com/apache/beam/blob/release-2.60.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L1120

> On Wed, Feb 5, 2025 at 1:01 PM Robert Bradshaw via user <user@beam.apache.org> wrote:
>
>> Interestingly, the very first prototypes of windows were actually called
>> buckets, and we thought of applications like geographical grouping and
>> such in addition to time-based aggregations. For streaming, however,
>> event time is special in the sense that it's required for aggregation and
>> omnipresent, and so this is what windows are centred on. But nothing
>> prevents one from creating data-dependent windows (that, in batch, may
>> not have anything to do with time at all) and using them as secondary
>> keys.
>>
>> The idea of carrying out-of-band metadata along with elements to
>> simplify/reuse transforms is certainly an interesting one; the question
>> is always what to do for aggregating operations (including side inputs,
>> stateful DoFns, etc., as well as the standard GroupByKey). Both treating
>> them as a sub-key (essentially creating parallel pipelines, though one
>> could do interesting things with merging) and providing a merging
>> operation for this metadata are reasonable alternatives.
>>
>> - Robert
>>
>> On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This idea makes perfect sense conceptually.
>>>
>>> Conceptually, a "window" is just a key that has the ability to know if
>>> it is "done" so we can stop waiting for more input and emit the
>>> aggregation. To do an aggregation over unbounded input, you need to know
>>> when to stop waiting for more data.
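Kenn's framing of a window as "a key that knows when it is done" can be sketched without any Beam machinery at all. The sketch below is purely illustrative - `CountWindow`, `is done after N elements`, and `DoneKeyedAggregator` are invented names for this example, not Beam APIs - but it shows the core idea: the true grouping key is (key, window), and the window, rather than a watermark, supplies the completion signal.

```python
from collections import defaultdict


class CountWindow:
    """A toy 'window': a grouping key that can report when it is complete.

    Here 'done' means a fixed number of elements has arrived - no event
    time involved. (Illustrative only; not a Beam API.)
    """

    def __init__(self, name, expected_count):
        self.name = name
        self.expected_count = expected_count


class DoneKeyedAggregator:
    """Buffers values by (key, window) and emits a sum once the window is done."""

    def __init__(self):
        self.buffer = defaultdict(list)
        self.emitted = []

    def accept(self, key, window, value):
        bucket = self.buffer[(key, window.name)]
        bucket.append(value)
        # The window itself decides when to stop waiting for more input.
        if len(bucket) == window.expected_count:
            self.emitted.append((key, window.name, sum(bucket)))
            del self.buffer[(key, window.name)]


agg = DoneKeyedAggregator()
w = CountWindow("game-1", 3)
for v in (1, 2, 3):
    agg.accept("alice", w, v)

print(agg.emitted)  # [('alice', 'game-1', 6)]
```

This is, of course, exactly the completion notion Kenn says Beam does not currently support outside of event time; the point of the sketch is only the shape of the abstraction.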
>>> In Beam, the only notion of "done" that we support is event time, so
>>> you need to have a maximum timestamp - the one-method interface for
>>> BoundedWindow
>>> <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
>>> in the Java SDK is correct.
>>>
>>> In Beam we have a fairly arbitrary separation between the K in KV and
>>> the windowing, and they show up differently in all our APIs and require
>>> different sorts of plumbing. This is to make it easier to explain things
>>> in terms of event time, but it is not essential. The true key of an
>>> aggregation is the key+window, and which data you choose to put in which
>>> place is sort of up to you. The invariant is that composites should
>>> typically "just work" per window no matter what windowing the user
>>> chooses, similarly to how keyed transforms should "just work" per key no
>>> matter what keying the user chooses.
>>>
>>> Incidentally, in SQL I found it easy to explain as "at least one of the
>>> GROUP BY keys must have some notion of completion". In the past, I've
>>> occasionally offered the example of an election, where an aggregation
>>> per locale has a clear notion of "done" but is not an interval in event
>>> time. I like an aggregation per Scrabble game better. But it doesn't
>>> sound like you are trying to stream data and then have the aggregation
>>> emit when the game concludes, so you aren't in such a state. But
>>> incidentally, our Nexmark implementations also use a merging window
>>> function that does something like this, in which it waits for an
>>> "auction ended" event. It is a bit of a hack that probably violates the
>>> Beam model but seems to work, mostly...
>>>
>>> In practice, there's no design / implementation / API / protocol for
>>> windows with a notion of completeness that is not event time. But IIRC
>>> in early Spark Runner (and maybe today?)
>>> the checking of window completeness was literally just querying state
>>> (because Spark watermarks can't implement Beam), so it could have been
>>> any state.
>>>
>>> Kenn
>>>
>>> On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>>> I have some use cases where I have some global-ish context I'd like to
>>>> partition my pipeline by, but that isn't based on time. Does it seem
>>>> reasonable to use windowing to encapsulate this kind of global context
>>>> anyway?
>>>>
>>>> Contrived example: imagine I have a workflow for figuring out the
>>>> highest-scoring word in Scrabble based on an input set of letters.
>>>>
>>>> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>>>>
>>>> Now if I want to use this pipeline for multiple input letter sets,
>>>> I'll end up mixing together candidate words that come from different
>>>> letter sets. I could incorporate some kind of ID for these letter sets
>>>> (e.g. a ScrabbleGameID) to partition with later, but then I'll need to
>>>> propagate that key everywhere. For example,
>>>> `EnumerateAllPossibleWords` may do its own keyed operations internally,
>>>> which will then all need to accommodate bookkeeping for ScrabbleGameID.
>>>>
>>>> Generating windows that are actually based on ScrabbleGameID (e.g. one
>>>> window per letter set) feels like a nice way to implicitly partition my
>>>> pipeline so I don't have to include ScrabbleGameID in transforms that
>>>> really don't care about it.
>>>>
>>>> When looking at windowing functions, though, they're all very
>>>> timestamp based, which made me pause and wonder if I'm abusing the
>>>> window abstraction or if timestamp-based windows are just a subset of
>>>> windows that are more highlighted because of streaming.
>>>>
>>>> (sorry, hope this makes sense and is not just a ramble)
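The explicit-key alternative Joey describes (threading a ScrabbleGameID through every step, rather than hiding it in the window) can be sketched in plain Python. Everything here is invented for illustration - the tiny `WORDS` set, the letter scores, and the function names mirroring his `EnumerateAllPossibleWords` / `KeepTopNWords` transforms - and no Beam APIs are used; the point is only to show the bookkeeping that per-game-ID keying imposes on each stage.

```python
from itertools import permutations

# Tiny stand-in dictionary; a real pipeline would use a word-list file.
WORDS = {"cat", "act", "tac", "at", "dog", "god", "go"}

# Standard Scrabble letter scores, a-z.
SCORES = dict(zip("abcdefghijklmnopqrstuvwxyz",
                  (1, 3, 3, 2, 1, 4, 2, 4, 1, 8, 5, 1, 3,
                   1, 1, 3, 10, 1, 1, 1, 1, 4, 4, 8, 4, 10)))


def enumerate_all_possible_words(letters):
    """EnumerateAllPossibleWords: every dictionary word formable from `letters`."""
    found = set()
    for n in range(2, len(letters) + 1):
        for perm in permutations(letters, n):
            word = "".join(perm)
            if word in WORDS:
                found.add(word)
    return found


def keep_top_n_words(words, n=1):
    """KeepTopNWords: the n highest-scoring candidates (ties broken by word)."""
    return sorted(words,
                  key=lambda w: (sum(SCORES[c] for c in w), w),
                  reverse=True)[:n]


# Explicit-key version: every stage must carry the game ID alongside its
# payload - exactly the bookkeeping windowing by game would make implicit.
games = {"game-1": ("c", "a", "t"), "game-2": ("d", "o", "g")}
results = {game_id: keep_top_n_words(enumerate_all_possible_words(letters))
           for game_id, letters in games.items()}
print(results)
```

With windows carrying the game ID instead, `enumerate_all_possible_words` and `keep_top_n_words` could stay oblivious to which game an element belongs to, since any per-window grouping would already keep games separate - which is the appeal Joey is describing.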