Re: Triggering partway through a window

2021-04-06 Thread Raman Gupta
On Mon, Mar 29, 2021 at 1:17 PM Kenneth Knowles wrote:

> That's a neat example!
>
> The trigger you have there will emit a ton of output. What is your
> accumulation mode? I assume it must be accumulatingFiredPanes() otherwise
> you would not actually have access to the prior 6 days of input.
>

Yup, accumulating.


>
> The only trigger that is based on completeness of data is the
> AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
> capture the 6 days of data:
>
> prior6days = input.apply(Window.into(<6 day windows sliding one
> day>).triggering(AfterWatermark.pastEndOfWindow()))
>
> Now if you GBK this collection, each group will have a timestamp that is
> the end of the 6 day period. You can use ParDo with outputWithTimestamp to
> move the timestamp up to any timestamp in the following day, yielding a
> PCollection of 6 day grouping of data with a timestamp in the last day of
> the 7. If the 6 days of data is large you may hit size limits (either hard
> limits or perf problems) and have to do something fancier.
>
> Flatten this with the input PCollection and window into FixedWindows(<one
> day>) and trigger however you like, again with accumulatingFiredPanes().
> There is no guarantee that the 6 days of past data arrives prior to
> elements in the last day. In fact since it will be delayed by an extra
> shuffle you would expect it to often show up later. So this is a heuristic
> approach equivalent to what it sounds like you are already doing that
> should lower the cost.
>

Ah interesting. Yes, this would likely have worked for me.


>
> If you want a guarantee that the 6 day buffer arrives prior to the other
> elements you will need to do something else. You could write a WindowFn
> that assigned all 7 days of data to a window that only spanned the first 6
> days, then trigger at end of window plus allowing late data (no early
> firings). Then every firing would be guaranteed by the watermark to have
> the first 6 days of data plus whatever else has shown up. (I assume part of
> your spec is that you do want data to be processed as it arrives, versus
> waiting until the end of the 7 day window).
>

I was curious about this option, and tried it. One issue I ran into was
that the downstream logic had some "odd" windows to deal with because the
window interval did not properly reflect its contents, which resulted in
some downstream logic that wasn't as encapsulated as it should be.

I therefore created a PTransform "DailyWindowsWithContext" that first does
the contextual windowing ("ContextualCalendarDayWindow") and GBK. It then
"re-windows" the elements + their context by filtering out the
"context-only" groups, sets the timestamps of the remaining groups based on
the max of the element timestamps, and then outputs them into a fixed
daily window, followed by another GBK and flatten.
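One possible reading of the "re-windowing" step above, sketched in plain Java with no Beam dependency. The class and method names are hypothetical, the 6-day context span and UTC day alignment are assumptions, and this is not the code from the gist linked below. It only shows the two decisions described: dropping groups that contain nothing but context, and stamping the remaining groups with the max element timestamp.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the filter-and-restamp step; not the gist's implementation. */
final class RewindowSketch {
    // Assumption: the context span covers the first 6 days of a 7-day period.
    static final Duration CONTEXT = Duration.ofDays(6);

    /** True when every element falls before the end of the 6-day context span. */
    static boolean isContextOnly(Instant windowStart, List<Instant> elementTimestamps) {
        Instant contextEnd = windowStart.plus(CONTEXT);
        return elementTimestamps.stream().allMatch(ts -> ts.isBefore(contextEnd));
    }

    /** Max element timestamp for a kept group; empty when the group is dropped. */
    static Optional<Instant> outputTimestamp(Instant windowStart, List<Instant> elementTimestamps) {
        if (isContextOnly(windowStart, elementTimestamps)) {
            return Optional.empty();
        }
        return Optional.of(Collections.max(elementTimestamps));
    }

    /** Start of the fixed daily window (UTC-aligned, an assumption) for that timestamp. */
    static Instant dailyWindowStart(Instant outputTimestamp) {
        return outputTimestamp.truncatedTo(ChronoUnit.DAYS);
    }
}
```

In an actual pipeline these decisions would live inside a DoFn between the two GBKs; the sketch leaves that wiring out.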

This seems to work quite well with my set of unit tests, though I haven't
used it extensively yet. If anyone is curious about the code (written in
Kotlin), see here:

https://gist.github.com/rocketraman/543f066813fc89590f23ff5dacf43f01

Feedback on this code would be more than welcome.

Regards,
Raman



>
> I am just writing this without coding, so I could certainly have missed
> something or gotten it wrong.
>
> Kenn
>
> On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta wrote:
>
>> I have a 7-day sliding calendar window, sliding by 1 day. The intent is
>> to process only elements that fall into the last day of a window, but still
>> have access to the elements from the preceding six days.
>>
>> I created a sliding calendar window function, and trigger it like this:
>>
>> AfterWatermark.pastEndOfWindow()
>>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>>
>> Downstream of this pipeline I have a GBK and a DoFn that basically
>> ignores elements until at least some of them are in the last day of
>> the window.
>>
>> The above trigger works and the pipeline produces the expected output,
>> but runs the GBK and downstream logic many more times than is necessary.
>>
>> Is there a way I can optimize the triggering here such that the early
>> firings begin only when the watermark moves into the last day of the 7-day
>> window?
>>
>> Thanks,
>> Raman
>>
>>


Re: Triggering partway through a window

2021-03-29 Thread Kenneth Knowles
That's a neat example!

The trigger you have there will emit a ton of output. What is your
accumulation mode? I assume it must be accumulatingFiredPanes() otherwise
you would not actually have access to the prior 6 days of input.

The only trigger that is based on completeness of data is the
AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
capture the 6 days of data:

prior6days = input.apply(Window.into(<6 day windows sliding one
day>).triggering(AfterWatermark.pastEndOfWindow()))

Now if you GBK this collection, each group will have a timestamp that is
the end of the 6 day period. You can use ParDo with outputWithTimestamp to
move the timestamp up to any timestamp in the following day, yielding a
PCollection of 6 day grouping of data with a timestamp in the last day of
the 7. If the 6 days of data is large you may hit size limits (either hard
limits or perf problems) and have to do something fancier.
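The window arithmetic behind this step can be sketched in plain Java without any Beam dependency. The class and method names below are hypothetical, and UTC day alignment is an assumption (a calendar window in another time zone would shift the boundaries). The sketch shows which 6-day windows, sliding by one day, contain a given element, and a timestamp in the following day that a grouped result could be moved to.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the 6-day sliding window arithmetic; not Beam code. */
final class PriorSixDays {
    static final Duration DAY = Duration.ofDays(1);
    static final Duration CONTEXT = Duration.ofDays(6);

    /** Starts of every 6-day window (sliding by one day) containing ts, earliest first. */
    static List<Instant> contextWindowStarts(Instant ts) {
        Instant dayStart = ts.truncatedTo(ChronoUnit.DAYS); // assumes UTC-aligned days
        List<Instant> starts = new ArrayList<>();
        for (int k = 5; k >= 0; k--) {
            starts.add(dayStart.minus(DAY.multipliedBy(k)));
        }
        return starts;
    }

    /** First instant of the day that follows the 6-day window starting at windowStart. */
    static Instant timestampInFollowingDay(Instant windowStart) {
        return windowStart.plus(CONTEXT);
    }
}
```

Any instant within that following day would do as the new timestamp; the first instant is used here only because it is the simplest choice.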

Flatten this with the input PCollection and window into FixedWindows(<one
day>) and trigger however you like, again with accumulatingFiredPanes().
There is no guarantee that the 6 days of past data arrives prior to
elements in the last day. In fact since it will be delayed by an extra
shuffle you would expect it to often show up later. So this is a heuristic
approach equivalent to what it sounds like you are already doing that
should lower the cost.

If you want a guarantee that the 6 day buffer arrives prior to the other
elements you will need to do something else. You could write a WindowFn
that assigned all 7 days of data to a window that only spanned the first 6
days, then trigger at end of window plus allowing late data (no early
firings). Then every firing would be guaranteed by the watermark to have
the first 6 days of data plus whatever else has shown up. (I assume part of
your spec is that you do want data to be processed as it arrives, versus
waiting until the end of the 7 day window).
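The assignment rule such a WindowFn would apply can be sketched in plain Java, with no Beam dependency. The names are hypothetical and UTC day alignment is an assumption. Each element is assigned to seven windows, each spanning only 6 days; for exactly one of them the element lies past the window end, which is why the allowed lateness would need to cover at least the seventh day.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of assigning 7 days of data to 6-day windows; not Beam code. */
final class ContextualAssignment {
    static final Duration DAY = Duration.ofDays(1);
    static final Duration SPAN = Duration.ofDays(6);

    /** Starts of the seven 6-day windows an element is assigned to, earliest first. */
    static List<Instant> assignedWindowStarts(Instant ts) {
        Instant dayStart = ts.truncatedTo(ChronoUnit.DAYS); // assumes UTC-aligned days
        List<Instant> starts = new ArrayList<>();
        for (int k = 6; k >= 0; k--) {
            starts.add(dayStart.minus(DAY.multipliedBy(k)));
        }
        return starts;
    }

    /** True when ts lies in the seventh day, i.e. at or after the 6-day window's end. */
    static boolean isLastDayElement(Instant ts, Instant windowStart) {
        return !ts.isBefore(windowStart.plus(SPAN));
    }
}
```

For the earliest of the seven windows the element is a "last day" element arriving after the watermark has passed the window end; for the other six it is context.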

I am just writing this without coding, so I could certainly have missed
something or gotten it wrong.

Kenn

On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta wrote:

> I have a 7-day sliding calendar window, sliding by 1 day. The intent is to
> process only elements that fall into the last day of a window, but still
> have access to the elements from the preceding six days.
>
> I created a sliding calendar window function, and trigger it like this:
>
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>
> Downstream of this pipeline I have a GBK and a DoFn that basically ignores
> elements until at least some of them are in the last day of the window.
>
> The above trigger works and the pipeline produces the expected output, but
> runs the GBK and downstream logic many more times than is necessary.
>
> Is there a way I can optimize the triggering here such that the early
> firings begin only when the watermark moves into the last day of the 7-day
> window?
>
> Thanks,
> Raman
>
>


Re: Triggering partway through a window

2021-03-26 Thread Raman Gupta
I experimented with
`AfterProcessingTime.pastFirstElementInPane().plusDelayOf(6.days)` which is
*almost* working, but delays elements on the last day depending on the
element timestamps from the first day.

I think I'd need something like
`AfterProcessingTime.pastStartOfWindow().plusDelayOf(6.days)`, but I'm
finding it difficult to understand how to create such a trigger, or if
something like that is even possible.
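The difference between the two delays can be illustrated with a little arithmetic in plain Java, with no Beam dependency. The names are hypothetical, and the sketch assumes processing time roughly tracks event time, so that a window start can be compared with an arrival time. It computes when a 6-day delay expires if measured from the first element's arrival versus from the window start.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch comparing the two delay origins; not Beam trigger code. */
final class FiringTimes {
    static final Duration DELAY = Duration.ofDays(6);

    /** Delay measured from the arrival of the first element in the pane. */
    static Instant fromFirstElement(Instant firstElementArrival) {
        return firstElementArrival.plus(DELAY);
    }

    /** Delay measured from the window start, the behaviour wished for above. */
    static Instant fromWindowStart(Instant windowStart) {
        return windowStart.plus(DELAY);
    }

    /** How much later the first-element variant fires than the window-start variant. */
    static Duration extraDelay(Instant windowStart, Instant firstElementArrival) {
        return Duration.between(fromWindowStart(windowStart),
                                fromFirstElement(firstElementArrival));
    }
}
```

Under these assumptions, a first element arriving 18 hours into the first day pushes the first firing 18 hours into the last day.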

Regards,
Raman


On Fri, Mar 26, 2021 at 4:47 PM Raman Gupta wrote:

> I have a 7-day sliding calendar window, sliding by 1 day. The intent is to
> process only elements that fall into the last day of a window, but still
> have access to the elements from the preceding six days.
>
> I created a sliding calendar window function, and trigger it like this:
>
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>
> Downstream of this pipeline I have a GBK and a DoFn that basically ignores
> elements until at least some of them are in the last day of the window.
>
> The above trigger works and the pipeline produces the expected output, but
> runs the GBK and downstream logic many more times than is necessary.
>
> Is there a way I can optimize the triggering here such that the early
> firings begin only when the watermark moves into the last day of the 7-day
> window?
>
> Thanks,
> Raman
>
>


Triggering partway through a window

2021-03-26 Thread Raman Gupta
I have a 7-day sliding calendar window, sliding by 1 day. The intent is to
process only elements that fall into the last day of a window, but still
have access to the elements from the preceding six days.

I created a sliding calendar window function, and trigger it like this:

AfterWatermark.pastEndOfWindow()
  .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
  .withLateFirings(AfterPane.elementCountAtLeast(1))

Downstream of this pipeline I have a GBK and a DoFn that basically ignores
elements until at least some of them are in the last day of the window.

The above trigger works and the pipeline produces the expected output, but
runs the GBK and downstream logic many more times than is necessary.

Is there a way I can optimize the triggering here such that the early
firings begin only when the watermark moves into the last day of the 7-day
window?

Thanks,
Raman