Thanks for the response, Chamikara. I filed https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work around the problem in my case by not using a ValueProvider for numShards.
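
For anyone who lands on this thread later, here is a rough sketch of what I mean by the workaround. The Pubsub subscription, output path, and sink are just placeholders (my real pipeline uses writeDynamic() with date-based destinations); the only change that matters is passing a plain int to withNumShards rather than a ValueProvider<Integer>:

Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-sub"))  // placeholder
 .apply(Window.<String>into(new GlobalWindows())
     .triggering(Repeatedly.forever(AfterFirst.of(
         AfterPane.elementCountAtLeast(10000),
         AfterProcessingTime.pastFirstElementInPane()
             .plusDelayOf(Duration.standardMinutes(10)))))
     .discardingFiredPanes())
 .apply(FileIO.<String>write()
     .via(TextIO.sink())
     .to("gs://my-bucket/output/")  // placeholder path
     // Hard-coded int shard count; passing a ValueProvider<Integer> instead
     // is what sent WriteFiles down the code path that raised the
     // incompatible-triggers exception for me.
     .withNumShards(10));
p.run();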
On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <chamik...@google.com> wrote:

> I'm not too familiar with the exact underlying issue here, but writing
> unbounded input to files when using GlobalWindows for unsharded output is a
> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>
> - Cham
>
> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> I've read more deeply into the WriteFiles code and I'm understanding now
>> that the exception is due to WriteFiles' attempt to handle unsharded input.
>> In that case, it creates a sharded and an unsharded collection; the first goes
>> through one GroupByKey while the other goes through two. These two
>> collections are then flattened together, and they have incompatible triggers
>> because the doubly-grouped collection uses a continuation trigger.
>>
>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>> switch to hard-coding an integer rather than passing a ValueProvider,
>> WriteFiles uses a different code path that doesn't flatten collections, and
>> no exception is thrown.
>>
>> So, this might really be considered a bug in WriteFiles (and thus
>> FileIO). But I'd love to hear other interpretations.
>>
>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>
>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>> I'm using FileIO's dynamic destinations to place elements into different
>>> directories according to date, and I really don't care about ordering of
>>> elements beyond the date buckets.
>>>
>>> So, I think GlobalWindows is appropriate in this case, even though the
>>> input is unbounded. Is it possible to use GlobalWindows but set a trigger
>>> based on number of elements and/or processing time so that Beam actually
>>> writes out files periodically?
>>>
>>> I tried the following:
>>>
>>> Window.into(new GlobalWindows())
>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>         AfterPane.elementCountAtLeast(10000),
>>>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))))
>>>     .discardingFiredPanes()
>>>
>>> But it raises an exception about incompatible triggers:
>>>
>>> Inputs to Flatten had incompatible triggers:
>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>
>>> I believe what's happening is that FileIO with an explicit numShards
>>> (required in the case of unbounded input) is forcing a GroupByKey, which
>>> activates continuation triggers that are incompatible with my stated
>>> triggers. It's the internals of WriteFiles that are trying to flatten the
>>> incompatible PCollections together.