Hey Folks,

I'm playing around with TestStream and having a hard time using it to
test early firings within a global window. My test stream looks something
like this:

TestStream.create(...)
    .advanceWatermarkTo(Instant.parse("2019-01-01T00:00:00Z"))
    .addElements(kv1)
    .advanceProcessingTime(Duration.standardSeconds(21))
    .addElements(kv2)
    .advanceProcessingTime(Duration.standardSeconds(30))
    .advanceWatermarkToInfinity()

The pipeline (I'm unfortunately paraphrasing; I'm actually using Clojure
for this) is really simple:

pipeline
.apply(testStream)
.apply("Window", Window.into(new GlobalWindows())
    .triggering(...)
    .accumulatingFiredPanes())
.apply("Group", GroupByKey.create())
.apply("ShowWindows", Reify.windows())
.apply("PrintIt", MapElements.via(v -> {
    System.out.println(v.getTimestamp());
    return v;
}))

When I use AfterProcessingTime.pastFirstElementInPane() as the trigger, it
looks like I only ever get the last firing (maybe the panes get merged?).
The element timestamp is "294247-01-09T04:00:54.775Z" and the pane is
"PaneInfo{isFirst=true, isLast=true, timing=EARLY, index=0}". With
Never.ever() as the trigger, I get exactly the same output, and
AfterPane.elementCountAtLeast(1) also produces the same result. Wrapping
either of these in Repeatedly.forever gives multiple panes (yay?), but all
of them still have the same very high timestamp.
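
For reference, that timestamp looks like the end of the global window
rather than anything derived from my elements. Below is a minimal sketch in
plain Java (no Beam dependency) that reproduces the value. It assumes that
Beam's maximum timestamp is Long.MAX_VALUE microseconds truncated to
milliseconds and that the global window ends one day before it. The class
and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Beam: recomputes the instant that I
// believe Beam uses as the end of the global window.
public class GlobalWindowEnd {

    // Assumption: the maximum timestamp is Long.MAX_VALUE microseconds,
    // truncated to whole milliseconds.
    static Instant maxTimestamp() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
    }

    // Assumption: the global window ends one day before the maximum timestamp.
    static Instant endOfGlobalWindow() {
        return maxTimestamp().minus(Duration.ofDays(1));
    }

    public static void main(String[] args) {
        System.out.println(endOfGlobalWindow());
    }
}
```

If those assumptions hold, this prints an instant ending in
294247-01-09T04:00:54.775Z, which would be consistent with GroupByKey
stamping each output with the end of the window unless a different
timestamp combiner is configured.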

Is this expected? Am I just misunderstanding what triggers do or is this
something that only happens with TestStream?

Thanks,
Mike
