On Thu, Feb 6, 2025 at 8:39 AM Joey Tran <joey.t...@schrodinger.com> wrote:

> Thanks all for the conceptual sanity check. I went ahead and tried
> implementing a data-based window type. I've gotten most of the way there
> but it was surprisingly difficult. Not because of any complications from
> the window not being based on timestamps (which is what I was worried
> about), but because of all the runner API logic for turning coders and
> windowing strategies into runner-safe coders and windowing strategies.
>
> To define my new window type, I first...
> - Defined a new window type `class DataBasedWindow(BoundedWindow)`
> - Created a new window coder for it
> - Created a new WindowFn to assign data to their DataBasedWindows
>
> I thought this would be enough, but then I ran into the following issues
> when running a unit test with the Python DirectRunner (really just the
> fn_api_runner):
> - The general trigger driver tried to read `window.end`, but the window
> was just plain bytes
> - I thought the fn_api_runner wasn't decoding windows properly, but it
> turns out it does - the windowed-value coder it uses just uses a BytesCoder
> for the window
> - I figured that the translations phase was turning the WindowFn and
> window coder into "safe" WindowFns and coders
> - I registered URNs for the new WindowFn and window coder, but a
> BytesCoder was _still_ being used for the window
> - I realized that TransformContext doesn't actually consider just any
> coder with a registered URN to be safe - only the ones defined in
> `common_urns.coders`. So I had to hackily add mine to both that and the
> python_sdk capabilities set
>
> Did I accidentally veer off the expected path for implementing a new
> window type, or are new window types just not something many users do with
> the Python SDK? Based on the last point, it doesn't actually even seem
> possible.
>

Well, you're definitely off the beaten path, but this should still be
supported. E.g.
https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1009
was made to handle this scenario.

I'm curious how your case is different from
https://github.com/apache/beam/blob/release-2.60.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L1120



>
>
> On Wed, Feb 5, 2025 at 1:01 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> Interestingly, the very first prototypes of windows were actually called
>> buckets, and we thought of applications like geographical grouping and such
>> in addition to time-based aggregations. For streaming, however, event time
>> is special in the sense that it's required for aggregation and omnipresent,
>> and so this is what windows are centred on. But nothing prevents one from
>> creating data-dependent windows (that, in batch, may not have anything to
>> do with time at all) and using them as secondary keys.
>>
>> The idea of carrying out-of-band metadata along with elements to
>> simplify/re-use transforms is certainly an interesting one; the question is
>> always what to do for aggregating operations (including side inputs,
>> stateful DoFns, etc., as well as the standard GroupByKey). Either treating
>> it as a sub-key (essentially creating parallel pipelines, though one could
>> do interesting things with merging) or providing a merging operation for
>> this metadata is a reasonable alternative.
>>
>> - Robert
>>
>>
>>
>> On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This idea makes perfect sense conceptually.
>>>
>>> Conceptually, a "window" is just a key that has the ability to know if
>>> it is "done" so we can stop waiting for more input and emit the
>>> aggregation. To do an aggregation over unbounded input, you need to know
>>> when to stop waiting for more data. In Beam, the only notion of "done" that
>>> we support is event time, so you need to have a maximum timestamp - the
>>> one-method interface for BoundedWindow
>>> <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
>>> in the Java SDK is correct.
>>>
>>> In Beam we have a fairly arbitrary separation between the K in KV and
>>> the Windowing, and they show up differently in all our APIs and require
>>> different sorts of plumbing. This is to make it easier to explain things in
>>> terms of event time, but is not essential. The true key of an aggregation
>>> is the Key+Window and which data you choose to put in which place is sort
>>> of up to you. The invariant is that composites should typically "just work"
>>> per-window no matter what windowing the user chooses, similarly to how
>>> keyed transforms should "just work" per key no matter what keying the user
>>> chooses.
>>>
>>> Incidentally, in SQL I found it easy to explain as "at least one of the
>>> GROUP BY keys must have some notion of completion". In the past, I've
>>> occasionally offered the example of an election, where an aggregation per
>>> locale has a clear notion of "done" but is not an interval in event time.
>>> I like an aggregation per scrabble game even better. It doesn't sound like
>>> you are trying to stream data and have the aggregation emit when the game
>>> concludes, so you aren't in that situation. Incidentally, though, our
>>> Nexmark implementations use a merging window function that does something
>>> like this: it waits for an "auction ended" event. It is a bit of a hack
>>> that probably violates the Beam model, but it seems to work, mostly...
>>>
>>> In practice, there's no design / implementation / API / protocol for
>>> windows with a notion of completeness that is not event time. But IIRC in
>>> the early Spark runner (and maybe today?) checking window completeness was
>>> literally just querying state (because Spark's watermarks can't implement
>>> Beam's), so it could have been any state.
>>>
>>> Kenn
>>>
>>> On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> I have some use cases with global-ish context I'd like to partition my
>>>> pipeline by, but that context isn't based on time. Does it seem
>>>> reasonable to use windowing to encapsulate this kind of global context
>>>> anyway?
>>>>
>>>> Contrived example: imagine I have a workflow for figuring out the
>>>> highest-scoring word in scrabble based on an input set of letters.
>>>>
>>>>
>>>> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>>>>
>>>> Now if I want to use this pipeline for multiple input letter sets, I'll
>>>> end up mixing together candidate words that come from different letter
>>>> sets. I could incorporate some kind of ID for these letter sets (e.g. a
>>>> ScrabbleGameID) to partition with later, but then I'll need to propagate
>>>> that key everywhere. For example, `EnumerateAllPossibleWords` may do its
>>>> own keyed operations internally, all of which would then need to
>>>> accommodate bookkeeping for ScrabbleGameID.
>>>>
>>>> Generating windows that are actually based on ScrabbleGameID (e.g. one
>>>> window per letter set) feels like a nice way to implicitly partition my
>>>> pipeline so I don't have to include ScrabbleGameID into transforms that
>>>> really don't care about it.
>>>>
>>>> When looking at the windowing functions, though, they're all very
>>>> timestamp-based, which made me pause and wonder whether I'm abusing the
>>>> window abstraction or whether timestamp-based windows are just a subset
>>>> of windows that is more highlighted because of streaming.
>>>>
>>>> (sorry hope this makes sense and is not just a ramble)
>>>>
>>>
