Hi Artur,

When you join the PCollections, they will be flattened and go through a
GroupByKey together. Since the trigger governs when the GroupByKey can emit
output, the triggers have to be equal or the GroupByKey doesn't have a
clear guide as to when it should output. If you can make the triggering on
all the input collections equal, that will resolve this issue. If you still
need different triggering elsewhere, that is fine. You just need to make
them the same going into the join.
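The following is a toy model of the rule above, written in plain Java rather than Beam's API. Input, flatten and retrigger are hypothetical names. Inputs are merged only if they all carry the same trigger, and the fix is to give every input the same trigger before the merge. In Beam, that means re-applying a Window transform with the desired triggering to each input. The model also assumes something the thread does not confirm, though the AfterSynchronizedProcessingTime in Artur's exception suggests it: the first join's output carries a different trigger from the raw claims input because it has already passed through a GroupByKey. Plain string equality stands in for Beam's own compatibility check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class TriggerCompatibilitySketch {

  // Hypothetical stand-in for a PCollection: a name, a trigger label and its elements.
  record Input(String name, String trigger, List<String> elements) {}

  // Merges all inputs only if every input carries the same trigger label;
  // otherwise throws, mimicking "Inputs to Flatten had incompatible triggers".
  static List<String> flatten(List<Input> inputs) {
    List<String> merged = new ArrayList<>();
    if (inputs.isEmpty()) {
      return merged;
    }
    String expected = inputs.get(0).trigger();
    for (Input in : inputs) {
      if (!Objects.equals(expected, in.trigger())) {
        throw new IllegalStateException(
            "Inputs to Flatten had incompatible triggers: " + expected + ", " + in.trigger());
      }
      merged.addAll(in.elements());
    }
    return merged;
  }

  // Models re-applying a Window transform with new triggering: same data, new trigger.
  static Input retrigger(Input in, String trigger) {
    return new Input(in.name(), trigger, in.elements());
  }

  public static void main(String[] args) {
    // Assumption: the first join's output has been through a GroupByKey and so
    // carries a different trigger than the raw claims input.
    Input joined = new Input("customersAndPolicies",
        "Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())",
        List.of("customer1+policy1"));
    Input claims = new Input("claims",
        "AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds), "
            + "AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))",
        List.of("claim1"));

    try {
      flatten(List.of(joined, claims));
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }

    // Give every input the same trigger before the merge.
    String common = "AfterWatermark.pastEndOfWindow()";
    List<String> merged = flatten(List.of(retrigger(joined, common), retrigger(claims, common)));
    System.out.println("merged: " + merged);
  }
}
```

Running main prints the rejection message first. It then prints the merged list, because the second flatten call sees matching triggers.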

Kenn

On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]> wrote:

> Hello,
> You can try putting the PCollection, after the Flatten, into the same global
> window with the same triggers it had before flattening.
>
> Best regards
> Aleksandr Gortujev
>
>
> On 3 Nov 2017, 11:04 AM, "Artur Mrozowski" <[email protected]> wrote:
>
> Hi,
> I am in the second week of our PoC with Beam, and I am really amazed by the
> capabilities of the framework and how well engineered it is.
>
> Amazed does not mean experienced, so please bear with me.
>
> What we are trying to achieve is to join several streams using windowing and
> triggers, and that is where I fear we have hit the limits of what can be
> done.
>
> In case A we run in global windows, and we are able to join two
> unbounded PCollections, but when I try to join the result with a third
> collection I get the exception below. I tried many different trigger
> combinations, but I can't make it work.
>
> Exception in thread "main" java.lang.IllegalStateException: Inputs to
> Flatten had incompatible triggers:
> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>
> In case B I use fixed windows. Again, I can successfully join two
> collections and print the output to the console. When I add the third, the
> pipeline runs without errors, but no results appear in the console.
> However, I am able to print the results of merging the collections with
> Flatten, so the error above is no longer an issue.
>
> Does anyone have experience with joining three or more unbounded PCollections?
> What would be a successful windowing and triggering strategy for global and
> fixed windows, respectively?
>
> Below are code snippets from the fixed-windows case. Windows are defined in
> the same manner for all three collections: customer, claim and policy. The
> Join class I use comes from
> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>
>
> I would be really grateful if any of you would share your
> knowledge.
>
> Best regards
> Artur
>
> // Read claim events from Kafka as String key/value pairs.
> PCollection<Claim2> claimInput = pipeline
>         .apply(KafkaIO.<String, String>read()
>                 .withTopics(ImmutableList.of(claimTopic))
>                 .updateConsumerProperties(consumerProps)
>                 .withBootstrapServers(bootstrapServers)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .withoutMetadata())
>         // Keep only the JSON payload and parse it into Claim2 objects.
>         .apply(Values.<String>create())
>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>         // 100-second fixed windows, fired once the watermark passes the end
>         // of the window, with 1 second of allowed lateness.
>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>                 .triggering(AfterWatermark.pastEndOfWindow())
>                 .accumulatingFiredPanes()
>                 .withAllowedLateness(Duration.standardSeconds(1)));
>
> /** JOIN **************/
> // First join: customers with policies on the Integer key.
> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>     Join.innerJoin(all_customers, all_policies);
> PCollectionList<KV<Integer, String>> collections =
>     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
> // Second join: the (customer, policy) pairs with claims on the same key.
> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
> // PCollectionList<KV<Integer, String>> collections =
> //     PCollectionList.of(all_customers).and(all_policies);
>
> // Separately, merge the three raw collections into one PCollection.
> PCollection<KV<Integer, String>> merged =
>     collections.apply(Flatten.<KV<Integer, String>>pCollections());
>
>
>
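The following is a second toy model in plain Java, not Beam. It shows what the chained Join.innerJoin calls in Artur's snippet compute. Each record goes into a 100-second fixed window based on its timestamp, and records only join when they share both the key and the window. Event, Keyed, assignWindows and innerJoin are hypothetical names, and the nested-loop join was chosen for brevity. Map.Entry plays the role of KV, so the final type mirrors KV<Integer, KV<KV<String, String>, String>>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WindowedThreeWayJoinSketch {

  // Hypothetical input record: join key, payload and event timestamp in seconds.
  record Event(int key, String value, long tsSeconds) {}

  // A value tagged with the start of its fixed window and its join key.
  record Keyed<V>(long window, int key, V value) {}

  // Mirrors FixedWindows.of(Duration.standardSeconds(100)) in the snippet above.
  static final long WINDOW_SECONDS = 100;

  // Start of the fixed window containing ts (windows are [start, start + 100)).
  static long windowStart(long ts) {
    return ts - Math.floorMod(ts, WINDOW_SECONDS);
  }

  static List<Keyed<String>> assignWindows(List<Event> events) {
    List<Keyed<String>> out = new ArrayList<>();
    for (Event e : events) {
      out.add(new Keyed<>(windowStart(e.tsSeconds()), e.key(), e.value()));
    }
    return out;
  }

  // Inner join on (window, key): every matching left/right pair is emitted,
  // i.e. the cross product of the values that share a key within one window.
  static <A, B> List<Keyed<Map.Entry<A, B>>> innerJoin(List<Keyed<A>> left, List<Keyed<B>> right) {
    List<Keyed<Map.Entry<A, B>>> out = new ArrayList<>();
    for (Keyed<A> l : left) {
      for (Keyed<B> r : right) {
        if (l.window() == r.window() && l.key() == r.key()) {
          out.add(new Keyed<>(l.window(), l.key(), Map.entry(l.value(), r.value())));
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> customers = List.of(new Event(1, "customer1", 10), new Event(2, "customer2", 150));
    List<Event> policies = List.of(new Event(1, "policy1", 20), new Event(2, "policy2", 120));
    List<Event> claims = List.of(new Event(1, "claim1", 90), new Event(2, "claim2", 210));

    // First join, then join its output with the third collection.
    List<Keyed<Map.Entry<String, String>>> customersAndPolicies =
        innerJoin(assignWindows(customers), assignWindows(policies));
    List<Keyed<Map.Entry<Map.Entry<String, String>, String>>> all =
        innerJoin(customersAndPolicies, assignWindows(claims));

    for (Keyed<Map.Entry<Map.Entry<String, String>, String>> j : all) {
      System.out.println("window " + j.window() + ", key " + j.key() + ": " + j.value());
    }
  }
}
```

With the sample data in main, only key 1 produces a result. Key 2's claim has timestamp 210, which falls in the window starting at 200. Its customer and policy fall in the window starting at 100, so they never meet.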
