Interestingly, the very first prototypes of windows were actually called
buckets, and we thought of applications like geographical grouping and such
in addition to time-based aggregations. For streaming, however, event time
is special in the sense that it's required for aggregation and omnipresent,
and so this is what windows are centred on. But nothing prevents one from
creating data-dependent windows (that, in batch, may not have anything to
do with time at all) and using them as secondary keys.
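
As a rough illustration of a data-dependent window used as a secondary key,
here is a minimal sketch in plain Python. It does not use any Beam APIs;
`GameWindow`, `score`, and `top_n_per_window` are hypothetical names, and the
scoring rule (one point per letter) is an assumption made only for the
example.

```python
from collections import defaultdict
from dataclasses import dataclass
import heapq


@dataclass(frozen=True)
class GameWindow:
    """Hypothetical 'window' identified by a game id rather than a time interval."""
    game_id: str


def score(word: str) -> int:
    # Toy scoring, assumed for illustration only: one point per letter.
    return len(word)


def top_n_per_window(elements, n):
    """Group (window, word) pairs by window and keep the n best words in each."""
    grouped = defaultdict(list)
    for window, word in elements:
        grouped[window].append(word)
    return {w: heapq.nlargest(n, words, key=score) for w, words in grouped.items()}


elements = [
    (GameWindow("game-1"), "cat"),
    (GameWindow("game-1"), "cater"),
    (GameWindow("game-2"), "dog"),
    (GameWindow("game-1"), "at"),
    (GameWindow("game-2"), "good"),
]
result = top_n_per_window(elements, n=1)
```

The aggregation itself never mentions the game id; the window carries it, so
words from different games are never compared with each other.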

The idea of carrying out-of-band metadata along with elements to
simplify/re-use transforms is certainly an interesting one; the question is
always what to do for aggregating operations (including side inputs,
stateful DoFns, etc. as well as the standard GroupByKey). Treating
them as a sub-key (essentially creating parallel pipelines, though one
could do interesting things with merging) and providing a merging operation
for this metadata are both reasonable alternatives.
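
The two alternatives can be contrasted with a small plain-Python sketch. This
is not Beam code: the element shape `(metadata, key, value)`, the function
names, and the choice of set union as the merge operation are all assumptions
made for illustration.

```python
from collections import defaultdict


def aggregate_with_subkey(elements):
    """Alternative 1: metadata is a sub-key, so each metadata value gets its own sum."""
    totals = defaultdict(int)
    for meta, key, value in elements:
        totals[(meta, key)] += value
    return dict(totals)


def aggregate_with_merge(elements, merge_meta):
    """Alternative 2: sum per key only, combining metadata with merge_meta."""
    totals = {}
    for meta, key, value in elements:
        if key in totals:
            old_meta, old_value = totals[key]
            totals[key] = (merge_meta(old_meta, meta), old_value + value)
        else:
            totals[key] = (meta, value)
    return totals


elements = [
    (frozenset({"game-1"}), "a", 1),
    (frozenset({"game-2"}), "a", 2),
    (frozenset({"game-1"}), "b", 5),
]
by_subkey = aggregate_with_subkey(elements)
# Assumed merge operation: union of the metadata sets.
merged = aggregate_with_merge(elements, merge_meta=lambda x, y: x | y)
```

With the sub-key, key "a" yields two separate results (one per game); with
the merge, it yields a single result whose metadata records both games.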

- Robert



On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <k...@apache.org> wrote:

> This idea makes perfect sense conceptually.
>
> Conceptually, a "window" is just a key that has the ability to know if it
> is "done" so we can stop waiting for more input and emit the aggregation.
> To do an aggregation over unbounded input, you need to know when to stop
> waiting for more data. In Beam, the only notion of "done" that we support
> is event time, so you need to have a maximum timestamp - the one-method
> interface for BoundedWindow
> <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
> in the Java SDK is correct.
>
> In Beam we have a fairly arbitrary separation between the K in KV and the
> Windowing, and they show up differently in all our APIs and require
> different sorts of plumbing. This is to make it easier to explain things in
> terms of event time, but is not essential. The true key of an aggregation
> is the Key+Window and which data you choose to put in which place is sort
> of up to you. The invariant is that composites should typically "just work"
> per-window no matter what windowing the user chooses, similarly to how
> keyed transforms should "just work" per key no matter what keying the user
> chooses.
>
> Incidentally, in SQL I found it easy to explain as "at least one of the
> GROUP BY keys must have some notion of completion". In the past, I've
> occasionally offered the example of an election, where an aggregation per
> locale has a clear notion of "done" but is not an interval in event
> time. I like an aggregation per Scrabble game better. But it doesn't sound
> like you are trying to stream data and then have the aggregation emit when
> the game concludes so you aren't in such a state. But incidentally also our
> Nexmark implementations use a merging window function that does something
> like this, in which it waits for an "auction ended" event. It is a bit of a
> hack that probably violates the Beam model but seems to work, mostly...
>
> In practice, there's no design / implementation / API / protocol for
> windows with a notion of completeness that is not event time. But IIRC in
> the early Spark Runner (and maybe today?) the checking of window completeness
> was literally just querying state (because Spark watermarks can't implement
> Beam) so it could have been any state.
>
> Kenn
>
> On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> I have some use cases where I have some global-ish context I'd like to
>> partition my pipeline by but that isn't based on time. Does it seem
>> reasonable to use windowing to encapsulate this kind of global context
>> anyways?
>>
>> Contrived example, imagine I have a workflow for figuring out the
>> highest scoring word in scrabble based on an input set of letters.
>>
>>
>> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>>
>> Now if I want to use this pipeline for multiple input letter sets, I'll
>> end up mixing together candidate words that come from different letter
>> sets. I could incorporate some kind of ID for these letter sets (e.g. a
>> ScrabbleGameID) to partition with later, but then I'll need to propagate
>> that key everywhere. For example, `EnumerateAllPossibleWords` may do its
>> own keyed operations internally which then will all need to be able to
>> accommodate bookkeeping for ScrabbleGameID.
>>
>> Generating windows that are actually based on ScrabbleGameID (e.g. one
>> window per letter set) feels like a nice way to implicitly partition my
>> pipeline so I don't have to include ScrabbleGameID into transforms that
>> really don't care about it.
>>
>> When looking at windowing functions though, they're all very timestamp
>> based which made me pause and wonder if I'm abusing the window abstraction
>> or if timestamp-based windows are just a subset of windows that are just
>> more highlighted b/c of streaming.
>>
>> (sorry hope this makes sense and is not just a ramble)
>>
>
