It looks like this is a problematic interaction between the 2-layer join
and the (unfortunately complicated) continuation triggering. Specifically,
the output of Join(A, B) carries a continuation trigger rather than the
original trigger, so it isn't possible to join it with C.

Instead of trying to do Join(Join(A, B), C), consider using a CoGroupByKey.
This will let you join all three input collections at the same time, which
should have two benefits. First, it will work, since you won't be trying to
merge a continuation trigger with the original trigger. Second, it should be
more efficient, because you are performing a single three-way join instead
of two two-way joins.
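
As a rough sketch (untested; reusing the KV<Integer, String> collections and
variable names from your snippet below, and producing the same nested-KV
shape that the two chained joins would), it could look like this:

final TupleTag<String> customerTag = new TupleTag<String>() {};
final TupleTag<String> policyTag = new TupleTag<String>() {};
final TupleTag<String> claimTag = new TupleTag<String>() {};

// Group all three collections by their common Integer key in one pass.
PCollection<KV<Integer, CoGbkResult>> coGrouped =
        KeyedPCollectionTuple.of(customerTag, all_customers)
                .and(policyTag, all_policies)
                .and(claimTag, all_claims)
                .apply(CoGroupByKey.<Integer>create());

// Emit the cross product of the three sides per key; keys missing from
// any side produce no output, which gives inner-join semantics.
PCollection<KV<Integer, KV<KV<String, String>, String>>> joined = coGrouped
        .apply(ParDo.of(new DoFn<KV<Integer, CoGbkResult>,
                KV<Integer, KV<KV<String, String>, String>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                CoGbkResult result = c.element().getValue();
                for (String customer : result.getAll(customerTag)) {
                    for (String policy : result.getAll(policyTag)) {
                        for (String claim : result.getAll(claimTag)) {
                            c.output(KV.of(c.element().getKey(),
                                    KV.of(KV.of(customer, policy), claim)));
                        }
                    }
                }
            }
        }));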

See
https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
for more information.

-- Ben

On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]> wrote:

> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>
> So there are two scenarios that I've been trying out, but the operation is
> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
> In scenario 1 I define three PCollections with global windows and exactly
> the same triggers. Now I am able to join A+B, but as soon as I try to join
> AB+C I get the exception. Here is the code snippet, where the window and
> triggers are the same for all three PCollections using global windows:
>
> Pipeline pipeline = Pipeline.create(options);
>
> Trigger trigger1 =
>         AfterProcessingTime
>                 .pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(10));
>
> /** PARSE CUSTOMER */
> PCollection<Customer3> customerInput = pipeline
>         .apply(KafkaIO.<String, String>read()
>                 .withTopics(ImmutableList.of(customerTopic))
>                 .updateConsumerProperties(consumerProps)
>                 .withBootstrapServers(bootstrapServers)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .withoutMetadata())
>         .apply(Values.<String>create())
>         .apply("ParseCustomer", ParDo.of(new ParseCustomer()))
>         .apply(Window.<Customer3>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(trigger1))
>                 .accumulatingFiredPanes());
> /** PARSE POLICY */
> PCollection<Policy2> policyInput = pipeline
>         .apply(KafkaIO.<String, String>read()
>                 .withTopics(ImmutableList.of(policyTopic))
>                 .updateConsumerProperties(consumerProps)
>                 .withBootstrapServers(bootstrapServers)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 // .withWatermarkFn(new AddWatermarkFn())
>                 // .withTimestampFn2(new AddCustomerTimestampFn())
>                 .withoutMetadata())
>         .apply(Values.<String>create())
>         // .apply(ParseJsons.of(Customer.class));
>         .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>         .apply(Window.<Policy2>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(trigger1))
>                 .accumulatingFiredPanes());
> /** PARSE CLAIM */
> PCollection<Claim2> claimInput = pipeline
>         .apply(KafkaIO.<String, String>read()
>                 .withTopics(ImmutableList.of(claimTopic))
>                 .updateConsumerProperties(consumerProps)
>                 .withBootstrapServers(bootstrapServers)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .withoutMetadata())
>         .apply(Values.<String>create())
>         .apply("ParseClaim", ParDo.of(new ParseClaim()))
>         .apply(Window.<Claim2>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(trigger1))
>                 .accumulatingFiredPanes());
>
> /** CUSTOMER */
> PCollection<KV<Integer, String>> all_customers = customerInput
>         .apply(new ExtractAndMapCustomerKey());
>
> /** POLICY */
> PCollection<KV<Integer, String>> all_policies = policyInput
>         .apply(new ExtractAndMapPolicyKey());
>
> /** CLAIM */
> PCollection<KV<Integer, String>> all_claims = claimInput
>         .apply(new ExtractAndMapClaimKey());
>
> /** JOIN */
> /** This join works if I comment out the subsequent join. */
> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>         Join.innerJoin(all_customers, all_policies);
>
> /** This join causes the IllegalStateException. */
> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>
>
>
> The second scenario uses fixed windows. The logic is the same as above.
> Even in this case the triggering is equal for all three collections, like
> this:
>
> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .accumulatingFiredPanes()
>         .withAllowedLateness(Duration.standardSeconds(1)));
>
> /** JOIN */
> /** NO ERROR this time. */
>
> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>         Join.innerJoin(all_customers, all_policies);
> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>
>
> This time I get no errors, but I am not able to print the results to the
> console.
>
> So I ran another experiment and again defined an equal trigger for all three
> collections. I can print the output to the console for the first two
> PCollections, but the second join again fails with an IllegalStateException.
> The triggering definition for all three collections:
>
> /** PARSE CLAIM */
> PCollection<Claim2> claimInput = pipeline
>         .apply(KafkaIO.<String, String>read()
>                 .withTopics(ImmutableList.of(claimTopic))
>                 .updateConsumerProperties(consumerProps)
>                 .withBootstrapServers(bootstrapServers)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .withoutMetadata())
>         .apply(Values.<String>create())
>         .apply("ParseClaim", ParDo.of(new ParseClaim()))
>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>                 .triggering(AfterWatermark.pastEndOfWindow()
>                         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                                 // early update frequency
>                                 .alignedTo(Duration.standardSeconds(10)))
>                         .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                                 .alignedTo(Duration.standardSeconds(20))))
>                 .withAllowedLateness(Duration.standardMinutes(5))
>                 .accumulatingFiredPanes());
>
>
>
> In this case I've added early and late firings, which do emit results from
> the pane, but the second join again throws the exception.
>
> I know it's a lot of information to take in, but if you have an example
> where you join three PCollections in global and in fixed windows with
> appropriate triggering, I'd be eternally grateful :)
> Or if you could explain how to do it, of course. Thanks in advance.
>
> Best Regards
> Artur
>
>
>
>
>
> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]> wrote:
>
>> Hi Artur,
>>
>> When you join the PCollections, they will be flattened and go through a
>> GroupByKey together. Since the trigger governs when the GroupByKey can emit
>> output, the triggers have to be equal or the GroupByKey doesn't have a
>> clear guide as to when it should output. If you can make the triggering on
>> all the input collections equal, that will resolve this issue. If you still
>> need different triggering elsewhere, that is fine. You just need to make
>> them the same going into the join.
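>>
>> As a rough sketch of what that could look like in your global-windows
>> case (untested; assuming the processing-time trigger and variable names
>> from your code), you could re-apply the inputs' windowing to the output
>> of the first join before feeding it into the second:
>>
>> Trigger inputTrigger = Repeatedly.forever(
>>         AfterProcessingTime.pastFirstElementInPane()
>>                 .plusDelayOf(Duration.standardSeconds(10)));
>>
>> PCollection<KV<Integer, KV<String, String>>> rewindowed =
>>         joinedCustomersAndPolicies.apply(
>>                 Window.<KV<Integer, KV<String, String>>>into(new GlobalWindows())
>>                         // Same windowing and trigger as the raw inputs, so the
>>                         // GroupByKey inside the second join sees equal triggers.
>>                         .triggering(inputTrigger)
>>                         .accumulatingFiredPanes());
>>
>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedAll =
>>         Join.innerJoin(rewindowed, all_claims);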
>>
>> Kenn
>>
>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]> wrote:
>>
>>> Hello,
>>> You can try putting the PCollection, after the Flatten, back into the same
>>> global window and triggers it had before flattening.
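>>>
>>> A rough sketch of that idea (untested; reusing the collections and the
>>> processing-time trigger from the earlier snippets):
>>>
>>> PCollection<KV<Integer, String>> merged =
>>>         PCollectionList.of(all_customers).and(all_policies).and(all_claims)
>>>                 .apply(Flatten.<KV<Integer, String>>pCollections())
>>>                 // Re-apply the pre-flatten windowing and trigger so that
>>>                 // downstream GroupByKeys see the original trigger again.
>>>                 .apply(Window.<KV<Integer, String>>into(new GlobalWindows())
>>>                         .triggering(Repeatedly.forever(
>>>                                 AfterProcessingTime.pastFirstElementInPane()
>>>                                         .plusDelayOf(Duration.standardSeconds(10))))
>>>                         .accumulatingFiredPanes());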
>>>
>>> Best regards
>>> Aleksandr Gortujev
>>>
>>>
>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <[email protected]>
>>> wrote:
>>>
>>> Hi,
>>> I am in the second week of our PoC with Beam, and I am really amazed by
>>> the capabilities of the framework and how well engineered it is.
>>>
>>> Amazed does not mean experienced so please bear with me.
>>>
>>> What we are trying to achieve is to join several streams using windowing
>>> and triggers. And that is where I fear we have hit the limits of what can
>>> be done.
>>>
>>> In case A we run in global windows, and we are able to combine two
>>> unbounded PCollections, but when I try to combine the result with a third
>>> collection I get the exception below. I have tried many different trigger
>>> combinations, but I can't make it work.
>>>
>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>>> Flatten had incompatible triggers:
>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>> seconds))
>>>
>>> In case B I use fixed windows. Again, I can successfully join two
>>> collections and print the output to the console. When I add the third it
>>> runs without errors, but I am not able to materialize the results in the
>>> console. I am, however, able to print the results of a merge using
>>> Flatten, so the error above is no longer an issue there.
>>>
>>> Does anyone have experience with joining three or more unbounded
>>> PCollections? What would be a successful windowing and triggering strategy
>>> for global or fixed windows, respectively?
>>>
>>> Below are code snippets from the fixed-windows case. The windows are
>>> defined in the same manner for all three collections: customer, claim and
>>> policy. The Join class I use comes from
>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>
>>>
>>> I would be really grateful if any of you would like to share your
>>> knowledge.
>>>
>>> Best Regards
>>> Artur
>>>
>>> PCollection<Claim2> claimInput = pipeline
>>>         .apply(KafkaIO.<String, String>read()
>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>                 .updateConsumerProperties(consumerProps)
>>>                 .withBootstrapServers(bootstrapServers)
>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>                 .withValueDeserializer(StringDeserializer.class)
>>>                 .withoutMetadata())
>>>         .apply(Values.<String>create())
>>>         .apply("ParseClaim", ParDo.of(new ParseClaim()))
>>>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>                 .triggering(AfterWatermark.pastEndOfWindow())
>>>                 .accumulatingFiredPanes()
>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>
>>> /** JOIN */
>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>         Join.innerJoin(all_customers, all_policies);
>>>
>>> PCollectionList<KV<Integer, String>> collections =
>>>         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>
>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>
>>> // PCollectionList<KV<Integer, String>> collections =
>>> //         PCollectionList.of(all_customers).and(all_policies);
>>>
>>> PCollection<KV<Integer, String>> merged =
>>>         collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>
>>>
>>>
>>
>
