I think it's possible to accomplish this without a custom WindowFn. If you
use FileIO.writeDynamic(), your naming function can depend on the element
itself (through the destination you compute with by()), not just on the
window. I would have your stateful DoFn attach a sequence number to each
output: keep an extra ValueState or CombiningState, increment it on every
element, and attach the current number to the element you output. You
could do this by outputting a KV (so your output type would be KV<Long,
Iterable<Type>>), or you could use Beam schemas for your output type; your
choice. Then have your naming function use this sequence number to name the
output files.
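To make the idea concrete, here is a small plain-Java model of it, with no
Beam dependency. A HashMap stands in for the per-key ValueState<Long>, each
call to process() plays the role of one processElement() call, and the
returned entry mirrors the KV<Long, Iterable<Type>> output. The class name,
method names, and the file-name pattern are all hypothetical. Putting the key
into the file name is also an assumption on my part. Beam scopes state to a
key and a window, so the numbers restart for each key/window pair, and the
sequence number on its own may not be unique.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Beam dependency) of a stateful DoFn that tags each
// emitted batch with a sequence number. All names here are hypothetical.
class SequenceNumbering {
    // Stands in for a per-key ValueState<Long>. In Beam, state is scoped to a
    // key and window, so numbering restarts for every key/window pair.
    private final Map<String, Long> lastSeqByKey = new HashMap<>();

    // Models processElement: read the counter, increment it, and attach it
    // to the output, giving the KV<Long, Iterable<Type>> shape.
    Map.Entry<Long, List<String>> process(String key, List<String> batch) {
        Long previous = lastSeqByKey.get(key); // null at first, like ValueState.read()
        long next = (previous == null) ? 0L : previous + 1;
        lastSeqByKey.put(key, next);           // like ValueState.write(next)
        return new SimpleEntry<>(next, new ArrayList<>(batch));
    }

    // Hypothetical naming function: key plus sequence number gives each
    // batch its own file name.
    static String fileNameFor(String key, long seq) {
        return String.format("%s-batch-%05d.txt", key, seq);
    }

    public static void main(String[] args) {
        SequenceNumbering fn = new SequenceNumbering();
        Map.Entry<Long, List<String>> first = fn.process("k", List.of("a", "b"));
        Map.Entry<Long, List<String>> second = fn.process("k", List.of("c"));
        System.out.println(fileNameFor("k", first.getKey()));  // k-batch-00000.txt
        System.out.println(fileNameFor("k", second.getKey())); // k-batch-00001.txt
    }
}
```

In Beam itself, the counter would be a ValueState<Long> declared with
@StateId on the stateful DoFn. The file name would come from
FileIO.writeDynamic(): by() extracts the sequence number as the destination,
and withNaming() turns that destination into a FileNaming.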

On Fri, Jan 29, 2021 at 2:32 AM Pradyumna Achar wrote:

>
> Thank you, that worked.
> One small glitch was that the trigger is not guaranteed to fire once per
> element: AfterPane.elementCountAtLeast(1) might fire after one element or
> after several. When a pane contains more than one of those iterables, I get
> files two, three, or more times the intended size, which is undesirable.
> To overcome this, I created another window class similar to
> IntervalWindow, called BespokeIntervalWindow. It is the same as
> IntervalWindow except that it has an extra UUID field, in addition to the
> "start" and "end" fields, that keeps each such window distinct. I also have
> a WindowFn that gets the existing IntervalWindow from the AssignContext and
> assigns an equivalent BespokeIntervalWindow to each iterable in its
> assignWindows method. I then set the AfterPane.elementCountAtLeast(1)
> trigger on this WindowFn so that it does not wait until the end of the
> interval to emit the pane.
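Here is a rough plain-Java sketch of the window class described in the
quoted message. It shows why the extra UUID helps: window equality now
depends on the UUID as well as on the bounds. Two iterables from the same
interval therefore land in unequal windows, and the runner should not combine
them into one pane. The sketch is hypothetical. It uses epoch-millisecond
longs instead of Beam's Instant and does not extend BoundedWindow. A real
version would extend BoundedWindow (implementing maxTimestamp()) and would
also need a Coder for the window and a WindowFn subclass.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical plain-Java model of BespokeIntervalWindow: the same [start, end)
// bounds as an IntervalWindow plus a UUID. Not Beam's BoundedWindow API.
final class BespokeIntervalWindow {
    private final long startMillis; // inclusive start, like IntervalWindow.start()
    private final long endMillis;   // exclusive end, like IntervalWindow.end()
    private final UUID id;          // extra field that keeps each window distinct

    BespokeIntervalWindow(long startMillis, long endMillis, UUID id) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
        this.id = Objects.requireNonNull(id);
    }

    // Models what assignWindows does per iterable: copy the bounds of the
    // existing interval window and attach a fresh UUID.
    static BespokeIntervalWindow from(long startMillis, long endMillis) {
        return new BespokeIntervalWindow(startMillis, endMillis, UUID.randomUUID());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof BespokeIntervalWindow)) return false;
        BespokeIntervalWindow other = (BespokeIntervalWindow) o;
        // The UUID takes part in equality, so windows with identical bounds
        // but different ids are treated as different windows.
        return startMillis == other.startMillis
            && endMillis == other.endMillis
            && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        // Must agree with equals(): include all three fields.
        return Objects.hash(startMillis, endMillis, id);
    }
}
```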
