Hej Ben, thank you for your answer. I think I forgot to mention that the Join class already implements CoGroupByKey and comes from the SDK extensions. I have now modified it slightly to do a 3-way join (see below). It works for 3 PCollections with fixed windows and provides the output this time, even when using early and late triggers. That is great!
But when I try to use it with global windows for all three collections I get the same exception. Is it not possible to make a 3-way join in the global window? The reason I want to use the global window is that I want to have all the historic records available in these collections. I am not sure how that could be achieved using fixed windows.

/Artur

public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
    final PCollection<KV<K, V1>> leftCollection,
    final PCollection<KV<K, V2>> rightCollection,
    final PCollection<KV<K, V3>> thirdCollection) {
  final TupleTag<V1> v1Tuple = new TupleTag<>();
  final TupleTag<V2> v2Tuple = new TupleTag<>();
  final TupleTag<V3> v3Tuple = new TupleTag<>();

  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
          .and(v2Tuple, rightCollection)
          .and(v3Tuple, thirdCollection)
          .apply(CoGroupByKey.<K>create());

  return coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // Emit the cross product of the three grouped value iterables per key.
          K key = c.element().getKey();
          CoGbkResult result = c.element().getValue();
          for (V1 v1 : result.getAll(v1Tuple)) {
            for (V2 v2 : result.getAll(v2Tuple)) {
              for (V3 v3 : result.getAll(v3Tuple)) {
                c.output(KV.of(key, KV.of(KV.of(v1, v2), v3)));
              }
            }
          }
        }
      }));
}

On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bchamb...@apache.org> wrote:
> It looks like this is a problematic interaction between the 2-layer join
> and the (unfortunately complicated) continuation triggering. Specifically,
> the triggering of Join(A, B) has the continuation trigger, so it isn't
> possible to join that with C.
>
> Instead of trying to do Join(Join(A, B), C), consider using a
> CoGroupByKey. This will allow you to join all three input collections at
> the same time, which should have two benefits. First, it will work, since
> you won't be trying to merge a continuation trigger with the original
> trigger. Second, it should be more efficient, because you are performing a
> single three-way join instead of two two-way joins.
>
> See https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
> for more information.
>
> -- Ben
>
> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <art...@gmail.com> wrote:
>
>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>
>> There are two scenarios that I've been trying out, but the operation is
>> always the same: join setA + setB => setAB, and then join setAB + setC => setABC.
>> In scenario 1 I define three PCollections with global windows and exactly
>> the same triggers. I am able to join A+B, but as soon as I try to join AB+C
>> I get the exception. Here is the code snippet, where the window and triggers
>> are the same for all three PCollections using global windows:
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> Trigger trigger1 =
>>     AfterProcessingTime
>>         .pastFirstElementInPane()
>>         .plusDelayOf(Duration.standardSeconds(10));
>>
>> /** PARSE CUSTOMER */
>> PCollection<Customer3> customerInput = pipeline
>>     .apply((KafkaIO.<String, String>read()
>>             .withTopics(ImmutableList.of(customerTopic))
>>             .updateConsumerProperties(consumerProps)
>>             .withBootstrapServers(bootstrapServers)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class))
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>     .apply(Window.<Customer3>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(trigger1))
>>         .accumulatingFiredPanes());
>>
>> /** PARSE POLICY */
>> PCollection<Policy2> policyInput = pipeline
>>     .apply((KafkaIO.<String, String>read()
>>             .withTopics(ImmutableList.of(policyTopic))
>>             .updateConsumerProperties(consumerProps)
>>             .withBootstrapServers(bootstrapServers)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class))
>>         // .withWatermarkFn(new AddWatermarkFn())
>>         // .withTimestampFn2(new AddCustomerTimestampFn())
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     // .apply(ParseJsons.of(Customer.class));
>>     .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>     .apply(Window.<Policy2>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(trigger1))
>>         .accumulatingFiredPanes());
>>
>> /** PARSE CLAIM */
>> PCollection<Claim2> claimInput = pipeline
>>     .apply((KafkaIO.<String, String>read()
>>             .withTopics(ImmutableList.of(claimTopic))
>>             .updateConsumerProperties(consumerProps)
>>             .withBootstrapServers(bootstrapServers)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class))
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>     .apply(Window.<Claim2>into(new GlobalWindows())
>>         .triggering(trigger1)
>>         .accumulatingFiredPanes());
>>
>> /** CUSTOMER */
>> PCollection<KV<Integer, String>> all_customers = customerInput
>>     .apply(new ExtractAndMapCustomerKey());
>>
>> /** POLICY */
>> PCollection<KV<Integer, String>> all_policies = policyInput
>>     .apply(new ExtractAndMapPolicyKey());
>>
>> /** CLAIM */
>> PCollection<KV<Integer, String>> all_claims = claimInput
>>     .apply(new ExtractAndMapClaimKey());
>>
>> /** JOIN */
>> /** This join works if I comment out the subsequent join. */
>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>     Join.innerJoin(all_customers, all_policies);
>>
>> /** This join will cause the IllegalStateException. */
>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>     joinedCustomersPoliciesAndClaims =
>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>
>> The second scenario is using fixed windows. The logic is the same as
>> above.
>> Even in this case, triggering is equal for all three collections, like this:
>>
>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>     .triggering(AfterWatermark.pastEndOfWindow())
>>     .accumulatingFiredPanes()
>>     .withAllowedLateness(Duration.standardSeconds(1)));
>>
>> /** JOIN */
>> /** NO ERROR this time */
>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>     Join.innerJoin(all_customers, all_policies);
>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>     joinedCustomersPoliciesAndClaims =
>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>
>> This time I get no errors, but I am not able to print the results in the
>> console.
>>
>> So I made another experiment and again defined an equal trigger for all
>> three collections. I can print the output to the console for the first two
>> PCollections, but the second join again fails with the IllegalStateException.
>> Triggering definition for all three collections:
>>
>> /** PARSE CLAIM */
>> PCollection<Claim2> claimInput = pipeline
>>     .apply((KafkaIO.<String, String>read()
>>             .withTopics(ImmutableList.of(claimTopic))
>>             .updateConsumerProperties(consumerProps)
>>             .withBootstrapServers(bootstrapServers)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class))
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>         .triggering(AfterWatermark.pastEndOfWindow()
>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>                 // early update frequency
>>                 .alignedTo(Duration.standardSeconds(10)))
>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>                 .alignedTo(Duration.standardSeconds(20))))
>>         .withAllowedLateness(Duration.standardMinutes(5))
>>         .accumulatingFiredPanes());
>>
>> In this case I've added early and late firings, which emit results from
>> the pane, but the second join again throws the exception.
>>
>> I know it's a lot of information to take in, but basically, if you have an
>> example where you join three PCollections in global and in fixed windows
>> with appropriate triggering, I'd be eternally grateful :)
>> Or if you could explain how to do it, of course. Thanks in advance.
>>
>> Best Regards
>> Artur
>>
>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <k...@google.com> wrote:
>>
>>> Hi Artur,
>>>
>>> When you join the PCollections, they will be flattened and go through a
>>> GroupByKey together. Since the trigger governs when the GroupByKey can emit
>>> output, the triggers have to be equal, or the GroupByKey doesn't have a
>>> clear guide as to when it should output. If you can make the triggering on
>>> all the input collections equal, that will resolve this issue. If you still
>>> need different triggering elsewhere, that is fine. You just need to make
>>> them the same going into the join.
>>>
>>> Kenn
>>>
>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <aleksandr...@gmail.com> wrote:
>>>
>>>> Hello,
>>>> You can try putting the PCollection after Flatten into the same global
>>>> window, with the same triggers as before flattening.
>>>>
>>>> Best regards
>>>> Aleksandr Gortujev
>>>>
>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <art...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> I am in the second week of our PoC with Beam, and I am really amazed by the
>>>> capabilities of the framework and how well engineered it is.
>>>>
>>>> Amazed does not mean experienced, so please bear with me.
>>>>
>>>> What we are trying to achieve is to join several streams using windowing and
>>>> triggers. And that is where I fear we have hit the limits of what can be
>>>> done.
>>>>
>>>> In case A we run in global windows, and we are able to combine two
>>>> unbounded PCollections, but when I try to combine the result with a third
>>>> collection I get the exception below.
>>>> I tried many different trigger combinations, but I can't make it work.
>>>>
>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>>>> Flatten had incompatible triggers:
>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>> seconds))
>>>>
>>>> In case B I use fixed windows. Again, I can successfully join two
>>>> collections and print the output in the console. When I add the third, it runs
>>>> without errors, but I am not able to materialize the results in the console.
>>>> However, I am able to print the results of a merge using Flatten, so the error
>>>> above is no longer an issue.
>>>>
>>>> Does anyone have experience with joining three or more unbounded
>>>> PCollections? What would be a successful windowing and triggering strategy for
>>>> global or fixed windows, respectively?
>>>>
>>>> Below are code snippets from the fixed-windows case. Windows are defined in the
>>>> same manner for all three collections: customer, claim and policy. The
>>>> Join class I use comes from
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>
>>>> I would be really grateful if any of you would like to share your
>>>> knowledge.
>>>>
>>>> Best Regards
>>>> Artur
>>>>
>>>> PCollection<Claim2> claimInput = pipeline
>>>>     .apply((KafkaIO.<String, String>read()
>>>>             .withTopics(ImmutableList.of(claimTopic))
>>>>             .updateConsumerProperties(consumerProps)
>>>>             .withBootstrapServers(bootstrapServers)
>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>         .withoutMetadata())
>>>>     .apply(Values.<String>create())
>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>         .accumulatingFiredPanes()
>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>
>>>> /** JOIN */
>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>     Join.innerJoin(all_customers, all_policies);
>>>> PCollectionList<KV<Integer, String>> collections =
>>>>     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>>>     joinedCustomersPoliciesAndClaims =
>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>> // PCollectionList<KV<Integer, String>> collections =
>>>> //     PCollectionList.of(all_customers).and(all_policies);
>>>>
>>>> PCollection<KV<Integer, String>> merged =
>>>>     collections.apply(Flatten.<KV<Integer, String>>pCollections());
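For readers following this thread: the per-key expansion that a CoGroupByKey-based 3-way inner join performs can be sketched in plain Java with no Beam dependency. The Maps below stand in for keyed collections that have already been grouped by key (the role CoGroupByKey plays), and the nested loops mirror iterating CoGbkResult.getAll(tag) for each TupleTag. All class and variable names here are illustrative only, not part of the Beam API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ThreeWayJoinSketch {

  // Inner join: a key must be present in all three inputs to produce output;
  // each matching key emits the cross product of its three value lists,
  // formatted here as "key:((v1,v2),v3)" to echo KV<K, KV<KV<V1, V2>, V3>>.
  static <K, V1, V2, V3> List<String> innerJoin3Way(
      Map<K, List<V1>> left, Map<K, List<V2>> right, Map<K, List<V3>> third) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<K, List<V1>> entry : left.entrySet()) {
      K key = entry.getKey();
      for (V1 v1 : entry.getValue()) {
        // Keys missing from an input contribute an empty list, so no output.
        for (V2 v2 : right.getOrDefault(key, Collections.emptyList())) {
          for (V3 v3 : third.getOrDefault(key, Collections.emptyList())) {
            out.add(key + ":((" + v1 + "," + v2 + ")," + v3 + ")");
          }
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, List<String>> customers = Map.of("k1", List.of("c1"));
    Map<String, List<String>> policies = Map.of("k1", List.of("p1", "p2"));
    Map<String, List<String>> claims =
        Map.of("k1", List.of("cl1"), "k2", List.of("cl2"));
    // k2 drops out (no matching customer); k1 yields a 1 x 2 x 1 cross product.
    System.out.println(innerJoin3Way(customers, policies, claims));
  }
}
```

Because the grouping happens once over all three inputs, there is no intermediate Join(A, B) collection carrying a continuation trigger, which is exactly why the single CoGroupByKey avoids the incompatible-triggers Flatten error discussed above.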