I'm building a pipeline that streams from Pub/Sub and writes to files. I'm
using FileIO's dynamic destinations to place elements into different
directories by date, and I don't care about the ordering of elements
beyond the date buckets.
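The date bucketing can be sketched as a plain key function. This is a minimal stdlib-only sketch, assuming each element carries (or can be mapped to) an `Instant` and that dates are taken in UTC; `DateBuckets`, `bucketFor`, `filePrefixFor`, and the `part` prefix are hypothetical names. In a pipeline, a function like this would sit behind the destination function passed to `FileIO.writeDynamic().by(...)`, with the returned string used as the destination key.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: maps an element timestamp to a date directory such as "2018/03/27". */
final class DateBuckets {
    // Assumption: buckets are computed in UTC; change the zone if dates should follow local time.
    private static final DateTimeFormatter DIR_FORMAT =
        DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    private DateBuckets() {}

    /** Returns the directory (destination key) for an element with the given timestamp. */
    static String bucketFor(Instant timestamp) {
        return DIR_FORMAT.format(timestamp);
    }

    /** Builds a file prefix inside the bucket; "part" is an arbitrary, hypothetical name. */
    static String filePrefixFor(Instant timestamp) {
        return bucketFor(timestamp) + "/part";
    }
}
```

Because the key is a plain `String`, `StringUtf8Coder` would be a natural choice for the destination coder.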
So I think GlobalWindows is appropriate here, even though the input is
unbounded. Is it possible to use GlobalWindows but set a trigger based on
element count and/or processing time, so that Beam actually writes out
files periodically?
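To make the intended firing behavior concrete, here is a toy, stdlib-only model of `Repeatedly.forever(AfterFirst.of(<count>, <processing-time delay>))` with discarding panes: a pane closes when it reaches `maxCount` elements or when `delayMillis` have passed since its first element, whichever comes first, and the next pane starts empty. It is not Beam's trigger machinery; `CountOrDelayTrigger` and `simulate` are hypothetical, arrival times are assumed sorted, and a timer that expires exactly when an element arrives is assumed to fire first.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Beam code) of Repeatedly.forever(AfterFirst.of(
 *     AfterPane.elementCountAtLeast(maxCount),
 *     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)))
 * with discardingFiredPanes().
 */
final class CountOrDelayTrigger {
    private final int maxCount;
    private final long delayMillis;

    CountOrDelayTrigger(int maxCount, long delayMillis) {
        this.maxCount = maxCount;
        this.delayMillis = delayMillis;
    }

    /**
     * Replays sorted processing-time arrivals and returns the size of each fired pane.
     * Time is assumed to keep advancing until endMillis, so a pending delay can still fire.
     */
    List<Integer> simulate(long[] arrivalMillis, long endMillis) {
        List<Integer> panes = new ArrayList<>();
        int count = 0;                   // elements buffered in the current pane
        long deadline = Long.MAX_VALUE;  // processing-time timer; unset while the pane is empty

        for (long t : arrivalMillis) {
            // The delay timer expires before this element is processed (ties go to the timer).
            if (count > 0 && t >= deadline) {
                panes.add(count);
                count = 0;
                deadline = Long.MAX_VALUE;
            }
            // The first element of a new pane starts the delay.
            if (count == 0) {
                deadline = t + delayMillis;
            }
            count++;
            // The count condition fires immediately; discarding mode starts the next pane empty.
            if (count >= maxCount) {
                panes.add(count);
                count = 0;
                deadline = Long.MAX_VALUE;
            }
        }
        // Flush a pane whose delay expires before the end of the observed period.
        if (count > 0 && endMillis >= deadline) {
            panes.add(count);
        }
        return panes;
    }
}
```

For example, with `maxCount = 3` and a 10-second delay, arrivals at 0, 1, 2, 5 and 20,000 ms (observed until 40,000 ms) produce panes of 3, 1 and 1 elements: the first fires on the count, the other two on the delay.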
I tried the following:
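import org.apache.beam.sdk.transforms.windowing.*;
import org.joda.time.Duration;

// <String> is a placeholder for the actual element type read from Pub/Sub.
Window.<String>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10000),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))))
    .discardingFiredPanes()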
But it raises an exception about incompatible triggers:
Inputs to Flatten had incompatible triggers:
  Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
  Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1), AfterSynchronizedProcessingTime.pastFirstElementInPane()))
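I believe what's happening is that FileIO with an explicit numShards
(required for unbounded input) forces a GroupByKey, and downstream of that
GroupByKey my trigger is replaced by its continuation trigger. The second
trigger in the message matches that: elementCountAtLeast(10000) has become
elementCountAtLeast(1), and AfterProcessingTime has become
AfterSynchronizedProcessingTime. The internals of WriteFiles then try to
flatten these incompatible PCollections together.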
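The trigger rewriting described above can be modeled in isolation. This toy sketch (plain Java classes, not Beam's `Trigger` hierarchy) applies the continuation rules that the error message itself exhibits: an element-count trigger becomes `elementCountAtLeast(1)`, a processing-time delay becomes `AfterSynchronizedProcessingTime.pastFirstElementInPane()`, and composite triggers rewrite their children. `ToyTrigger` and `checkFlatten` are hypothetical names, and Flatten's compatibility check is simplified to equality of the printed triggers. It reproduces the mismatch from the message; it does not show where inside WriteFiles the Flatten happens.

```java
/**
 * Toy trigger tree (not Beam's Trigger classes) used to model how a GroupByKey
 * replaces a trigger with its continuation trigger downstream.
 */
abstract class ToyTrigger {
    /** The trigger that governs output downstream of a GroupByKey. */
    abstract ToyTrigger continuation();

    // Simplification: two triggers are "compatible" only if they print identically.
    @Override
    public boolean equals(Object other) {
        return other instanceof ToyTrigger && other.toString().equals(toString());
    }

    @Override
    public int hashCode() {
        return toString().hashCode();
    }

    /** Toy version of Flatten's check: every input must carry the same trigger. */
    static void checkFlatten(ToyTrigger... inputs) {
        for (ToyTrigger t : inputs) {
            if (!t.equals(inputs[0])) {
                throw new IllegalStateException(
                    "Inputs to Flatten had incompatible triggers: " + inputs[0] + ", " + t);
            }
        }
    }

    static final class ElementCount extends ToyTrigger {
        private final int n;
        ElementCount(int n) { this.n = n; }
        // Downstream fires as soon as any upstream output arrives, so the threshold becomes 1.
        @Override ToyTrigger continuation() { return new ElementCount(1); }
        @Override public String toString() { return "AfterPane.elementCountAtLeast(" + n + ")"; }
    }

    static final class ProcessingDelay extends ToyTrigger {
        private final String delay; // printed form only, e.g. "1 minute"
        ProcessingDelay(String delay) { this.delay = delay; }
        // A processing-time delay is replaced by synchronized processing time.
        @Override ToyTrigger continuation() { return new SynchronizedProcessingTime(); }
        @Override public String toString() {
            return "AfterProcessingTime.pastFirstElementInPane().plusDelayOf(" + delay + ")";
        }
    }

    static final class SynchronizedProcessingTime extends ToyTrigger {
        // Already a continuation trigger, so it maps to itself.
        @Override ToyTrigger continuation() { return this; }
        @Override public String toString() {
            return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
        }
    }

    static final class First extends ToyTrigger {
        private final ToyTrigger a, b;
        First(ToyTrigger a, ToyTrigger b) { this.a = a; this.b = b; }
        @Override ToyTrigger continuation() { return new First(a.continuation(), b.continuation()); }
        @Override public String toString() { return "AfterFirst.of(" + a + ", " + b + ")"; }
    }

    static final class Forever extends ToyTrigger {
        private final ToyTrigger inner;
        Forever(ToyTrigger inner) { this.inner = inner; }
        @Override ToyTrigger continuation() { return new Forever(inner.continuation()); }
        @Override public String toString() { return "Repeatedly.forever(" + inner + ")"; }
    }
}
```

In this model the continuation of a continuation is unchanged, so two PCollections that have both passed through a grouping flatten cleanly; the conflict only appears when a grouped and an ungrouped PCollection meet.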