This idea makes perfect sense conceptually.

Conceptually, a "window" is just a key that knows when it is "done", so
we can stop waiting for more input and emit the aggregation. To do an
aggregation over unbounded input, you need to know when to stop waiting
for more data. In Beam, the only notion of "done" that we support is
event time, so every window needs a maximum timestamp - the one-method
interface for BoundedWindow
<https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
in the Java SDK captures exactly this.
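
To make this concrete, here is a minimal sketch against the Python SDK of
a window type keyed by a scrabble game rather than a time interval.
GameWindow and its fields are hypothetical, not a real Beam API; the only
thing Beam actually demands of it is that it can report a max timestamp:

from apache_beam.transforms.window import BoundedWindow
from apache_beam.utils.timestamp import MAX_TIMESTAMP

class GameWindow(BoundedWindow):
  """Hypothetical: one window per scrabble game."""

  def __init__(self, game_id, end=MAX_TIMESTAMP):
    # The inherited max_timestamp() derives from 'end'; until the game
    # has a known end, the window simply never reports itself "done".
    super().__init__(end)
    self.game_id = game_id

  def __eq__(self, other):
    return (isinstance(other, GameWindow)
            and (self.game_id, self.end) == (other.game_id, other.end))

  def __hash__(self):
    return hash(self.game_id)

(A real version would also need a WindowFn assigning elements to it and a
coder so runners can serialize it.)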

In Beam we have a fairly arbitrary separation between the K in KV and the
windowing, and they show up differently in all our APIs and require
different sorts of plumbing. This separation makes it easier to explain
things in terms of event time, but is not essential. The true key of an
aggregation is the Key+Window, and which data you choose to put in which
place is sort of up to you. The invariant is that composites should
typically "just work" per window no matter what windowing the user
chooses, just as keyed transforms should "just work" per key no matter
what keying the user chooses.
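
As a small illustration in the Python SDK: the sum aggregation below is
computed per (key, window) even though CombinePerKey never mentions
windows, and you can swap in any other WindowFn without touching it.

import apache_beam as beam

with beam.Pipeline() as p:
  (p
   | beam.Create([('a', 1, 0), ('a', 2, 30), ('a', 10, 90)])
   | beam.Map(lambda e: beam.window.TimestampedValue((e[0], e[1]), e[2]))
   | beam.WindowInto(beam.window.FixedWindows(60))  # any WindowFn works here
   | beam.CombinePerKey(sum)   # aggregates per key *and* per window
   | beam.Map(print))          # ('a', 3) for [0, 60); ('a', 10) for [60, 120)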

Incidentally, in SQL I found it easy to explain as "at least one of the
GROUP BY keys must have some notion of completion". In the past, I've
occasionally offered the example of an election, where an aggregation per
locale has a clear notion of "done" but is not an interval in event time.
I like an aggregation per scrabble game better. But it doesn't sound like
you are trying to stream data and have the aggregation emit when the game
concludes, so you aren't in that situation. Relatedly, our Nexmark
implementations use a merging window function that does something like
this: it waits for an "auction ended" event. It is a bit of a hack that
probably violates the Beam model but seems to work, mostly...
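
If you did want to stream game events and emit when a game concludes, a
rough sketch of that Nexmark-style trick might look like the following.
This is hypothetical, not real Beam code: it reuses the made-up GameWindow
from above, the event fields (game_id, is_game_over) are assumptions, and
a real version would need get_window_coder() to return an actual coder.

from apache_beam.transforms.window import WindowFn
from apache_beam.utils.timestamp import MAX_TIMESTAMP

class PerGameWindows(WindowFn):
  """Hypothetical merging WindowFn: one window per game, closed by a
  game-over event, in the same spirit as Nexmark's auction windows."""

  def assign(self, context):
    event = context.element
    # A game-over event supplies a real end; every other event gets a
    # window that stays open indefinitely.
    end = context.timestamp if event.is_game_over else MAX_TIMESTAMP
    return [GameWindow(event.game_id, end)]

  def merge(self, merge_context):
    by_game = {}
    for w in merge_context.windows:
      by_game.setdefault(w.game_id, []).append(w)
    for game_id, ws in by_game.items():
      if len(ws) > 1:
        # The earliest end wins: once the game-over event arrives, its
        # timestamp replaces MAX_TIMESTAMP and the window can close.
        merge_context.merge(ws, GameWindow(game_id, min(w.end for w in ws)))

  def get_window_coder(self):
    raise NotImplementedError('a real version needs a GameWindow coder')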

In practice, there's no design / implementation / API / protocol for
windows with a notion of completeness that is not event time. But IIRC in
the early Spark runner (and maybe today?) the check for window
completeness was literally just querying state (because Spark watermarks
can't implement the Beam model), so it could have been any state.

Kenn

On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <joey.t...@schrodinger.com>
wrote:

> I have some use cases where I have some global-ish context I'd like to
> partition my pipeline by but that aren't based on time. Does it seem
> reasonable to use windowing to encapsulate this kind of global context
> anyways?
>
> Contrived example, imagine I have a workflow for figuring out the
> highest scoring word in scrabble based on an input set of letters.
>
>
> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>
> Now if I want to use this pipeline for multiple input letter sets, I'll
> end up mixing together candidate words that come from different letter
> sets. I could incorporate some kind of ID for these letter sets (e.g. a
> ScrabbleGameID) to partition with later, but then I'll need to propagate
> that key everywhere. For example, `EnumerateAllPossibleWords` may do its
> own keyed operations internally which then will all need to be able to
> accommodate bookkeeping for ScrabbleGameID.
>
> Generating windows that are actually based on ScrabbleGameID (e.g. one
> window per letter set) feels like a nice way to implicitly partition my
> pipeline so I don't have to include ScrabbleGameID into transforms that
> really don't care about it.
>
> When looking at windowing functions though, they're all very
> timestamp-based, which made me pause and wonder if I'm abusing the window
> abstraction or if timestamp-based windows are just a subset of windows
> that are more highlighted because of streaming.
>
> (sorry hope this makes sense and is not just a ramble)
>
