Yes, indeed. It works just fine with identically defined triggers in the global
window. Sorry about that, and thank you so much.
I can now leave the valley of despair and continue on my learning path ;)
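
For the archives: the fix was to wrap the claim trigger in Repeatedly.forever,
matching the other two inputs. Roughly (a sketch based on the snippets quoted
below, not the exact final code):

  PCollection<Claim2> claimInput = pipeline
          // ... KafkaIO read and ParseClaim exactly as in the code below ...
          .apply(Window.<Claim2> into(new GlobalWindows())
                  .triggering(Repeatedly.forever(trigger1))
                  .accumulatingFiredPanes());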

Best Regards
/Artur

On Sat, Nov 4, 2017 at 10:40 PM, Aleksandr <aleksandr...@gmail.com> wrote:

> Hello,
> For policyInput and customerInput you are using Repeatedly.forever, but the
> trigger for claimInput is not wrapped in Repeatedly.forever.
>
>
> Best regards
> Aleksandr Gortujev
>
>
> On Sat, Nov 4, 2017 at 20:39, Artur Mrozowski <art...@gmail.com> wrote:
>
>> And the error message:
>>
>> Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
>> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
>> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>> at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
>> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
>> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>> at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
>> at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
>> at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
>>
>>
>> On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski <art...@gmail.com> wrote:
>>
>>> Sure, here is the URL to my repo:
>>> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>>>
>>> It is fairly simple and I do no grouping beforehand: I just read from
>>> Kafka and then join. I followed your advice, and the 3-way join works for
>>> the pipeline with fixed windows. Both classes use the same join class. The
>>> logic is the same, and it always works for the A+B join (CoGroupByKey).
>>> That's what makes me think it could be due to the triggers. But of course
>>> I am not sure :)
>>>
>>> /Artur
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>>   Trigger trigger1 =
>>>           AfterProcessingTime
>>>                   .pastFirstElementInPane()
>>>                   .plusDelayOf(Duration.standardSeconds(10));
>>>
>>>   /**PARSE CUSTOMER*/
>>>   PCollection<Customer3> customerInput = pipeline
>>>           .apply(KafkaIO.<String, String> read()
>>>                   .withTopics(ImmutableList.of(customerTopic))
>>>                   .updateConsumerProperties(consumerProps)
>>>                   .withBootstrapServers(bootstrapServers)
>>>                   .withKeyDeserializer(StringDeserializer.class)
>>>                   .withValueDeserializer(StringDeserializer.class)
>>>                   .withoutMetadata())
>>>           .apply(Values.<String> create())
>>>           .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>           .apply(Window.<Customer3> into(new GlobalWindows())
>>>                   .triggering(Repeatedly.forever(trigger1))
>>>                   .accumulatingFiredPanes());
>>>
>>>   /**PARSE POLICY*/
>>>   PCollection<Policy2> policyInput = pipeline
>>>           .apply(KafkaIO.<String, String> read()
>>>                   .withTopics(ImmutableList.of(policyTopic))
>>>                   .updateConsumerProperties(consumerProps)
>>>                   .withBootstrapServers(bootstrapServers)
>>>                   .withKeyDeserializer(StringDeserializer.class)
>>>                   .withValueDeserializer(StringDeserializer.class)
>>>                   // .withWatermarkFn(new AddWatermarkFn())
>>>                   // .withTimestampFn2(new AddCustomerTimestampFn())
>>>                   .withoutMetadata())
>>>           .apply(Values.<String> create())
>>>           // .apply(ParseJsons.of(Customer.class));
>>>           .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>           .apply(Window.<Policy2> into(new GlobalWindows())
>>>                   .triggering(Repeatedly.forever(trigger1))
>>>                   .accumulatingFiredPanes());
>>>   /**PARSE CLAIM**/
>>>   PCollection<Claim2> claimInput = pipeline
>>>           .apply(KafkaIO.<String, String> read()
>>>                   .withTopics(ImmutableList.of(claimTopic))
>>>                   .updateConsumerProperties(consumerProps)
>>>                   .withBootstrapServers(bootstrapServers)
>>>                   .withKeyDeserializer(StringDeserializer.class)
>>>                   .withValueDeserializer(StringDeserializer.class)
>>>                   .withoutMetadata())
>>>           .apply(Values.<String> create())
>>>           .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>           // NOTE: unlike customerInput and policyInput above, this trigger
>>>           // is not wrapped in Repeatedly.forever(...)
>>>           .apply(Window.<Claim2> into(new GlobalWindows())
>>>                   .triggering(trigger1)
>>>                   .accumulatingFiredPanes());
>>>
>>>   /**CUSTOMER  ********/
>>>   PCollection<KV<Integer,String>> all_customers = customerInput
>>>           .apply(new ExtractAndMapCustomerKey())
>>>           ;
>>>   /***POLICY********/
>>>   PCollection<KV<Integer,String>> all_policies = policyInput
>>>           .apply(new ExtractAndMapPolicyKey())
>>>           ;
>>>   /***CLAIM*******/
>>>   PCollection<KV<Integer,String>> all_claims = claimInput
>>>           .apply(new ExtractAndMapClaimKey())
>>>           ;
>>>   /**JOIN**************/
>>>   /**This join works if I comment out the subsequent join**/
>>>   // PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>   //         Join.innerJoin(all_customers, all_policies);
>>>
>>>   /**This causes an exception**/
>>>   PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>           Join.innerJoin3Way(all_customers, all_policies, all_claims);
>>>
>>>   /**this join will cause IllegalStateException **/
>>>   // PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>   //         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>
>>>   /**This will also cause an exception when used with 3 collections**/
>>>   // PCollectionList<KV<Integer, String>> collections =
>>>   //         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>   // PCollection<KV<Integer, String>> merged =
>>>   //         collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>
>>> And the 3-way join:
>>>
>>> public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
>>>            final PCollection<KV<K, V1>> leftCollection,
>>>            final PCollection<KV<K, V2>> rightCollection,
>>>            final PCollection<KV<K, V3>> thirdCollection)
>>> {
>>>        final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>        final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>        final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>
>>>        PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>                KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>                        .and(v2Tuple, rightCollection)
>>>                        .and(v3Tuple, thirdCollection)
>>>                        .apply(CoGroupByKey.<K>create());
>>>
>>>        return coGbkResultCollection.apply(ParDo.of(
>>>                new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
>>>
>>>                    @ProcessElement
>>>                    public void processElement(ProcessContext c) {
>>>                        KV<K, CoGbkResult> e = c.element();
>>>
>>>                        Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
>>>                        Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
>>>                        // NOTE: the third collection is grouped here but its
>>>                        // values are never emitted, so this version still
>>>                        // outputs only the V1/V2 pairs.
>>>
>>>                        for (V1 leftValue : leftValuesIterable) {
>>>                            for (V2 rightValue : rightValuesIterable) {
>>>                                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
>>>                            }
>>>                        }
>>>                    }
>>>                }))
>>>                .setCoder(KvCoder.of(
>>>                        ((KvCoder) leftCollection.getCoder()).getKeyCoder(),
>>>                        KvCoder.of(
>>>                                ((KvCoder) leftCollection.getCoder()).getValueCoder(),
>>>                                ((KvCoder) rightCollection.getCoder()).getValueCoder())));
>>> }
>>>
>>>
>>> On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <bchamb...@google.com>
>>> wrote:
>>>
>>>> Can you share the program using global windows and make sure the
>>>> exception is the same? Basically, this kind of problem has to do with
>>>> whether you have applied a grouping operation already. The triggering (and
>>>> sometimes windowing) before a grouping operation is different than after.
>>>> So, if you are having problems applying a GroupByKey somewhere and think
>>>> the windowing and triggering should be the same, you need to look before
>>>> that to see if one of the collections has been grouped but the others
>>>> haven't.
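>>>>
>>>> For example (an untested sketch using the names from your snippets): if
>>>> one collection has already been through a grouping, re-applying the
>>>> original windowing to it replaces its continuation trigger before the
>>>> next grouping:
>>>>
>>>>   PCollection<KV<Integer, KV<String, String>>> rewindowed =
>>>>           joinedCustomersAndPolicies.apply(
>>>>                   Window.<KV<Integer, KV<String, String>>> into(new GlobalWindows())
>>>>                           .triggering(Repeatedly.forever(trigger1))
>>>>                           .accumulatingFiredPanes());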
>>>>
>>>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <art...@gmail.com> wrote:
>>>>
>>>>> Hi Ben,
>>>>> thank you for your answer. I think I forgot to mention that the Join
>>>>> class already implements CoGroupByKey and comes from the SDK extensions.
>>>>> I have now modified it slightly to do a 3-way join (see below). It works
>>>>> for 3 PCollections with fixed windows and provides output this time,
>>>>> even when using early and late triggers. That is great!
>>>>>
>>>>> But when I try to use it with global windows for all three collections,
>>>>> I get the same exception.
>>>>>
>>>>> Is it not possible to do a 3-way join in the global window? The reason I
>>>>> want to use the global window is that I want to have all the historic
>>>>> records available in these collections. I am not sure how that could be
>>>>> achieved using fixed windows.
>>>>>
>>>>> /Artur
>>>>>
>>>>> public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
>>>>>         final PCollection<KV<K, V1>> leftCollection,
>>>>>         final PCollection<KV<K, V2>> rightCollection,
>>>>>         final PCollection<KV<K, V3>> thirdCollection)
>>>>> {
>>>>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>>>
>>>>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>>>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>>>                     .and(v2Tuple, rightCollection)
>>>>>                     .and(v3Tuple, thirdCollection)
>>>>>                     .apply(CoGroupByKey.<K>create());
>>>>>
>>>>>     return coGbkResultCollection.apply(ParDo.of(
>>>>>             new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
>>>>>
>>>>>                 @ProcessElement
>>>>>                 public void processElement(ProcessContext c)
>>>>>
>>>>>
>>>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bchamb...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> It looks like this is a problematic interaction between the 2-layer
>>>>>> join and the (unfortunately complicated) continuation triggering.
>>>>>> Specifically, the triggering of Join(A, B) has the continuation
>>>>>> trigger, so it isn't possible to join that with C.
>>>>>>
>>>>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>>>>> CoGroupByKey. This will allow you to join all three input collections
>>>>>> at the same time, which should have two benefits. First, it will work,
>>>>>> since you won't be trying to merge a continuation trigger with the
>>>>>> original trigger. Second, it should be more efficient, because you are
>>>>>> performing a single three-way join instead of two two-way joins.
>>>>>>
>>>>>> See https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>>>>> for more information.
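>>>>>>
>>>>>> For instance, a minimal sketch of the single three-way CoGroupByKey
>>>>>> (tag and collection names here are illustrative):
>>>>>>
>>>>>>   PCollection<KV<K, CoGbkResult>> grouped =
>>>>>>           KeyedPCollectionTuple.of(aTag, as)
>>>>>>                   .and(bTag, bs)
>>>>>>                   .and(cTag, cs)
>>>>>>                   .apply(CoGroupByKey.<K>create());
>>>>>>   // Then, per key, emit the cross product of the three grouped
>>>>>>   // iterables, e.g. KV.of(key, KV.of(KV.of(a, b), c)).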
>>>>>>
>>>>>> -- Ben
>>>>>>
>>>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <art...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>>>>>>
>>>>>>> There are two scenarios I have been trying out, but the operation is
>>>>>>> always the same: join setA+setB => setAB, and then join setAB+setC =>
>>>>>>> setABC.
>>>>>>> In scenario 1 I define three PCollections with global windows and
>>>>>>> exactly the same triggers. I am able to join A+B, but as soon as I try
>>>>>>> to join AB+C I get the exception. Here is the code snippet, where the
>>>>>>> window and triggers are the same for all three PCollections using
>>>>>>> global windows:
>>>>>>>
>>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>>
>>>>>>>  Trigger trigger1 =
>>>>>>>          AfterProcessingTime
>>>>>>>                  .pastFirstElementInPane()
>>>>>>>                  .plusDelayOf(Duration.standardSeconds(10));
>>>>>>>
>>>>>>>  /**PARSE CUSTOMER*/
>>>>>>>  PCollection<Customer3> customerInput = pipeline
>>>>>>>          .apply(KafkaIO.<String, String> read()
>>>>>>>                  .withTopics(ImmutableList.of(customerTopic))
>>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>>>>          .apply(Window.<Customer3> into(new GlobalWindows())
>>>>>>>                  .triggering(Repeatedly.forever(trigger1))
>>>>>>>                  .accumulatingFiredPanes());
>>>>>>>
>>>>>>>  /**PARSE POLICY*/
>>>>>>>  PCollection<Policy2> policyInput = pipeline
>>>>>>>          .apply(KafkaIO.<String, String> read()
>>>>>>>                  .withTopics(ImmutableList.of(policyTopic))
>>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>>                  // .withWatermarkFn(new AddWatermarkFn())
>>>>>>>                  // .withTimestampFn2(new AddCustomerTimestampFn())
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>          // .apply(ParseJsons.of(Customer.class));
>>>>>>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>>>>          .apply(Window.<Policy2> into(new GlobalWindows())
>>>>>>>                  .triggering(Repeatedly.forever(trigger1))
>>>>>>>                  .accumulatingFiredPanes());
>>>>>>>
>>>>>>> /**PARSE CLAIM**/
>>>>>>>
>>>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>>>          .apply(KafkaIO.<String, String> read()
>>>>>>>                  .withTopics(ImmutableList.of(claimTopic))
>>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>          .apply(Window.<Claim2> into(new GlobalWindows())
>>>>>>>                  .triggering(trigger1)
>>>>>>>                  .accumulatingFiredPanes());
>>>>>>>
>>>>>>>  /**CUSTOMER  ********/
>>>>>>>  PCollection<KV<Integer,String>> all_customers = customerInput
>>>>>>>          .apply(new ExtractAndMapCustomerKey())
>>>>>>>          ;
>>>>>>>  /***POLICY********/
>>>>>>>  PCollection<KV<Integer,String>> all_policies = policyInput
>>>>>>>          .apply(new ExtractAndMapPolicyKey())
>>>>>>>          ;
>>>>>>>  /***CLAIM*******/
>>>>>>>  PCollection<KV<Integer,String>> all_claims = claimInput
>>>>>>>          .apply(new ExtractAndMapClaimKey())
>>>>>>>          ;
>>>>>>>  /**JOIN**************/
>>>>>>>  /**This join works if I comment out the subsequent join**/
>>>>>>>
>>>>>>>  PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>>>          Join.innerJoin(all_customers, all_policies);
>>>>>>>
>>>>>>>  /**this join will cause IllegalStateException **/
>>>>>>>
>>>>>>>  PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>          Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The second scenario uses fixed windows. The logic is the same as
>>>>>>> above. Even in this case the triggering is equal for all three
>>>>>>> collections, like this:
>>>>>>>
>>>>>>> .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>>         .accumulatingFiredPanes()
>>>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>
>>>>>>> /**JOIN**************/
>>>>>>> /**NO ERROR this time **/
>>>>>>>
>>>>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>>>         Join.innerJoin(all_customers, all_policies);
>>>>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>>
>>>>>>> This time I get no errors, but I am not able to print the results to
>>>>>>> the console.
>>>>>>>
>>>>>>> So I ran another experiment and again defined an equal trigger for all
>>>>>>> three collections. I can print the output to the console for the first
>>>>>>> two PCollections, but the second join again fails with an
>>>>>>> IllegalStateException. The trigger definition for all three collections:
>>>>>>>
>>>>>>> /**PARSE CLAIM**/
>>>>>>>
>>>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>>>          .apply(KafkaIO.<String, String> read()
>>>>>>>                  .withTopics(ImmutableList.of(claimTopic))
>>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>          .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>>>>>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                                  // early update frequency
>>>>>>>                                  .alignedTo(Duration.standardSeconds(10)))
>>>>>>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                                  .alignedTo(Duration.standardSeconds(20))))
>>>>>>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>>>>>>                  .accumulatingFiredPanes());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> In this case I've added early and late firings, which emit results
>>>>>>> from the pane, but the second join again throws the exception.
>>>>>>>
>>>>>>> I know it's a lot of information to take in, but if you have an
>>>>>>> example where you join three PCollections in global and in fixed
>>>>>>> windows with appropriate triggering, I'd be eternally grateful :)
>>>>>>> Or if you could explain how to do it, of course. Thanks in advance.
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Artur
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Artur,
>>>>>>>>
>>>>>>>> When you join the PCollections, they will be flattened and go
>>>>>>>> through a GroupByKey together. Since the trigger governs when the
>>>>>>>> GroupByKey can emit output, the triggers have to be equal, or the
>>>>>>>> GroupByKey doesn't have a clear guide as to when it should output.
>>>>>>>> If you can make the triggering on all the input collections equal,
>>>>>>>> that will resolve this issue. If you still need different triggering
>>>>>>>> elsewhere, that is fine. You just need to make them the same going
>>>>>>>> into the join.
>>>>>>>>
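>>>>>>>> Concretely (a sketch reusing the trigger from your snippets): define
>>>>>>>> the trigger once and apply an identical Window transform to every
>>>>>>>> collection that feeds the join:
>>>>>>>>
>>>>>>>>   Trigger shared = Repeatedly.forever(
>>>>>>>>           AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>                   .plusDelayOf(Duration.standardSeconds(10)));
>>>>>>>>
>>>>>>>>   PCollection<Customer3> windowedCustomers = customerInput
>>>>>>>>           .apply(Window.<Customer3> into(new GlobalWindows())
>>>>>>>>                   .triggering(shared).accumulatingFiredPanes());
>>>>>>>>   // ...and the identical Window transform for policyInput and
>>>>>>>>   // claimInput, so the downstream Flatten/GroupByKey sees one trigger.
>>>>>>>>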
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <aleksandr...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> You can try putting the PCollection that comes out of the flatten
>>>>>>>>> into the same global window, with the same triggers, as it had
>>>>>>>>> before flattening.
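>>>>>>>>>
>>>>>>>>> Something like this (untested sketch, reusing trigger1 from your code):
>>>>>>>>>
>>>>>>>>>   merged.apply(Window.<KV<Integer, String>> into(new GlobalWindows())
>>>>>>>>>           .triggering(Repeatedly.forever(trigger1))
>>>>>>>>>           .accumulatingFiredPanes());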
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>> Aleksandr Gortujev
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <art...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I am in the second week of our PoC with Beam, and I am really amazed
>>>>>>>>> by the capabilities of the framework and how well engineered it is.
>>>>>>>>>
>>>>>>>>> Amazed does not mean experienced, so please bear with me.
>>>>>>>>>
>>>>>>>>> What we are trying to achieve is to join several streams using
>>>>>>>>> windowing and triggers. And that is where I fear we have hit the
>>>>>>>>> limits of what can be done.
>>>>>>>>> be done.
>>>>>>>>>
>>>>>>>>> In case A we run in global windows, and we are able to combine two
>>>>>>>>> unbounded PCollections, but when I try to combine the results with a
>>>>>>>>> third collection I get the exception below. I have tried many
>>>>>>>>> different trigger combinations but can't make it work.
>>>>>>>>> combinations, but can't make it work.
>>>>>>>>>
>>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()), AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>>>>>>>>>
>>>>>>>>> In case B I use fixed windows. Again, I can successfully join two
>>>>>>>>> collections and print the output to the console. When I add the
>>>>>>>>> third, it runs without errors, but I am not able to materialize the
>>>>>>>>> results in the console. I am, however, able to print the results of
>>>>>>>>> the merge using Flatten, so the error above is no longer an issue.
>>>>>>>>>
>>>>>>>>> Has anyone experience with joining three or more unbounded
>>>>>>>>> PCollections? What would be a successful windowing and triggering
>>>>>>>>> strategy for the global or fixed window, respectively?
>>>>>>>>>
>>>>>>>>> Below are code snippets from the fixed-windows case. The windows are
>>>>>>>>> defined in the same manner for all three collections: customer,
>>>>>>>>> claim, and policy. The Join class I use comes from
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I would be really grateful if any of you would like to share your
>>>>>>>>> knowledge.
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> Artur
>>>>>>>>>
>>>>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>>>>         .apply(KafkaIO.<String, String> read()
>>>>>>>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>>>>>>>                 .updateConsumerProperties(consumerProps)
>>>>>>>>>                 .withBootstrapServers(bootstrapServers)
>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>>>>>>                 .withoutMetadata())
>>>>>>>>>         .apply(Values.<String> create())
>>>>>>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>>>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>>>                 .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>>>>                 .accumulatingFiredPanes()
>>>>>>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>>>
>>>>>>>>>  /**JOIN**************/
>>>>>>>>>  PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>>>>>          Join.innerJoin(all_customers, all_policies);
>>>>>>>>>  PCollectionList<KV<Integer, String>> collections =
>>>>>>>>>          PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>>>>>  PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>>>          Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>>>>  // PCollectionList<KV<Integer, String>> collections =
>>>>>>>>>  //         PCollectionList.of(all_customers).and(all_policies);
>>>>>>>>>
>>>>>>>>> PCollection<KV<Integer, String>> merged =
>>>>>>>>>         collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>
