Thanks for the response, Chamikara. I filed
https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
around the problem in my case by not using a ValueProvider for numShards.
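
For anyone hitting the same thing, here is a minimal sketch of the workaround
(the sink, output path, and shard count are placeholders, not my actual
pipeline):

output.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("gs://my-bucket/output")               // placeholder path
    // .withNumShards(options.getNumShards())  // ValueProvider<Integer> option: hits the Flatten error
    .withNumShards(10));                       // literal int: takes the fixed-sharding code path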

On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> I'm not too familiar with the exact underlying issue here, but writing
> unbounded input to files using GlobalWindows for unsharded output is a
> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>
> - Cham
>
> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> I've read more deeply into the WriteFiles code, and I now understand that
>> the exception is due to WriteFiles' attempt to handle unsharded input. In
>> that case, it creates both a sharded and an unsharded collection; the first
>> goes through one GroupByKey while the other goes through two. These two
>> collections are then flattened together, but they have incompatible
>> triggers because the double-grouped collection uses a continuation trigger.
>>
>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>> switch to hard-coding an integer rather than passing a ValueProvider,
>> WriteFiles uses a different code path that doesn't flatten collections,
>> and no exception is thrown.
>>
>> So, this might really be considered a bug in WriteFiles (and thus
>> FileIO). But I'd love to hear other interpretations.
>>
>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>
>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>> I'm using FileIO's dynamic destinations to place elements into different
>>> directories according to date and I really don't care about ordering of
>>> elements beyond the date buckets.
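>>>
>>> For concreteness, here is a rough sketch of the shape of the write (the
>>> element type, paths, and date-extraction function are placeholders, not my
>>> actual code):
>>>
>>> input.apply(FileIO.<String, String>writeDynamic()
>>>     .by(element -> extractDate(element))  // hypothetical date extractor
>>>     .withDestinationCoder(StringUtf8Coder.of())
>>>     .via(TextIO.sink())
>>>     .to("gs://my-bucket/output")          // placeholder path
>>>     .withNaming(date -> FileIO.Write.defaultNaming(date + "/records", ".json"))
>>>     .withNumShards(numShards));           // explicit numShards, required for unbounded input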
>>>
>>> So, I think GlobalWindows is appropriate in this case, even though the
>>> input is unbounded. Is it possible to use GlobalWindows but set a trigger
>>> based on number of elements and/or processing time so that Beam actually
>>> writes out files periodically?
>>>
>>> I tried the following:
>>>
>>> Window.into(new GlobalWindows())
>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>     AfterPane.elementCountAtLeast(10000),
>>>     AfterProcessingTime.pastFirstElementInPane()
>>>       .plusDelayOf(Duration.standardMinutes(10)))))
>>>   .discardingFiredPanes()
>>>
>>> But it raises an exception about incompatible triggers:
>>>
>>> Inputs to Flatten had incompatible triggers:
>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>
>>> I believe what's happening is that FileIO with an explicit numShards
>>> (required in the case of unbounded input) forces a GroupByKey, which
>>> produces continuation triggers that are incompatible with my stated
>>> trigger. It's the internals of WriteFiles that are trying to flatten the
>>> incompatible PCollections together.
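>>>
>>> As a sanity check, here is a rough sketch (assuming
>>> Trigger#getContinuationTrigger() is accessible in this SDK version) showing
>>> how my stated trigger maps to the second trigger in the Flatten error:
>>>
>>> Trigger mine = Repeatedly.forever(AfterFirst.of(
>>>     AfterPane.elementCountAtLeast(10000),
>>>     AfterProcessingTime.pastFirstElementInPane()
>>>         .plusDelayOf(Duration.standardMinutes(10))));
>>> // Expected to match the second trigger in the error above:
>>> // Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>> //   AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>> System.out.println(mine.getContinuationTrigger());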
>>>
>>>
>>>
