Yes, indeed. It works just fine with equally define triggers in global window. Sorry about that and thank you sooooo much. I can now leav the valley of despair and continue on my learning path;)
Best Regards /Artur On Sat, Nov 4, 2017 at 10:40 PM, Aleksandr <aleksandr...@gmail.com> wrote: > Hello, > You are using for policyInput Repeatedly.forever, for customerInput > Repeatedly.forever and for claimInput your trigger is without > Repeatedly.forever. > > > Best regards > Aleksandr Gortujev > > > сб, 4 нояб. 2017 г. в 20:39, Artur Mrozowski <art...@gmail.com>: > >> And the error message: >> >> Exception in thread "main" java.lang.IllegalStateException: Inputs to >> Flatten had incompatible triggers: Repeatedly.forever( >> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)), >> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds) >> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten. >> java:124) >> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten. >> java:102) >> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482) >> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422) >> at org.apache.beam.sdk.values.PCollectionList.apply( >> PCollectionList.java:182) >> at org.apache.beam.sdk.transforms.join.CoGroupByKey. >> expand(CoGroupByKey.java:124) >> at org.apache.beam.sdk.transforms.join.CoGroupByKey. >> expand(CoGroupByKey.java:74) >> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482) >> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422) >> at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply( >> KeyedPCollectionTuple.java:106) >> at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170) >> at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main( >> CustomerStreamPipelineGlobal.java:220) >> >> >> On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski <art...@gmail.com> wrote: >> >>> Sure, here is the url to my repo https://github.com/afuyo/ >>> beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/ >>> CustomerStreamPipelineGlobal.java >>> >>> It is fairly simple and I do no grouping before. Just read from Kafka >>> and then join. I followed your advice and the 3 way join works for pipeline >>> with fixed windows. Both classes use the same join class. The logic is the >>> same and it always work for A+B joinc(CoGroupByKey) That's what makes me >>> think it could be due to triggers. But I am ofcourse not sure:) >>> >>> /Artur >>> >>> Pipeline pipeline = Pipeline.create(options); >>> >>> Trigger trigger1 = >>> AfterProcessingTime >>> .pastFirstElementInPane() >>> .plusDelayOf(Duration.standardSeconds(10)) >>> ; >>> /**PARSE CUSTOMER*/ >>> PCollection<Customer3> customerInput = pipeline >>> >>> .apply((KafkaIO.<String, String> >>> read().withTopics(ImmutableList.of(customerTopic)) >>> >>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>> .withKeyDeserializer(StringDeserializer.class) >>> .withValueDeserializer(StringDeserializer.class)) >>> .withoutMetadata()) >>> .apply(Values.<String> create()) >>> .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer())) >>> .apply(Window.<Customer3> into(new >>> GlobalWindows()).triggering(Repeatedly.forever( >>> trigger1 )) >>> .accumulatingFiredPanes()) >>> ; >>> /**PARSE POLICY*/ >>> PCollection<Policy2> policyInput = pipeline >>> .apply((KafkaIO.<String, String> >>> read().withTopics(ImmutableList.of(policyTopic)) >>> >>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>> .withKeyDeserializer(StringDeserializer.class) >>> .withValueDeserializer(StringDeserializer.class)) >>> // .withWatermarkFn(new AddWatermarkFn()) >>> //.withTimestampFn2(new AddCustomerTimestampFn()) >>> .withoutMetadata()) >>> .apply(Values.<String> create()) >>> //.apply(ParseJsons.of(Customer.class)); >>> .apply("ParsePolicy", ParDo.of(new ParsePolicy())) >>> .apply(Window.<Policy2> into(new >>> GlobalWindows()).triggering(Repeatedly.forever( >>> trigger1)) >>> .accumulatingFiredPanes()) >>> ; >>> /**PARSE CLAIM**/ >>> PCollection<Claim2> claimInput = pipeline >>> .apply((KafkaIO.<String, String> >>> read().withTopics(ImmutableList.of(claimTopic)) >>> >>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>> .withKeyDeserializer(StringDeserializer.class) >>> .withValueDeserializer(StringDeserializer.class)) >>> .withoutMetadata()) >>> .apply(Values.<String> create()) >>> .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim())) >>> .apply(Window.<Claim2> into(new >>> GlobalWindows()).triggering(trigger1) >>> .accumulatingFiredPanes()) >>> ; >>> >>> /**CUSTOMER ********/ >>> PCollection<KV<Integer,String>> all_customers = customerInput >>> .apply(new ExtractAndMapCustomerKey()) >>> ; >>> /***POLICY********/ >>> PCollection<KV<Integer,String>> all_policies = policyInput >>> .apply(new ExtractAndMapPolicyKey()) >>> ; >>> /***CLAIM*******/ >>> PCollection<KV<Integer,String>> all_claims = claimInput >>> .apply(new ExtractAndMapClaimKey()) >>> ; >>> /**JOIN**************/ >>> /**This join works if I comment out the subsequent join**/ >>> // PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= >>> Join.innerJoin(all_customers,all_policies); >>> >>> /**This causes an exception**/ >>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= >>> Join.innerJoin3Way(all_customers,all_policies,all_claims); >>> >>> /**this join will cause IllegalStateException **/ >>> // PCollection<KV<Integer,KV<KV<String,String>,String>>> >>> joinedCustomersPoliciesAndClaims = >>> // Join.innerJoin(joinedCustomersAndPolicies,all_claims); >>> >>> /**This will also cause an exception when used with 3 collections**/ >>> //PCollectionList<KV<Integer,String>> collections = >>> PCollectionList.of(all_customers).and(all_policies).and(all_claims); >>> //PCollection<KV<Integer,String>> merged= >>> collections.apply(Flatten.<KV<Integer,String>>pCollections()); >>> >>> And the 3 way join >>> >>> public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way( >>> final PCollection<KV<K, V1>> leftCollection, >>> final PCollection<KV<K, V2>> rightCollection >>> ,final PCollection<KV<K, V3>> thirdCollection) >>> { >>> >>> final TupleTag<V1> v1Tuple = new TupleTag<>(); >>> final TupleTag<V2> v2Tuple = new TupleTag<>(); >>> final TupleTag<V3> v3Tuple = new TupleTag<>(); >>> >>> PCollection<KV<K, CoGbkResult>> coGbkResultCollection = >>> KeyedPCollectionTuple.of(v1Tuple, leftCollection) >>> .and(v2Tuple, rightCollection) >>> .and(v3Tuple,thirdCollection) >>> .apply(CoGroupByKey.<K>create()); >>> >>> System.out.println(coGbkResultCollection); >>> >>> return coGbkResultCollection.apply(ParDo.of( >>> new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { >>> >>> @ProcessElement >>> public void processElement(ProcessContext c) { >>> KV<K, CoGbkResult> e = c.element(); >>> >>> Iterable<V1> leftValuesIterable = >>> e.getValue().getAll(v1Tuple); >>> Iterable<V2> rightValuesIterable = >>> e.getValue().getAll(v2Tuple); >>> Iterable<V3> thirdValuesIterable = >>> e.getValue().getAll(v3Tuple); >>> >>> for(V3 thirdValue : thirdValuesIterable) >>> { >>> >>> } >>> >>> for (V1 leftValue : leftValuesIterable) { >>> for (V2 rightValue : rightValuesIterable) { >>> c.output(KV.of(e.getKey(), KV.of(leftValue, >>> rightValue))); >>> } >>> >>> } >>> } >>> })) >>> .setCoder(KvCoder.of(((KvCoder) >>> leftCollection.getCoder()).getKeyCoder(), >>> KvCoder.of(((KvCoder) >>> leftCollection.getCoder()).getValueCoder(), >>> ((KvCoder) >>> rightCollection.getCoder()).getValueCoder()))); >>> >>> >>> On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <bchamb...@google.com> >>> wrote: >>> >>>> Can you share the program using global windows and make sure the >>>> exception is the same? Basically, this kind of problem has to do with >>>> whether you have applied a grouping operation already. The triggering (and >>>> sometimes windowing) before a grouping operation is different than after. >>>> So, if you are having problems applying a groupbykey somewhere and think >>>> the windowing and triggering should be the same, you need to look before >>>> that to see if one of the collections has been grouped but the others >>>> haven't. >>>> >>>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <art...@gmail.com> wrote: >>>> >>>>> Hej Ben, >>>>> thank you for your answer. I think I forgot to mention that the join >>>>> class already implements CoGroupByKey and comes from sdk extensions. I >>>>> haven now modified it slightly to do 3 way join(see below). It works for 3 >>>>> PCollections with fixed windows and provides the output this time, even >>>>> when using early and late triggers.That is great! >>>>> >>>>> But when I try to use it with global windows for all three >>>>> collections I get the same exception. >>>>> >>>>> Is it not possible to make a 3 way join in global window? The reason >>>>> why I want to use global window is that I want to have all the historic >>>>> records available in these collections. Not sure how that could be >>>>> achieved >>>>> using fixed windows. >>>>> >>>>> /Artur >>>>> >>>>> public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>> >>>>> innerJoin3Way( >>>>> final PCollection<KV<K, V1>> leftCollection, >>>>> final PCollection<KV<K, V2>> rightCollection >>>>> ,final PCollection<KV<K, V3>> thirdCollection) >>>>> { >>>>> final TupleTag<V1> v1Tuple = new TupleTag<>(); >>>>> final TupleTag<V2> v2Tuple = new TupleTag<>(); >>>>> final TupleTag<V3> v3Tuple = new TupleTag<>(); >>>>> >>>>> PCollection<KV<K, CoGbkResult>> coGbkResultCollection = >>>>> KeyedPCollectionTuple.of(v1Tuple, leftCollection) >>>>> .and(v2Tuple, rightCollection) >>>>> .and(v3Tuple,thirdCollection) >>>>> .apply(CoGroupByKey.<K>create()); >>>>> >>>>> >>>>> return coGbkResultCollection.apply(ParDo.of( >>>>> new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() { >>>>> >>>>> @ProcessElement >>>>> public void processElement(ProcessContext c) >>>>> >>>>> >>>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bchamb...@apache.org> >>>>> wrote: >>>>> >>>>>> It looks like this is a problematic interaction between the 2-layer >>>>>> join and the (unfortunately complicated) continuation triggering. >>>>>> Specifically, the triggering of Join(A, B) has the continuation trigger, >>>>>> so >>>>>> it isn't possible to join that with C. >>>>>> >>>>>> Instead of trying to do Join(Join(A, B), C), consider using a >>>>>> CoGroupByKey. This will allow you to join all three input collections at >>>>>> the same time, which should have two benefits. First, it will work since >>>>>> you won't be trying to merge a continuation trigger with the original >>>>>> trigger. Second, it should be more efficient, because you are performing >>>>>> a >>>>>> single, three-way join instead of two, two-way joins. >>>>>> >>>>>> https://beam.apache.org/documentation/sdks/javadoc/2. >>>>>> 1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html for more >>>>>> information. >>>>>> >>>>>> -- Ben >>>>>> >>>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <art...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Kenneth and Aleksandr and thank you for your prompt answer. >>>>>>> >>>>>>> So there are two scenarios that I've been trying out but operation >>>>>>> is always the same, join setA+setB =>setAB, and then join >>>>>>> setAB+setC=>setABC. >>>>>>> In scenario 1 I define three PCollections with global widnows and >>>>>>> exactly the same triggers. Now I am able to join A+B but as soon I try >>>>>>> to >>>>>>> join AB+C i get the exception. >>>>>>> . Here is the code snippet where the window and triggers are the >>>>>>> same for all three PCollections using global windows: >>>>>>> >>>>>>> Pipeline pipeline = Pipeline.create(options); >>>>>>> >>>>>>> Trigger trigger1 = >>>>>>> AfterProcessingTime >>>>>>> .pastFirstElementInPane() >>>>>>> .plusDelayOf(Duration.standardSeconds(10)) >>>>>>> ; >>>>>>> /**PARSE CUSTOMER*/ >>>>>>> PCollection<Customer3> customerInput = pipeline >>>>>>> >>>>>>> .apply((KafkaIO.<String, String> >>>>>>> read().withTopics(ImmutableList.of(customerTopic)) >>>>>>> >>>>>>> >>>>>>> >>>>>>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>>>>>> .withKeyDeserializer(StringDeserializer.class) >>>>>>> .withValueDeserializer(StringDeserializer.class)) >>>>>>> .withoutMetadata()) >>>>>>> .apply(Values.<String> create()) >>>>>>> >>>>>>> .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer())) >>>>>>> .apply(Window.<Customer3> into(new >>>>>>> GlobalWindows()).triggering(Repeatedly.forever( >>>>>>> trigger1 )) >>>>>>> .accumulatingFiredPanes()) >>>>>>> ; >>>>>>> /**PARSE POLICY*/ >>>>>>> PCollection<Policy2> policyInput = pipeline >>>>>>> .apply((KafkaIO.<String, String> >>>>>>> read().withTopics(ImmutableList.of(policyTopic)) >>>>>>> >>>>>>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>>>>>> .withKeyDeserializer(StringDeserializer.class) >>>>>>> .withValueDeserializer(StringDeserializer.class)) >>>>>>> // .withWatermarkFn(new AddWatermarkFn()) >>>>>>> //.withTimestampFn2(new AddCustomerTimestampFn()) >>>>>>> .withoutMetadata()) >>>>>>> .apply(Values.<String> create()) >>>>>>> //.apply(ParseJsons.of(Customer.class)); >>>>>>> .apply("ParsePolicy", ParDo.of(new ParsePolicy())) >>>>>>> .apply(Window.<Policy2> into(new >>>>>>> GlobalWindows()).triggering(Repeatedly.forever( >>>>>>> trigger1)) >>>>>>> .accumulatingFiredPanes()) >>>>>>> ; >>>>>>> /**PARSE CLAIM**/ >>>>>>> >>>>>>> PCollection<Claim2> claimInput = pipeline >>>>>>> .apply((KafkaIO.<String, String> >>>>>>> read().withTopics(ImmutableList.of(claimTopic)) >>>>>>> >>>>>>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>>>>>> .withKeyDeserializer(StringDeserializer.class) >>>>>>> .withValueDeserializer(StringDeserializer.class)) >>>>>>> .withoutMetadata()) >>>>>>> .apply(Values.<String> create()) >>>>>>> .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim())) >>>>>>> >>>>>>> .apply(Window.<Claim2> into(new >>>>>>> GlobalWindows()).triggering(trigger1) >>>>>>> .accumulatingFiredPanes()) >>>>>>> ; >>>>>>> >>>>>>> /**CUSTOMER ********/ >>>>>>> PCollection<KV<Integer,String>> all_customers = customerInput >>>>>>> .apply(new ExtractAndMapCustomerKey()) >>>>>>> ; >>>>>>> /***POLICY********/ >>>>>>> PCollection<KV<Integer,String>> all_policies = policyInput >>>>>>> .apply(new ExtractAndMapPolicyKey()) >>>>>>> ; >>>>>>> /***CLAIM*******/ >>>>>>> PCollection<KV<Integer,String>> all_claims = claimInput >>>>>>> .apply(new ExtractAndMapClaimKey()) >>>>>>> ; >>>>>>> /**JOIN**************/ >>>>>>> /**This join works if I comment out the subsequent join**/ >>>>>>> >>>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= >>>>>>> Join.innerJoin(all_customers,all_policies); >>>>>>> >>>>>>> /**this join will cause IllegalStateException **/ >>>>>>> >>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> >>>>>>> joinedCustomersPoliciesAndClaims = >>>>>>> Join.innerJoin(joinedCustomersAndPolicies,all_claims); >>>>>>> >>>>>>> >>>>>>> >>>>>>> The second scenario is using fixed windows. The logic is the same as >>>>>>> above. Even in this case triggering is equal for all three collections >>>>>>> like >>>>>>> this >>>>>>> >>>>>>> .apply(Window.<Claim2> >>>>>>> into(FixedWindows.of(Duration.standardSeconds(100))) >>>>>>> >>>>>>> .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes() >>>>>>> .withAllowedLateness(Duration.standardSeconds(1))); >>>>>>> >>>>>>> /**JOIN**************/ >>>>>>> /**NO ERROR this time **/ >>>>>>> >>>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= >>>>>>> Join.innerJoin(all_customers,all_policies); >>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> >>>>>>> joinedCustomersPoliciesAndClaims = >>>>>>> Join.innerJoin(joinedCustomersAndPolicies,all_claims); >>>>>>> >>>>>>> >>>>>>> this time I get no errors but I am not able to print the results in >>>>>>> the console. >>>>>>> >>>>>>> So, I make another experiment and again define equal trigger for all >>>>>>> three collections. I can print the output to the console for the first >>>>>>> two >>>>>>> PCollections but the second join again fails with illegalStateException. >>>>>>> Triggering definition for all three collections: >>>>>>> >>>>>>> /**PARSE CLAIM**/ >>>>>>> >>>>>>> PCollection<Claim2> claimInput = pipeline >>>>>>> >>>>>>> .apply((KafkaIO.<String, String> >>>>>>> read().withTopics(ImmutableList.of(claimTopic)) >>>>>>> >>>>>>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>>>>>> .withKeyDeserializer(StringDeserializer.class) >>>>>>> .withValueDeserializer(StringDeserializer.class)) >>>>>>> .withoutMetadata()) >>>>>>> .apply(Values.<String> create()) >>>>>>> .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim())) >>>>>>> >>>>>>> .apply(Window.<Claim2> >>>>>>> into(FixedWindows.of(Duration.standardSeconds(60))) >>>>>>> .triggering(AfterWatermark.pastEndOfWindow() >>>>>>> >>>>>>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() >>>>>>> //early update frequency >>>>>>> >>>>>>> .alignedTo(Duration.standardSeconds(10))) >>>>>>> >>>>>>> .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20)))) >>>>>>> .withAllowedLateness(Duration.standardMinutes(5)) >>>>>>> .accumulatingFiredPanes()); >>>>>>> >>>>>>> >>>>>>> >>>>>>> In this cases I've added early and late firings which emits results >>>>>>> from the pane but again throws exception on the second join. >>>>>>> >>>>>>> I know it's a lot of information to take in but basically if you >>>>>>> have an example where you join three PCollections in global and in fixed >>>>>>> windows with appropriate triggering, I'd be eternally grateful:) >>>>>>> Or if you could explain how to do it, of course. thanks in advance. >>>>>>> >>>>>>> Best Regards >>>>>>> Artur >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <k...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Artur, >>>>>>>> >>>>>>>> When you join the PCollections, they will be flattened and go >>>>>>>> through a GroupByKey together. Since the trigger governs when the >>>>>>>> GroupByKey can emit output, the triggers have to be equal or the >>>>>>>> GroupByKey >>>>>>>> doesn't have a clear guide as to when it should output. If you can >>>>>>>> make the >>>>>>>> triggering on all the input collections equal, that will resolve this >>>>>>>> issue. If you still need different triggering elsewhere, that is fine. >>>>>>>> You >>>>>>>> just need to make them the same going in to the join. >>>>>>>> >>>>>>>> Kenn >>>>>>>> >>>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <aleksandr...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hello, >>>>>>>>> You can try put PCollection after flatten into same global window >>>>>>>>> with triggers as it was before flattening. >>>>>>>>> >>>>>>>>> Best regards >>>>>>>>> Aleksandr Gortujev >>>>>>>>> >>>>>>>>> >>>>>>>>> 3. nov 2017 11:04 AM kirjutas kuupäeval "Artur Mrozowski" < >>>>>>>>> art...@gmail.com>: >>>>>>>>> >>>>>>>>> Hi, >>>>>>>>> I am on second week of our PoC with Beam and I am really amazed by >>>>>>>>> the capabilities of the framework and how well engineered it is. >>>>>>>>> >>>>>>>>> Amazed does not mean experienced so please bear with me. >>>>>>>>> >>>>>>>>> What we try to achieve is to join several streams using windowing >>>>>>>>> and triggers. And that is where I fear we hit the limitations for >>>>>>>>> what can >>>>>>>>> be done. >>>>>>>>> >>>>>>>>> In case A we run in global windows and we are able to combine two >>>>>>>>> unbounded PCollections but when I try to combine the results with >>>>>>>>> third >>>>>>>>> collection I get the exception below. I tried many diffrent trigger >>>>>>>>> combinations, but can't make it work. >>>>>>>>> >>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: >>>>>>>>> Inputs to Flatten had incompatible triggers: Repeatedly.forever( >>>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()), >>>>>>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 >>>>>>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 >>>>>>>>> seconds)) >>>>>>>>> >>>>>>>>> In case B I use fixed windows. Again, I can successfully join two >>>>>>>>> collections and print output in the console. When I add the third it >>>>>>>>> runs >>>>>>>>> without errors, but I am not able to materialize results in the >>>>>>>>> console. >>>>>>>>> Although I am able to print results of merge using Flatten so the >>>>>>>>> error >>>>>>>>> above is not longer an issue. >>>>>>>>> >>>>>>>>> Has anyone experience with joining three or more unbounded >>>>>>>>> PCollections? What would be successful windowing, triggering strategy >>>>>>>>> for >>>>>>>>> global or fixed window respectively? >>>>>>>>> >>>>>>>>> Below code snippets from fixed windows case. Windows are defined >>>>>>>>> in the same manner for all three collections, customer, claim and >>>>>>>>> policy. The >>>>>>>>> Join class I use comes from https://github.com/ >>>>>>>>> apache/beam/blob/master/sdks/java/extensions/join-library/ >>>>>>>>> src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java >>>>>>>>> >>>>>>>>> >>>>>>>>> Would be really greateful if any of you would like to share your >>>>>>>>> knowledge. >>>>>>>>> >>>>>>>>> Best Regard >>>>>>>>> Artur >>>>>>>>> >>>>>>>>> PCollection<Claim2> claimInput = pipeline >>>>>>>>> >>>>>>>>> .apply((KafkaIO.<String, String> >>>>>>>>> read().withTopics(ImmutableList.of(claimTopic)) >>>>>>>>> >>>>>>>>> .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers) >>>>>>>>> .withKeyDeserializer(StringDeserializer.class) >>>>>>>>> .withValueDeserializer(StringDeserializer.class)) >>>>>>>>> .withoutMetadata()) >>>>>>>>> .apply(Values.<String> create()) >>>>>>>>> .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim())) >>>>>>>>> .apply(Window.<Claim2> >>>>>>>>> into(FixedWindows.of(Duration.standardSeconds(100))) >>>>>>>>> >>>>>>>>> .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes() >>>>>>>>> .withAllowedLateness(Duration.standardSeconds(1))); >>>>>>>>> >>>>>>>>> /**JOIN**************/ >>>>>>>>> PCollection<KV<Integer,KV<String,String>>> >>>>>>>>> joinedCustomersAndPolicies= >>>>>>>>> Join.innerJoin(all_customers,all_policies); >>>>>>>>> PCollectionList<KV<Integer,String>> collections = >>>>>>>>> PCollectionList.of(all_customers).and(all_policies).and(all_claims); >>>>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> >>>>>>>>> joinedCustomersPoliciesAndClaims = >>>>>>>>> Join.innerJoin(joinedCustomersAndPolicies,all_claims); >>>>>>>>> //PCollectionList<KV<Integer,String>> collections = >>>>>>>>> PCollectionList.of(all_customers).and(all_policies); >>>>>>>>> >>>>>>>>> PCollection<KV<Integer,String>> merged= >>>>>>>>> collections.apply(Flatten.<KV<Integer,String>>pCollections()); >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>> >>> >>