I have a Transform that contains, in order:
* [an unbounded source which eventually moves its watermark to +Infinity when
  it's out of values]
* Window.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).accumulatingFiredPanes()
* Combine.globally(myCombineFn)
* [a few element-wise type conversions]
* A ParDo that produces some logging output in processElement
Separately, I have a second ParDo directly attached to the unbounded source to 
also produce logging output.
I noticed that, when I run this pipeline with DirectRunner:
* With 0 input values, I get one NO_FIRING firing
* With 1-10 input values, I get 1 EARLY firing and one ON_TIME firing
* With 11-20 input values, I get 2 EARLY firings and one ON_TIME firing
* With 21-30 input values, I get 3 EARLY firings and one ON_TIME firing
Increasing to elementCountAtLeast(10):
* With 0 input values, I get one NO_FIRING firing
* With 1-9 input values, I get one ON_TIME firing
* With 10-19 input values, I get one EARLY firing and one ON_TIME firing
Increasing to elementCountAtLeast(12):
* With 0 input values, I get a NO_FIRING firing
* With 1-11 input values, I get one ON_TIME firing
* With 12-19 input values, I get 1 EARLY firing (at 12, 13, 14, etc) and one
  ON_TIME firing
* With 20-31 input values, I get 1 EARLY firing (at 20) and one ON_TIME firing
* With 32(!)-40 values, I get 2 EARLY firings (1st at 20 and 2nd at 32, 33,
  etc) and one ON_TIME firing
* With 40-51, I get 2 EARLY firings (1st at 20, 2nd at 40) and one ON_TIME
  firing
* With 52..., I get 3 EARLY firings (1st at 20, 2nd at 40, 3rd at 52, 53, etc)
  and one ON_TIME firing...
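The counts above look consistent with a simple guess: elements arrive in
bundles of 10, and the trigger condition is only checked at the end of each
bundle (including a final, partial one). The sketch below is a plain-Java toy
simulation of that guess, not DirectRunner code. The class name, the method
name, and the bundleSize parameter are all hypothetical, and the bundle size of
10 is inferred from the observations rather than taken from any documentation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: assumes the runner delivers elements in fixed-size bundles
// and evaluates elementCountAtLeast(threshold) once per bundle.
class TriggerBufferModel {

    // Returns the cumulative element counts at which an EARLY pane fires
    // under the assumed model.
    static List<Integer> earlyFirings(int totalElements, int threshold, int bundleSize) {
        List<Integer> firedAt = new ArrayList<>();
        int seen = 0;             // elements delivered so far
        int sinceLastFiring = 0;  // elements counted since the last EARLY pane
        while (seen < totalElements) {
            // The last bundle may be smaller than bundleSize.
            int bundle = Math.min(bundleSize, totalElements - seen);
            seen += bundle;
            sinceLastFiring += bundle;
            // The trigger is only consulted here, at the bundle boundary.
            if (sinceLastFiring >= threshold) {
                firedAt.add(seen);
                sinceLastFiring = 0;
            }
        }
        return firedAt;
    }

    public static void main(String[] args) {
        int[] inputSizes = {11, 19, 31, 32, 52};
        for (int n : inputSizes) {
            System.out.println(n + " elements, elementCountAtLeast(12): EARLY firings at "
                    + earlyFirings(n, 12, 10));
        }
    }
}
```

Under these assumptions, earlyFirings(52, 12, 10) gives [20, 40, 52], which
matches the last case listed above. The ON_TIME and NO_FIRING panes are not
modeled.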
I realize this behavior satisfies the technical design of triggers, which I
understand to be: "If you specify a trigger, you're not guaranteed it will
fire, but you are guaranteed that it won't fire more often or earlier than you
specified." I also understand it's a good property for DirectRunner to simulate
things like delays in trigger firing and other behaviors that you might see on
a "real" runner. However, this behavior might also be undesirable for two
reasons. First, a pipeline author may wish to quickly use DirectRunner to
confirm their own understanding of WindowingStrategy settings, and could get
confused and think the above behavior is due to their own error rather than to
DirectRunner, especially with small exploratory PCollections. Second, a
stronger or better-defined guarantee about when panes actually fire (at the
expense of performance, presumably) might be valuable for something like
automated integration testing of a pipeline's logic.
A few questions:
1. Is my understanding of trigger semantics correct?
2. Is all this behavior actually a symptom of my UnboundedSource being
   implemented incorrectly, somehow?
3. Is the above behavior exactly as intended? (including the 0 element case
   giving NO_FIRING instead of ON_TIME pane?)
4. Is this a DirectRunner behavior or is it common across runners? (common
   across SDKs?)
5. Is the buffer (which seems to be 10 right now) that's causing this behavior
   configurable, or is it possible to disable it?

---
Wesley Tanaka
https://wtanaka.com/
