On Fri, May 3, 2019 at 2:10 PM Mike Kaplinskiy <[email protected]> wrote:
> Hey Folks,
>
> I'm playing around with using TestStream and having a bit of a hard time
> trying to use it for early firings within a global window. My test stream
> looks something like this:
>
> TestStream.create(...)
> .advanceWatermarkTo('2019-01-01')
> .addElements(kv1)
> .advanceProcessingTime(21s)
> .addElements(kv2)
> .advanceProcessingTime(30s)
> .advanceWatermarkToInfinity()
>
> The pipeline (I'm unfortunately paraphrasing; I'm actually using Clojure
> for this) is really simple:
>
> pipeline
> .apply(testStream)
> .apply("Window", Window.into(new GlobalWindows())
> .triggering(...)
> .accumulatingFiredPanes())
> .apply("Group", GroupByKey.create())
> .apply("ShowWindows", Reify.windows())
> .apply("PrintIt", MapElements.via(v -> {
> System.out.println(v.timestamp()); return v; }))
>
> When I use AfterProcessingTime.pastFirstElementInPane(), it looks like I
> only ever get the last firing (maybe the panes get merged?)
>
What processing-time delay are you using? Panes should not be merged. I
cannot find the Jira issue now, but there was a known problem: a trigger
not wrapped in Repeatedly.forever(...) would "turn off" after firing and
never produce output again. This is variously called the trigger
"finishing" or the window "closing". It was decided that this was a bug
that could cause data loss, and I thought it had been addressed. I will
re-investigate.
> with the timestamp of the elements as "294247-01-09T04:00:54.775Z" & the
> pane as "PaneInfo{isFirst=true, isLast=true, timing=EARLY, index=0}"
>
The timestamp is expected: it is the "end of the window", which is the
default output timestamp after a GroupByKey
(TimestampCombiner.END_OF_WINDOW).
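To show where that odd-looking value comes from, here is a small sketch
that uses only the JDK, not Beam. It assumes Beam's maximum timestamp is
Long.MAX_VALUE microseconds truncated to milliseconds, and that the global
window ends one day before that maximum. The class and method names are
made up for this illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class GlobalWindowEnd {
  // Assumption: mirrors Beam's maximum timestamp, i.e. Long.MAX_VALUE
  // microseconds truncated to millisecond precision.
  public static Instant timestampMaxValue() {
    return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
  }

  // Assumption: the global window ends one day before the maximum timestamp.
  public static Instant endOfGlobalWindow() {
    return timestampMaxValue().minus(Duration.ofDays(1));
  }

  public static void main(String[] args) {
    // Prints the end of the global window as epoch millis and as an ISO instant.
    System.out.println(endOfGlobalWindow().toEpochMilli());
    System.out.println(endOfGlobalWindow());
  }
}
```

With the default timestamp combiner, every pane of a global-window
GroupByKey carries this instant, regardless of the element timestamps.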
> When I use Never.ever() as the trigger, I get the same exact output.
> AfterPane.elementCountAtLeast(1) also produces the same result. Wrapping
> either of these in Repeatedly.forever gives multiple panes (yay?) but all
> of them still have the high timestamp.
>
Never.ever() should fire once, at window GC time.
AfterPane.elementCountAtLeast(1) is another trigger that needs to be
wrapped in Repeatedly.forever(...). So your final attempt is the right way
to get the trigger you want. The timestamp is still high because it is the
end of the window. You can adjust that with
Window.withTimestampCombiner(...).
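A sketch of what that could look like in the Java SDK, assuming Beam and
Joda-Time are on the classpath. The KV<String, Integer> element type, the
10-second delay, the zero allowed lateness, and the class name are
placeholders for illustration, not values taken from your pipeline.

```java
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class EarlyFiringWindow {
  // Hypothetical helper: builds a global-window transform that keeps firing
  // early panes instead of finishing after the first one.
  public static Window<KV<String, Integer>> create() {
    return Window.<KV<String, Integer>>into(new GlobalWindows())
        // Repeatedly.forever re-arms the trigger after each firing.
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    // Placeholder delay; pick whatever suits the test.
                    .plusDelayOf(Duration.standardSeconds(10))))
        // Placeholder: allowed lateness is specified alongside the custom trigger.
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes()
        // Use the earliest element timestamp instead of the end of the window.
        .withTimestampCombiner(TimestampCombiner.EARLIEST);
  }
}
```

TimestampCombiner.LATEST is the other obvious choice if the most recent
element timestamp is more useful on each pane.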
Thanks for raising this. It seems the fix may have slipped off the radar or
been incomplete.
Kenn
> Is this expected? Am I just misunderstanding what triggers do or is this
> something that only happens with TestStream?
>
> Thanks,
> Mike