I have a Transform that contains, in order:
* [an unbounded source which eventually moves its watermark to +Infinity when it's out of values]
* Window.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).accumulatingFiredPanes()
* Combine.globally(myCombineFn)
* [a few element-wise type conversions]
* A ParDo that produces some logging output in processElement

Separately, I have a second ParDo directly attached to the unbounded source to also produce logging output.

I noticed that, when I run this pipeline with DirectRunner:
* With 0 input values, I get one NO_FIRING firing
* With 1-10 input values, I get 1 EARLY firing and one ON_TIME firing
* With 11-20 input values, I get 2 EARLY firings and one ON_TIME firing
* With 21-30 input values, I get 3 EARLY firings and one ON_TIME firing

Increasing to elementCountAtLeast(10):
* With 0 input values, I get one NO_FIRING firing
* With 1-9 input values, I get one ON_TIME firing
* With 10-19 input values, I get one EARLY firing and one ON_TIME firing

Increasing to elementCountAtLeast(12):
* With 0 input values, I get a NO_FIRING firing
* With 1-11 input values, I get one ON_TIME firing
* With 12-19 input values, I get 1 EARLY firing (at 12, 13, 14, etc.) and one ON_TIME firing
* With 20-31 input values, I get 1 EARLY firing (at 20) and one ON_TIME firing
* With 32(!)-40 input values, I get 2 EARLY firings (1st at 20 and 2nd at 32, 33, etc.) and one ON_TIME firing
* With 40-51 input values, I get 2 EARLY firings (1st at 20, 2nd at 40) and one ON_TIME firing
* Starting at 52 input values, I get 3 EARLY firings (1st at 20, 2nd at 40, 3rd at 52, 53, etc.) and one ON_TIME firing, and so on

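As a sanity check on the numbers above, here is a small plain-Java model (no Beam dependency; the class and method names are made up for illustration). It assumes, as a guess and not something I've confirmed in the DirectRunner source, that elements reach the trigger in bundles of 10, that elementCountAtLeast(N) is only evaluated after a whole bundle, and that Repeatedly.forever resets the count after each firing. The final ON_TIME pane and the 0-element NO_FIRING case are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

class BundledCountTriggerModel {

    /**
     * Hypothetical model, NOT DirectRunner's real implementation: elements are
     * assumed to reach the trigger in bundles of {@code bundleSize}, and
     * elementCountAtLeast(threshold) is assumed to be evaluated only after a
     * whole bundle. Returns the running element totals at which EARLY panes
     * fire. The final ON_TIME pane (watermark to +Infinity) is not modeled.
     */
    static List<Integer> earlyFirings(int totalElements, int threshold, int bundleSize) {
        if (threshold < 1 || bundleSize < 1) {
            throw new IllegalArgumentException("threshold and bundleSize must be >= 1");
        }
        List<Integer> firings = new ArrayList<>();
        int processed = 0;       // elements delivered so far
        int sinceLastFiring = 0; // elements counted by the trigger since it last fired
        while (processed < totalElements) {
            int bundle = Math.min(bundleSize, totalElements - processed);
            processed += bundle;
            sinceLastFiring += bundle;
            // The trigger is only checked here, at the end of a bundle.
            if (sinceLastFiring >= threshold) {
                firings.add(processed);
                // Assumption: Repeatedly.forever resets the element count after each firing.
                sinceLastFiring = 0;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        int bundleSize = 10; // assumed, based on the buffer size of 10 I seem to be seeing
        int[][] cases = {
            {1, 10}, {1, 11}, {1, 21},
            {10, 9}, {10, 19},
            {12, 19}, {12, 31}, {12, 32}, {12, 51}, {12, 52},
        };
        for (int[] c : cases) {
            System.out.printf("elementCountAtLeast(%d), %d inputs -> EARLY panes at totals %s%n",
                    c[0], c[1], earlyFirings(c[1], c[0], bundleSize));
        }
    }
}
```

Under that assumption the model reproduces each EARLY firing point I listed; for example, earlyFirings(32, 12, 10) returns [20, 32], matching the 32(!) case. Setting bundleSize to 1 instead gives [12, 24] for the same input, which is the pattern a runner that checked the trigger after every element would produce under the same model.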
I realize the observed behavior satisfies the technical design of triggers, which I understand to be: "If you specify a trigger, you're not guaranteed it will fire, but you are guaranteed that it won't fire more often or earlier than you specified." I also understand that it's a good property for DirectRunner to simulate things like delays in trigger firing and other behaviors you might see on a "real" runner. However, this behavior might also be undesirable. A pipeline author may want to use DirectRunner to quickly confirm their own understanding of WindowingStrategy settings, and could get confused and think this firing behavior is due to their own error rather than to DirectRunner, especially with small exploratory PCollections. It may also be undesirable because a stronger, better-defined guarantee about when panes actually fire (presumably at the expense of performance) might be valuable for things like automated integration testing of a pipeline's logic.

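Absent such a guarantee, an integration test could check only the property quoted above instead of exact pane counts. Below is a hypothetical plain-Java helper (my own name, not a Beam API) for the elementCountAtLeast(threshold) case: given the running element totals at which EARLY panes fired, it allows the trigger to never fire, but rejects any pane that fired before `threshold` new elements had arrived. The assumption that the count is measured since the previous firing comes from how I understand Repeatedly.forever to work.

```java
import java.util.List;

class CountTriggerContract {

    /**
     * Hypothetical helper: returns true if no EARLY pane fired "earlier than
     * specified" for an elementCountAtLeast(threshold) trigger repeated forever.
     * firingTotals holds the running element total at each EARLY firing, in
     * order. An empty list passes, since firing is never guaranteed.
     */
    static boolean neverFiresEarly(List<Integer> firingTotals, int threshold) {
        int previous = 0;
        for (int total : firingTotals) {
            if (total - previous < threshold) {
                return false; // fired before enough new elements had arrived
            }
            previous = total;
        }
        return true;
    }

    public static void main(String[] args) {
        // Firing points I observed with elementCountAtLeast(12):
        System.out.println(neverFiresEarly(List.of(20, 32), 12));     // true
        System.out.println(neverFiresEarly(List.of(20, 40, 52), 12)); // true
        // A made-up sequence that would violate the guarantee:
        System.out.println(neverFiresEarly(List.of(12, 20), 12));     // false
        // Never firing early is still allowed:
        System.out.println(neverFiresEarly(List.of(), 12));           // true
    }
}
```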
A few questions:
1. Is my understanding of trigger semantics correct?
2. Is all this behavior actually a symptom of my UnboundedSource being implemented incorrectly, somehow?
3. Is the above behavior exactly as intended (including the 0-element case giving a NO_FIRING pane instead of an ON_TIME pane)?
4. Is this a DirectRunner behavior, or is it common across runners? (Common across SDKs?)
5. Is the buffer (which seems to be 10 right now) that's causing this behavior configurable, or is it possible to disable it?
---
Wesley Tanaka
https://wtanaka.com/