Hi Ben,
thank you for your answer. I think I forgot to mention that the Join class
already implements CoGroupByKey and comes from the SDK extensions. I have
now modified it slightly to do a 3-way join (see below). It works for three
PCollections with fixed windows and produces output this time, even
when using early and late triggers. That is great!
But when I try to use it with global windows for all three collections I
get the same exception.
Is it not possible to make a 3-way join in a global window? The reason I
want to use the global window is that I want to have all the historic records
available in these collections; I am not sure how that could be achieved
using fixed windows.
/Artur
public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
    final PCollection<KV<K, V1>> leftCollection,
    final PCollection<KV<K, V2>> rightCollection,
    final PCollection<KV<K, V3>> thirdCollection) {
  final TupleTag<V1> v1Tuple = new TupleTag<>();
  final TupleTag<V2> v2Tuple = new TupleTag<>();
  final TupleTag<V3> v3Tuple = new TupleTag<>();
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
          .and(v2Tuple, rightCollection)
          .and(v3Tuple, thirdCollection)
          .apply(CoGroupByKey.<K>create());
  return coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          // Emit the full cross product of the three value lists for
          // this key (inner-join semantics).
          for (V1 v1 : e.getValue().getAll(v1Tuple)) {
            for (V2 v2 : e.getValue().getAll(v2Tuple)) {
              for (V3 v3 : e.getValue().getAll(v3Tuple)) {
                c.output(KV.of(e.getKey(), KV.of(KV.of(v1, v2), v3)));
              }
            }
          }
        }
      }));
}
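For what it's worth, the per-key output of a three-way inner join like this is just the cross product of the three value lists that CoGroupByKey collects for the key. Here is a Beam-free sketch of that semantics (the helper name is made up for illustration; plain Java lists stand in for CoGbkResult):

```java
import java.util.ArrayList;
import java.util.List;

public class ThreeWayJoinSketch {
  // Models what innerJoin3Way emits for one key: every combination of
  // the values grouped under the three tuple tags.
  static <K, V1, V2, V3> List<String> joinKey(
      K key, List<V1> v1s, List<V2> v2s, List<V3> v3s) {
    List<String> out = new ArrayList<>();
    for (V1 v1 : v1s) {
      for (V2 v2 : v2s) {
        for (V3 v3 : v3s) {
          // Mirrors the KV<K, KV<KV<V1, V2>, V3>> output shape.
          out.add(key + ": ((" + v1 + ", " + v2 + "), " + v3 + ")");
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // One customer, two policies and one claim for key 7
    // produce 1 * 2 * 1 = 2 joined records.
    System.out.println(joinKey(
        7, List.of("cust"), List.of("polA", "polB"), List.of("claim")));
  }
}
```

If any of the three lists is empty for a key, the cross product is empty too, which is what makes this an inner rather than an outer join.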
On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <[email protected]> wrote:
> It looks like this is a problematic interaction between the 2-layer join
> and the (unfortunately complicated) continuation triggering. Specifically,
> the triggering of Join(A, B) has the continuation trigger, so it isn't
> possible to join that with C.
>
> Instead of trying to do Join(Join(A, B), C), consider using a
> CoGroupByKey. This will allow you to join all three input collections at
> the same time, which should have two benefits. First, it will work since
> you won't be trying to merge a continuation trigger with the original
> trigger. Second, it should be more efficient, because you are performing a
> single, three-way join instead of two, two-way joins.
>
> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
> for more information.
>
> -- Ben
>
> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]> wrote:
>
>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>
>> There are two scenarios that I've been trying out, but the operation is
>> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
>> In scenario 1 I define three PCollections with global windows and exactly
>> the same triggers. I am able to join A+B, but as soon as I try to join AB+C
>> I get the exception. Here is the code snippet, where the window and triggers
>> are the same for all three PCollections using global windows:
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> Trigger trigger1 = AfterProcessingTime
>>     .pastFirstElementInPane()
>>     .plusDelayOf(Duration.standardSeconds(10));
>>
>> /** PARSE CUSTOMER */
>> PCollection<Customer3> customerInput = pipeline
>>     .apply(KafkaIO.<String, String>read()
>>         .withTopics(ImmutableList.of(customerTopic))
>>         .updateConsumerProperties(consumerProps)
>>         .withBootstrapServers(bootstrapServers)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(StringDeserializer.class)
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>     .apply(Window.<Customer3>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(trigger1))
>>         .accumulatingFiredPanes());
>>
>> /** PARSE POLICY */
>> PCollection<Policy2> policyInput = pipeline
>>     .apply(KafkaIO.<String, String>read()
>>         .withTopics(ImmutableList.of(policyTopic))
>>         .updateConsumerProperties(consumerProps)
>>         .withBootstrapServers(bootstrapServers)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(StringDeserializer.class)
>>         // .withWatermarkFn(new AddWatermarkFn())
>>         // .withTimestampFn2(new AddCustomerTimestampFn())
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     // .apply(ParseJsons.of(Customer.class));
>>     .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>     .apply(Window.<Policy2>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(trigger1))
>>         .accumulatingFiredPanes());
>>
>> /** PARSE CLAIM */
>> PCollection<Claim2> claimInput = pipeline
>>     .apply(KafkaIO.<String, String>read()
>>         .withTopics(ImmutableList.of(claimTopic))
>>         .updateConsumerProperties(consumerProps)
>>         .withBootstrapServers(bootstrapServers)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(StringDeserializer.class)
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>     .apply(Window.<Claim2>into(new GlobalWindows())
>>         .triggering(trigger1)
>>         .accumulatingFiredPanes());
>>
>> /** CUSTOMER */
>> PCollection<KV<Integer, String>> all_customers = customerInput
>>     .apply(new ExtractAndMapCustomerKey());
>>
>> /** POLICY */
>> PCollection<KV<Integer, String>> all_policies = policyInput
>>     .apply(new ExtractAndMapPolicyKey());
>>
>> /** CLAIM */
>> PCollection<KV<Integer, String>> all_claims = claimInput
>>     .apply(new ExtractAndMapClaimKey());
>>
>> /** JOIN */
>> /** This join works if I comment out the subsequent join. */
>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>     Join.innerJoin(all_customers, all_policies);
>>
>> /** This join will cause IllegalStateException. */
>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>     joinedCustomersPoliciesAndClaims =
>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>
>>
>>
>> The second scenario uses fixed windows. The logic is the same as above.
>> Even in this case the triggering is the same for all three collections:
>>
>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>     .triggering(AfterWatermark.pastEndOfWindow())
>>     .accumulatingFiredPanes()
>>     .withAllowedLateness(Duration.standardSeconds(1)));
>>
>> /** JOIN */
>> /** NO ERROR this time */
>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>     Join.innerJoin(all_customers, all_policies);
>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>     joinedCustomersPoliciesAndClaims =
>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>
>>
>> This time I get no errors, but I am not able to print the results in the
>> console.
>>
>> So I ran another experiment and again defined an equal trigger for all
>> three collections. I can print the output to the console for the first two
>> PCollections, but the second join again fails with IllegalStateException.
>> The triggering definition for all three collections:
>>
>> /** PARSE CLAIM */
>> PCollection<Claim2> claimInput = pipeline
>>     .apply(KafkaIO.<String, String>read()
>>         .withTopics(ImmutableList.of(claimTopic))
>>         .updateConsumerProperties(consumerProps)
>>         .withBootstrapServers(bootstrapServers)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(StringDeserializer.class)
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>         .triggering(AfterWatermark.pastEndOfWindow()
>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>                 // early update frequency
>>                 .alignedTo(Duration.standardSeconds(10)))
>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>                 .alignedTo(Duration.standardSeconds(20))))
>>         .withAllowedLateness(Duration.standardMinutes(5))
>>         .accumulatingFiredPanes());
>>
>>
>>
>> In this case I've added early and late firings, which emit results from
>> the pane, but the second join again throws the exception.
>>
>> I know it's a lot of information to take in, but basically, if you have an
>> example where you join three PCollections in global and in fixed windows
>> with appropriate triggering, I'd be eternally grateful :)
>> Or if you could explain how to do it, of course. Thanks in advance.
>>
>> Best Regards
>> Artur
>>
>>
>>
>>
>>
>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]> wrote:
>>
>>> Hi Artur,
>>>
>>> When you join the PCollections, they will be flattened and go through a
>>> GroupByKey together. Since the trigger governs when the GroupByKey can emit
>>> output, the triggers have to be equal or the GroupByKey doesn't have a
>>> clear guide as to when it should output. If you can make the triggering on
>>> all the input collections equal, that will resolve this issue. If you still
>>> need different triggering elsewhere, that is fine. You just need to make
>>> them the same going into the join.
>>>
>>> Kenn
>>>
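Kenn's point shows up directly in the IllegalStateException quoted further down: Flatten refuses inputs whose triggers differ. A toy model of that compatibility check (not Beam's actual implementation; the class and method names here are made up for illustration, with triggers represented as strings):

```java
import java.util.List;

public class TriggerCompatibilityCheck {
  // Toy version of the check behind "Inputs to Flatten had incompatible
  // triggers": every input to the flatten must carry the same trigger.
  static void checkCompatible(List<String> triggerDescriptions) {
    String first = triggerDescriptions.get(0);
    for (String t : triggerDescriptions) {
      if (!t.equals(first)) {
        throw new IllegalStateException(
            "Inputs to Flatten had incompatible triggers: " + first + ", " + t);
      }
    }
  }

  public static void main(String[] args) {
    // Same trigger on both inputs: passes.
    checkCompatible(List.of(
        "Repeatedly.forever(trigger1)", "Repeatedly.forever(trigger1)"));
    // Per Ben's reply, a join output carries a "continuation" trigger,
    // so joining it with an original-trigger collection fails the check.
    try {
      checkCompatible(List.of(
          "Repeatedly.forever(AfterSynchronizedProcessingTime...)",
          "Repeatedly.forever(trigger1)"));
    } catch (IllegalStateException expected) {
      System.out.println(expected.getMessage());
    }
  }
}
```

This is why applying one shared Window transform to all three inputs immediately before the join, and doing the join in a single CoGroupByKey rather than as two nested two-way joins, sidesteps the problem.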
>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]>
>>> wrote:
>>>
>>>> Hello,
>>>> You can try putting the PCollection after the flatten into the same
>>>> global window, with the same triggers it had before flattening.
>>>>
>>>> Best regards
>>>> Aleksandr Gortujev
>>>>
>>>>
>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <
>>>> [email protected]> wrote:
>>>>
>>>> Hi,
>>>> I am in the second week of our PoC with Beam, and I am really amazed by
>>>> the capabilities of the framework and how well engineered it is.
>>>>
>>>> Amazed does not mean experienced, so please bear with me.
>>>>
>>>> What we are trying to achieve is to join several streams using windowing
>>>> and triggers. And that is where I fear we have hit the limits of what can
>>>> be done.
>>>>
>>>> In case A we run in global windows, and we are able to combine two
>>>> unbounded PCollections, but when I try to combine the results with a third
>>>> collection I get the exception below. I tried many different trigger
>>>> combinations but can't make it work.
>>>>
>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>>>> Flatten had incompatible triggers:
>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>> AfterEach.inOrder(
>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>>>>
>>>> In case B I use fixed windows. Again, I can successfully join two
>>>> collections and print the output to the console. When I add the third it
>>>> runs without errors, but I am not able to materialize the results in the
>>>> console. However, I am able to print the results of a merge using Flatten,
>>>> so the error above is no longer an issue there.
>>>>
>>>> Does anyone have experience joining three or more unbounded
>>>> PCollections? What would be a successful windowing and triggering strategy
>>>> for a global or a fixed window, respectively?
>>>>
>>>> Below are code snippets from the fixed-windows case. The windows are
>>>> defined in the same manner for all three collections: customer, claim and
>>>> policy. The Join class I use comes from
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>
>>>> I would be really grateful if any of you would like to share your
>>>> knowledge.
>>>>
>>>> Best Regards
>>>> Artur
>>>>
>>>> PCollection<Claim2> claimInput = pipeline
>>>>     .apply(KafkaIO.<String, String>read()
>>>>         .withTopics(ImmutableList.of(claimTopic))
>>>>         .updateConsumerProperties(consumerProps)
>>>>         .withBootstrapServers(bootstrapServers)
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>         .withoutMetadata())
>>>>     .apply(Values.<String>create())
>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>         .accumulatingFiredPanes()
>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>
>>>> /** JOIN */
>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>     Join.innerJoin(all_customers, all_policies);
>>>> PCollectionList<KV<Integer, String>> collections =
>>>>     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>>
>>>>     joinedCustomersPoliciesAndClaims =
>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>> // PCollectionList<KV<Integer, String>> collections =
>>>> //     PCollectionList.of(all_customers).and(all_policies);
>>>>
>>>> PCollection<KV<Integer, String>> merged =
>>>>     collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>>
>>>>
>>>>
>>>
>>