Hej Ben,
thank you for your answer. I think I forgot to mention that the join class
already implements CoGroupByKey and comes from the SDK extensions. I have
now modified it slightly to do a 3-way join (see below). It works for three
PCollections with fixed windows and produces output this time, even when
using early and late triggers. That is great!

But when I try to use it with global windows for all three collections, I
get the same exception.

Is it not possible to do a 3-way join in a global window? The reason I want
to use a global window is that I want to have all the historic records
available in these collections. I am not sure how that could be achieved
using fixed windows.
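
Concretely, what I mean by keeping history is something like this (a
sketch, simplified from my windowing code quoted further down the thread):

all_customers
        .apply(Window.<KV<Integer, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(10))))
                // accumulating panes re-emit everything seen so far on each firing
                .accumulatingFiredPanes());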

/Artur

public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
        final PCollection<KV<K, V1>> leftCollection,
        final PCollection<KV<K, V2>> rightCollection,
        final PCollection<KV<K, V3>> thirdCollection)
{
    final TupleTag<V1> v1Tuple = new TupleTag<>();
    final TupleTag<V2> v2Tuple = new TupleTag<>();
    final TupleTag<V3> v3Tuple = new TupleTag<>();

    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
            KeyedPCollectionTuple.of(v1Tuple, leftCollection)
                    .and(v2Tuple, rightCollection)
                    .and(v3Tuple, thirdCollection)
                    .apply(CoGroupByKey.<K>create());

    // Same pattern as the 2-way innerJoin in the join library; note that you
    // may also need to set the output coder explicitly via setCoder(KvCoder.of(...)).
    return coGbkResultCollection.apply(ParDo.of(
            new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<K, CoGbkResult> e = c.element();
                    // Emit the cross product of the three inputs' values for this key.
                    for (V1 v1 : e.getValue().getAll(v1Tuple)) {
                        for (V2 v2 : e.getValue().getAll(v2Tuple)) {
                            for (V3 v3 : e.getValue().getAll(v3Tuple)) {
                                c.output(KV.of(e.getKey(), KV.of(KV.of(v1, v2), v3)));
                            }
                        }
                    }
                }
            }));
}

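For completeness, this is roughly how I invoke it (a sketch; the collection
names are the ones from my snippets further down the thread):

PCollection<KV<Integer, KV<KV<String, String>, String>>>
        joinedCustomersPoliciesAndClaims =
                innerJoin3Way(all_customers, all_policies, all_claims);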

On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bchamb...@apache.org> wrote:

> It looks like this is a problematic interaction between the 2-layer join
> and the (unfortunately complicated) continuation triggering. Specifically,
> the triggering of Join(A, B) has the continuation trigger, so it isn't
> possible to join that with C.
>
> Instead of trying to do Join(Join(A, B), C), consider using a
> CoGroupByKey. This will allow you to join all three input collections at
> the same time, which should have two benefits. First, it will work since
> you won't be trying to merge a continuation trigger with the original
> trigger. Second, it should be more efficient, because you are performing a
> single, three-way join instead of two, two-way joins.
>
> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
> for more information.
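>
> Inside the DoFn downstream of the CoGroupByKey, each input's values are
> then available per key through its TupleTag, e.g. (a sketch inside
> @ProcessElement, where c is the ProcessContext and the tag names are
> placeholders):
>
> CoGbkResult result = c.element().getValue();
> Iterable<V1> lefts  = result.getAll(v1Tuple);
> Iterable<V2> rights = result.getAll(v2Tuple);
> Iterable<V3> thirds = result.getAll(v3Tuple);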
>
> -- Ben
>
> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <art...@gmail.com> wrote:
>
>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>
>> There are two scenarios that I have been trying out, but the operation is
>> always the same: join setA + setB => setAB, and then join setAB + setC =>
>> setABC. In scenario 1 I define three PCollections with global windows and
>> exactly the same triggers. I am able to join A+B, but as soon as I try to
>> join AB+C I get the exception. Here is the code snippet, where the window
>> and triggers are the same for all three PCollections using global windows:
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> Trigger trigger1 =
>>         AfterProcessingTime
>>                 .pastFirstElementInPane()
>>                 .plusDelayOf(Duration.standardSeconds(10));
>>
>> /** PARSE CUSTOMER */
>> PCollection<Customer3> customerInput = pipeline
>>         .apply(KafkaIO.<String, String>read()
>>                 .withTopics(ImmutableList.of(customerTopic))
>>                 .updateConsumerProperties(consumerProps)
>>                 .withBootstrapServers(bootstrapServers)
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(StringDeserializer.class)
>>                 .withoutMetadata())
>>         .apply(Values.<String>create())
>>         .apply("ParseCustomer", ParDo.of(new ParseCustomer()))
>>         .apply(Window.<Customer3>into(new GlobalWindows())
>>                 .triggering(Repeatedly.forever(trigger1))
>>                 .accumulatingFiredPanes());
>>
>> /** PARSE POLICY */
>> PCollection<Policy2> policyInput = pipeline
>>         .apply(KafkaIO.<String, String>read()
>>                 .withTopics(ImmutableList.of(policyTopic))
>>                 .updateConsumerProperties(consumerProps)
>>                 .withBootstrapServers(bootstrapServers)
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(StringDeserializer.class)
>>                 // .withWatermarkFn(new AddWatermarkFn())
>>                 // .withTimestampFn2(new AddCustomerTimestampFn())
>>                 .withoutMetadata())
>>         .apply(Values.<String>create())
>>         // .apply(ParseJsons.of(Customer.class));
>>         .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>         .apply(Window.<Policy2>into(new GlobalWindows())
>>                 .triggering(Repeatedly.forever(trigger1))
>>                 .accumulatingFiredPanes());
>>
>> /** PARSE CLAIM */
>> PCollection<Claim2> claimInput = pipeline
>>         .apply(KafkaIO.<String, String>read()
>>                 .withTopics(ImmutableList.of(claimTopic))
>>                 .updateConsumerProperties(consumerProps)
>>                 .withBootstrapServers(bootstrapServers)
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(StringDeserializer.class)
>>                 .withoutMetadata())
>>         .apply(Values.<String>create())
>>         .apply("ParseClaim", ParDo.of(new ParseClaim()))
>>         .apply(Window.<Claim2>into(new GlobalWindows())
>>                 // note: unlike above, trigger1 is not wrapped in Repeatedly.forever here
>>                 .triggering(trigger1)
>>                 .accumulatingFiredPanes());
>>
>> /** CUSTOMER */
>> PCollection<KV<Integer, String>> all_customers = customerInput
>>         .apply(new ExtractAndMapCustomerKey());
>>
>> /** POLICY */
>> PCollection<KV<Integer, String>> all_policies = policyInput
>>         .apply(new ExtractAndMapPolicyKey());
>>
>> /** CLAIM */
>> PCollection<KV<Integer, String>> all_claims = claimInput
>>         .apply(new ExtractAndMapClaimKey());
>>
>> /** JOIN */
>> /** This join works if I comment out the subsequent join. **/
>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>         Join.innerJoin(all_customers, all_policies);
>>
>> /** This join causes the IllegalStateException. **/
>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>         joinedCustomersPoliciesAndClaims =
>>                 Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>
>>
>>
>> The second scenario uses fixed windows. The logic is the same as above,
>> and again the triggering is identical for all three collections:
>>
>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>         .triggering(AfterWatermark.pastEndOfWindow())
>>         .accumulatingFiredPanes()
>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>
>> /** JOIN */
>> /** No error this time. **/
>>
>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>         Join.innerJoin(all_customers, all_policies);
>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>         joinedCustomersPoliciesAndClaims =
>>                 Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>
>>
>> This time I get no errors, but I am not able to print the results to the
>> console.
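>>
>> (For reference, I print with a trivial DoFn, roughly like this; this is a
>> hypothetical stand-in for my actual output step:)
>>
>> joinedCustomersPoliciesAndClaims.apply(ParDo.of(
>>         new DoFn<KV<Integer, KV<KV<String, String>, String>>, Void>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>                 // dump each joined record to stdout
>>                 System.out.println(c.element());
>>             }
>>         }));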
>>
>> So I ran another experiment, again defining an identical trigger for all
>> three collections. I can print the output to the console for the first two
>> PCollections, but the second join again fails with the
>> IllegalStateException. The triggering definition for all three collections:
>>
>> /** PARSE CLAIM */
>>
>> PCollection<Claim2> claimInput = pipeline
>>         .apply(KafkaIO.<String, String>read()
>>                 .withTopics(ImmutableList.of(claimTopic))
>>                 .updateConsumerProperties(consumerProps)
>>                 .withBootstrapServers(bootstrapServers)
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(StringDeserializer.class)
>>                 .withoutMetadata())
>>         .apply(Values.<String>create())
>>         .apply("ParseClaim", ParDo.of(new ParseClaim()))
>>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>                 .triggering(AfterWatermark.pastEndOfWindow()
>>                         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>                                 // early update frequency
>>                                 .alignedTo(Duration.standardSeconds(10)))
>>                         .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>                                 .alignedTo(Duration.standardSeconds(20))))
>>                 .withAllowedLateness(Duration.standardMinutes(5))
>>                 .accumulatingFiredPanes());
>>
>>
>>
>> In this case I've added early and late firings, which emit results from
>> the pane, but the second join again throws the exception.
>>
>> I know it's a lot of information to take in, but if you have an example
>> where you join three PCollections in global and in fixed windows with
>> appropriate triggering, I'd be eternally grateful :)
>> Or if you could explain how to do it, of course. Thanks in advance.
>>
>> Best Regards
>> Artur
>>
>>
>>
>>
>>
>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <k...@google.com> wrote:
>>
>>> Hi Artur,
>>>
>>> When you join the PCollections, they will be flattened and go through a
>>> GroupByKey together. Since the trigger governs when the GroupByKey can emit
>>> output, the triggers have to be equal or the GroupByKey doesn't have a
>>> clear guide as to when it should output. If you can make the triggering on
>>> all the input collections equal, that will resolve this issue. If you still
>>> need different triggering elsewhere, that is fine. You just need to make
>>> them the same going into the join.
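>>>
>>> For example, a sketch that re-windows both inputs with one shared
>>> transform just before the join (types and names taken from your
>>> snippets):
>>>
>>> Window<KV<Integer, String>> joinWindow =
>>>         Window.<KV<Integer, String>>into(new GlobalWindows())
>>>                 .triggering(Repeatedly.forever(
>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>                                 .plusDelayOf(Duration.standardSeconds(10))))
>>>                 // a non-default trigger generally requires explicit allowed
>>>                 // lateness; in the global window nothing is ever late
>>>                 .withAllowedLateness(Duration.ZERO)
>>>                 .accumulatingFiredPanes();
>>>
>>> PCollection<KV<Integer, String>> customersIn = all_customers.apply(joinWindow);
>>> PCollection<KV<Integer, String>> policiesIn = all_policies.apply(joinWindow);
>>> Join.innerJoin(customersIn, policiesIn);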
>>>
>>> Kenn
>>>
>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <aleksandr...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>> You can try putting the PCollection that comes out of the flatten back
>>>> into the same global window, with the same triggers it had before
>>>> flattening.
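>>>>
>>>> Something like this (a sketch, reusing the trigger and the merged
>>>> collection from your snippets):
>>>>
>>>> merged.apply(Window.<KV<Integer, String>>into(new GlobalWindows())
>>>>         .triggering(Repeatedly.forever(
>>>>                 AfterProcessingTime.pastFirstElementInPane()
>>>>                         .plusDelayOf(Duration.standardSeconds(10))))
>>>>         .withAllowedLateness(Duration.ZERO)
>>>>         .accumulatingFiredPanes());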
>>>>
>>>> Best regards
>>>> Aleksandr Gortujev
>>>>
>>>>
>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <art...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> I am in the second week of our PoC with Beam, and I am really amazed by
>>>> the capabilities of the framework and how well engineered it is.
>>>>
>>>> Amazed does not mean experienced, so please bear with me.
>>>>
>>>> What we are trying to achieve is to join several streams using windowing
>>>> and triggers, and that is where I fear we have hit the limits of what
>>>> can be done.
>>>>
>>>> In case A we run in global windows. We are able to combine two unbounded
>>>> PCollections, but when I try to combine the results with a third
>>>> collection I get the exception below. I have tried many different
>>>> trigger combinations but can't make it work.
>>>>
>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>>>> Flatten had incompatible triggers:
>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>> AfterEach.inOrder(
>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>>>>
>>>> In case B I use fixed windows. Again, I can successfully join two
>>>> collections and print the output in the console. When I add the third it
>>>> runs without errors, but I am not able to materialize the results in the
>>>> console. I am, however, able to print the results of merging with
>>>> Flatten, so the error above is no longer an issue there.
>>>>
>>>> Does anyone have experience with joining three or more unbounded
>>>> PCollections? What would be a successful windowing and triggering
>>>> strategy for global or fixed windows, respectively?
>>>>
>>>> Below are code snippets from the fixed-windows case. Windows are defined
>>>> in the same manner for all three collections: customer, claim, and
>>>> policy. The Join class I use comes from
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>
>>>>
>>>> I would be really grateful if any of you would like to share your
>>>> knowledge.
>>>>
>>>> Best Regards
>>>> Artur
>>>>
>>>> PCollection<Claim2> claimInput = pipeline
>>>>         .apply(KafkaIO.<String, String>read()
>>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>>                 .updateConsumerProperties(consumerProps)
>>>>                 .withBootstrapServers(bootstrapServers)
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>                 .withoutMetadata())
>>>>         .apply(Values.<String>create())
>>>>         .apply("ParseClaim", ParDo.of(new ParseClaim()))
>>>>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>                 .triggering(AfterWatermark.pastEndOfWindow())
>>>>                 .accumulatingFiredPanes()
>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>
>>>> /** JOIN */
>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>         Join.innerJoin(all_customers, all_policies);
>>>>
>>>> PCollectionList<KV<Integer, String>> collections =
>>>>         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>
>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>>>         joinedCustomersPoliciesAndClaims =
>>>>                 Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>
>>>> // PCollectionList<KV<Integer, String>> collections =
>>>> //         PCollectionList.of(all_customers).and(all_policies);
>>>>
>>>> PCollection<KV<Integer, String>> merged =
>>>>         collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>>
>>>>
>>>>
>>>
>>
