I'm building a pipeline that streams from Pub/Sub and writes to files. I'm
using FileIO's dynamic destinations to place elements into different
directories by date, and I don't care about the ordering of elements
beyond the date buckets.
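
For reference, the write side looks roughly like this. The subscription,
bucket path, and the assumption that each line starts with a yyyy-MM-dd
date are placeholders for my real setup, and windowing is omitted here
since that's the question below:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;

Pipeline p = Pipeline.create();
p.apply(PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-sub"))
 .apply(FileIO.<String, String>writeDynamic()
    .by(line -> line.substring(0, 10))   // destination = date prefix, e.g. "2019-01-31"
    .withDestinationCoder(StringUtf8Coder.of())
    .via(TextIO.sink())
    .to("gs://my-bucket/output")
    .withNaming(date -> FileIO.Write.defaultNaming(date + "/part", ".txt"))
    .withNumShards(10));                 // explicit shards, required for unbounded input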

So I think GlobalWindows is appropriate in this case, even though the
input is unbounded. Is it possible to use GlobalWindows but set a trigger
based on element count and/or processing time, so that Beam actually
writes out files periodically?

I tried the following:

Window.into(new GlobalWindows())
  .triggering(Repeatedly.forever(AfterFirst.of(
      AfterPane.elementCountAtLeast(10000),
      AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(10)))))
  .discardingFiredPanes()

But it raises an exception about incompatible triggers:

Inputs to Flatten had incompatible triggers:
  Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
  Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
    AfterSynchronizedProcessingTime.pastFirstElementInPane()))

I believe what's happening is that FileIO with explicit numShards
(required for unbounded input) forces a GroupByKey, which activates
continuation triggers that are incompatible with my stated triggers.
It's the internals of WriteFiles that try to flatten the incompatible
PCollections together.
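
To double-check that theory: Trigger exposes getContinuationTrigger()
(public, though it's really an SDK-internal detail), which as far as I
can tell is what a downstream GroupByKey substitutes for the stated
trigger. For my trigger it should print the second trigger from the
exception:

import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.joda.time.Duration;

Trigger mine = Repeatedly.forever(AfterFirst.of(
    AfterPane.elementCountAtLeast(10000),
    AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(10))));

// Expected output (the continuation form from the Flatten error):
// Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
//   AfterSynchronizedProcessingTime.pastFirstElementInPane()))
System.out.println(mine.getContinuationTrigger());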
