Re: IllegalStateException when combining 3 streams?
Yes, indeed. It works just fine with equally defined triggers in the global window. Sorry about that, and thank you so much. I can now leave the valley of despair and continue on my learning path ;)

Best Regards
/Artur

On Sat, Nov 4, 2017 at 10:40 PM, Aleksandr wrote:
> Hello,
> You are using Repeatedly.forever for policyInput and for customerInput,
> but for claimInput your trigger is without Repeatedly.forever.
>
> Best regards
> Aleksandr Gortujev
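Aleksandr's diagnosis is easier to see with a small model of when the two forms of trigger1 fire in a global window. The sketch below is plain Java, not Beam code; the class `TriggerFiringModel`, its `firingTimes` method and the sample arrival times are hypothetical. It assumes whole-second processing times, treats an element that arrives exactly at a firing time as the first element of the next pane, and models the bare trigger as finished after its first firing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay). */
final class TriggerFiringModel {
  private TriggerFiringModel() {}

  /**
   * Returns the processing times (in seconds) at which the trigger fires.
   *
   * @param arrivals processing times at which elements arrive
   * @param delaySec delay counted from the first element of a pane
   * @param repeated true models Repeatedly.forever(trigger), false models the bare trigger
   */
  static List<Long> firingTimes(long[] arrivals, long delaySec, boolean repeated) {
    long[] sorted = arrivals.clone();
    Arrays.sort(sorted);
    List<Long> fires = new ArrayList<>();
    Long pendingFire = null; // firing time of the currently open pane, if any
    for (long t : sorted) {
      if (pendingFire != null && t >= pendingFire) {
        fires.add(pendingFire); // the open pane fires before this element arrives
        pendingFire = null;
        if (!repeated) {
          return fires; // the bare trigger is finished after its first firing
        }
      }
      if (pendingFire == null) {
        pendingFire = t + delaySec; // first element of a new pane starts the delay
      }
    }
    if (pendingFire != null) {
      fires.add(pendingFire); // assumes processing time keeps advancing
    }
    return fires;
  }
}
```

With arrivals at 0, 3, 15 and 30 seconds and a 10-second delay, the repeated trigger fires at 10, 25 and 40 seconds, while the bare trigger fires only at 10 seconds. Inputs with such different firing schedules cannot give one GroupByKey a single rule for when to emit, which is why all three inputs need the same Repeatedly.forever wrapper.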
Re: IllegalStateException when combining 3 streams?
And the error message:

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
    at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
    at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
    at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
    at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
    at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
    at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
Re: IllegalStateException when combining 3 streams?
Sure, here is the URL to my repo: https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java

It is fairly simple, and I do no grouping beforehand; I just read from Kafka and then join. I followed your advice, and the 3-way join works for the pipeline with fixed windows. Both classes use the same join class and the same logic, and the A+B join (CoGroupByKey) always works. That's what makes me think it could be due to the triggers. But of course I am not sure :)

/Artur

Pipeline pipeline = Pipeline.create(options);

Trigger trigger1 =
    AfterProcessingTime
        .pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(10));

/**PARSE CUSTOMER*/
PCollection customerInput = pipeline
    .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(customerTopic))
        .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
    .apply(Window. into(new GlobalWindows()).triggering(Repeatedly.forever(trigger1))
        .accumulatingFiredPanes());

/**PARSE POLICY*/
PCollection policyInput = pipeline
    .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(policyTopic))
        .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
        // .withWatermarkFn(new AddWatermarkFn())
        //.withTimestampFn2(new AddCustomerTimestampFn())
        .withoutMetadata())
    .apply(Values.<String>create())
    //.apply(ParseJsons.of(Customer.class));
    .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
    .apply(Window. into(new GlobalWindows()).triggering(Repeatedly.forever(trigger1))
        .accumulatingFiredPanes());

/**PARSE CLAIM**/
PCollection claimInput = pipeline
    .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(claimTopic))
        .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
    .apply(Window. into(new GlobalWindows()).triggering(trigger1)
        .accumulatingFiredPanes());

/**CUSTOMER**/
PCollection<KV<Integer, String>> all_customers = customerInput
    .apply(new ExtractAndMapCustomerKey());
/**POLICY**/
PCollection<KV<Integer, String>> all_policies = policyInput
    .apply(new ExtractAndMapPolicyKey());
/**CLAIM**/
PCollection<KV<Integer, String>> all_claims = claimInput
    .apply(new ExtractAndMapClaimKey());

/**JOIN**/
/**This join works if I comment out the subsequent join**/
// PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
//     Join.innerJoin(all_customers, all_policies);

/**This causes an exception**/
PCollection >> joinedCustomersAndPolicies =
    Join.innerJoin3Way(all_customers, all_policies, all_claims);

/**this join will cause IllegalStateException **/
// PCollection ,String>>> joinedCustomersPoliciesAndClaims =
//     Join.innerJoin(joinedCustomersAndPolicies, all_claims);

/**This will also cause an exception when used with 3 collections**/
// PCollectionList<KV<Integer, String>> collections =
//     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
// PCollection<KV<Integer, String>> merged =
//     collections.apply(Flatten.<KV<Integer, String>>pCollections());

And the 3-way join:

public static PCollection >> innerJoin3Way(
    final PCollection<KV<K, V1>> leftCollection,
    final PCollection<KV<K, V2>> rightCollection,
    final PCollection<KV<K, V3>> thirdCollection) {
  final TupleTag<V1> v1Tuple = new TupleTag<>();
  final TupleTag<V2> v2Tuple = new TupleTag<>();
  final TupleTag<V3> v3Tuple = new TupleTag<>();

  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
          .and(v2Tuple, rightCollection)
          .and(v3Tuple, thirdCollection)
          .apply(CoGroupByKey.create());
  System.out.println(coGbkResultCollection);
  return coGbkResultCollection.apply(ParDo.of(
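Because the DoFn body of innerJoin3Way is cut off above, here is a plain-Java, in-memory sketch of what a CoGroupByKey-based three-way inner join computes: group each input by key, then emit every (v1, v2, v3) combination for keys present in all three inputs. The class `InMemoryJoin3` is hypothetical and is not Beam code; `Map.Entry` pairs stand in for Beam's `KV`, and plain lists stand in for the per-tag iterables of a `CoGbkResult`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a CoGroupByKey-based three-way inner join. */
final class InMemoryJoin3 {
  private InMemoryJoin3() {}

  /** Groups one input by key, keeping encounter order (like one tag of a CoGbkResult). */
  private static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> input) {
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : input) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return grouped;
  }

  /** Emits (key, ((v1, v2), v3)) for every key that appears in all three inputs. */
  static <K, V1, V2, V3> List<Map.Entry<K, Map.Entry<Map.Entry<V1, V2>, V3>>> innerJoin3Way(
      List<Map.Entry<K, V1>> left, List<Map.Entry<K, V2>> right, List<Map.Entry<K, V3>> third) {
    Map<K, List<V1>> g1 = groupByKey(left);
    Map<K, List<V2>> g2 = groupByKey(right);
    Map<K, List<V3>> g3 = groupByKey(third);
    List<Map.Entry<K, Map.Entry<Map.Entry<V1, V2>, V3>>> out = new ArrayList<>();
    for (Map.Entry<K, List<V1>> e : g1.entrySet()) {
      List<V2> v2s = g2.get(e.getKey());
      List<V3> v3s = g3.get(e.getKey());
      if (v2s == null || v3s == null) {
        continue; // inner join: the key must be present in every input
      }
      for (V1 v1 : e.getValue()) {
        for (V2 v2 : v2s) {
          for (V3 v3 : v3s) {
            Map.Entry<V1, V2> pair = new SimpleEntry<>(v1, v2);
            Map.Entry<Map.Entry<V1, V2>, V3> triple = new SimpleEntry<>(pair, v3);
            out.add(new SimpleEntry<>(e.getKey(), triple));
          }
        }
      }
    }
    return out;
  }
}
```

In a pipeline, CoGroupByKey produces such groups per window and per trigger firing, so the sketch corresponds to what a single pane would contain.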
Re: IllegalStateException when combining 3 streams?
Can you share the program using global windows and make sure the exception is the same? Basically, this kind of problem has to do with whether you have already applied a grouping operation. The triggering (and sometimes the windowing) before a grouping operation is different than after it. So, if you are having problems applying a GroupByKey somewhere and think the windowing and triggering should be the same, you need to look before that to see if one of the collections has been grouped but the others haven't.
Re: IllegalStateException when combining 3 streams?
Hej Ben,
thank you for your answer. I think I forgot to mention that the join class already uses CoGroupByKey internally and comes from the SDK extensions. I have now modified it slightly to do a 3-way join (see below). It works for 3 PCollections with fixed windows and provides the output this time, even when using early and late triggers. That is great!

But when I try to use it with global windows for all three collections, I get the same exception.

Is it not possible to make a 3-way join in the global window? The reason why I want to use the global window is that I want to have all the historic records available in these collections. I am not sure how that could be achieved using fixed windows.

/Artur

public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
    final PCollection<KV<K, V1>> leftCollection,
    final PCollection<KV<K, V2>> rightCollection,
    final PCollection<KV<K, V3>> thirdCollection) {
  final TupleTag<V1> v1Tuple = new TupleTag<>();
  final TupleTag<V2> v2Tuple = new TupleTag<>();
  final TupleTag<V3> v3Tuple = new TupleTag<>();

  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
          .and(v2Tuple, rightCollection)
          .and(v3Tuple, thirdCollection)
          .apply(CoGroupByKey.create());

  return coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {

        @ProcessElement
        public void processElement(ProcessContext c)
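Every input in this thread uses `.accumulatingFiredPanes()`, and Artur wants all historic records available in the global window. As a rough illustration, here is a plain-Java model of what each firing contains in accumulating mode versus discarding mode, given the elements that arrived before each firing. The class `PaneModel` is hypothetical and not Beam code; it models a single key in a single window.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of pane contents for accumulating vs. discarding fired panes. */
final class PaneModel {
  private PaneModel() {}

  /**
   * @param batches elements that arrived between consecutive firings, one list per firing
   * @param accumulating true models accumulatingFiredPanes(), false models discardingFiredPanes()
   * @return the contents emitted at each firing
   */
  static List<List<String>> panes(List<List<String>> batches, boolean accumulating) {
    List<List<String>> emitted = new ArrayList<>();
    List<String> state = new ArrayList<>(); // what the window has buffered so far
    for (List<String> batch : batches) {
      if (!accumulating) {
        state.clear(); // discarding mode: drop what earlier panes already emitted
      }
      state.addAll(batch);
      emitted.add(new ArrayList<>(state)); // snapshot of this firing's output
    }
    return emitted;
  }
}
```

In this model, accumulating mode re-emits everything buffered so far at every firing, which is what gives downstream steps the full history; in a global window that buffered state is not released until the window ends, so it keeps growing.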
Re: IllegalStateException when combining 3 streams?
It looks like this is a problematic interaction between the 2-layer join and the (unfortunately complicated) continuation triggering. Specifically, the output of Join(A, B) carries the continuation trigger, so it isn't possible to join that with C.

Instead of trying to do Join(Join(A, B), C), consider using a CoGroupByKey. This will allow you to join all three input collections at the same time, which should have two benefits. First, it will work, since you won't be trying to merge a continuation trigger with the original trigger. Second, it should be more efficient, because you are performing a single three-way join instead of two two-way joins.

See https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html for more information.

-- Ben
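Ben's point about continuation triggers can be sketched in plain Java. The classes below (`TriggerModel`, `ProcessingTimeDelay`, `SynchronizedProcessingTime`, `RepeatedlyForever`, `JoinPlanModel`) are hypothetical stand-ins, not Beam's API. They model only the rule that a grouping step hands its output the continuation of its input trigger: an `AfterProcessingTime` delay continues as `AfterSynchronizedProcessingTime.pastFirstElementInPane()` (the form that appears in the first exception Artur reported), and `Repeatedly.forever(t)` continues as `Repeatedly.forever` of t's continuation. Triggers are compared by their printed form, which is a simplification.

```java
/** Hypothetical stand-in for a Beam trigger; models only continuation and printing. */
abstract class TriggerModel {
  /** The trigger carried by the output of a grouping step, as modeled here. */
  abstract TriggerModel continuation();

  // Two model triggers are equal when they print the same way.
  @Override
  public boolean equals(Object o) {
    return o instanceof TriggerModel && o.toString().equals(toString());
  }

  @Override
  public int hashCode() {
    return toString().hashCode();
  }
}

/** Models AfterProcessingTime.pastFirstElementInPane().plusDelayOf(n seconds). */
final class ProcessingTimeDelay extends TriggerModel {
  private final int seconds;

  ProcessingTimeDelay(int seconds) {
    this.seconds = seconds;
  }

  @Override
  TriggerModel continuation() {
    return new SynchronizedProcessingTime();
  }

  @Override
  public String toString() {
    return "AfterProcessingTime.pastFirstElementInPane().plusDelayOf(" + seconds + " seconds)";
  }
}

/** Models AfterSynchronizedProcessingTime.pastFirstElementInPane(). */
final class SynchronizedProcessingTime extends TriggerModel {
  @Override
  TriggerModel continuation() {
    return this; // modeled as its own continuation
  }

  @Override
  public String toString() {
    return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
  }
}

/** Models Repeatedly.forever(inner). */
final class RepeatedlyForever extends TriggerModel {
  private final TriggerModel inner;

  RepeatedlyForever(TriggerModel inner) {
    this.inner = inner;
  }

  @Override
  TriggerModel continuation() {
    return new RepeatedlyForever(inner.continuation());
  }

  @Override
  public String toString() {
    return "Repeatedly.forever(" + inner + ")";
  }
}

/** Checks whether all inputs of one grouping step carry equal triggers. */
final class JoinPlanModel {
  private JoinPlanModel() {}

  static boolean compatible(TriggerModel... inputs) {
    for (TriggerModel t : inputs) {
      if (!t.equals(inputs[0])) {
        return false;
      }
    }
    return true;
  }
}
```

In this model, nesting the joins compares the continuation `Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())` with C's original trigger and fails, while a single three-input CoGroupByKey only ever compares the three original triggers.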
Re: IllegalStateException when combining 3 streams?
Hi Kenneth and Aleksandr, and thank you for your prompt answers.

There are two scenarios that I've been trying out, but the operation is always the same: join setA+setB => setAB, and then join setAB+setC => setABC.

In scenario 1, I define three PCollections with global windows and exactly the same triggers. I am able to join A+B, but as soon as I try to join AB+C I get the exception. Here is the code snippet where the window and triggers are the same for all three PCollections using global windows:

Pipeline pipeline = Pipeline.create(options);

Trigger trigger1 =
    AfterProcessingTime
        .pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(10));

/**PARSE CUSTOMER*/
PCollection customerInput = pipeline
    .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(customerTopic))
        .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
    .apply(Window. into(new GlobalWindows()).triggering(Repeatedly.forever(trigger1))
        .accumulatingFiredPanes());

/**PARSE POLICY*/
PCollection policyInput = pipeline
    .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(policyTopic))
        .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
        // .withWatermarkFn(new AddWatermarkFn())
        //.withTimestampFn2(new AddCustomerTimestampFn())
        .withoutMetadata())
    .apply(Values.<String>create())
    //.apply(ParseJsons.of(Customer.class));
    .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
    .apply(Window. into(new GlobalWindows()).triggering(Repeatedly.forever(trigger1))
        .accumulatingFiredPanes());

/**PARSE CLAIM**/
PCollection claimInput = pipeline
    .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(claimTopic))
        .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
    .apply(Window. into(new GlobalWindows()).triggering(trigger1)
        .accumulatingFiredPanes());

/**CUSTOMER**/
PCollection<KV<Integer, String>> all_customers = customerInput
    .apply(new ExtractAndMapCustomerKey());
/**POLICY**/
PCollection<KV<Integer, String>> all_policies = policyInput
    .apply(new ExtractAndMapPolicyKey());
/**CLAIM**/
PCollection<KV<Integer, String>> all_claims = claimInput
    .apply(new ExtractAndMapClaimKey());

/**JOIN**/
/**This join works if I comment out the subsequent join**/
PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
    Join.innerJoin(all_customers, all_policies);

/**this join will cause IllegalStateException **/
PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
    Join.innerJoin(joinedCustomersAndPolicies, all_claims);

The second scenario uses fixed windows. The logic is the same as above. Even in this case, the triggering is equal for all three collections, like this:

    .apply(Window. into(FixedWindows.of(Duration.standardSeconds(100)))
        .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(1)));

/**JOIN**/
/**NO ERROR this time **/
PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
    Join.innerJoin(all_customers, all_policies);
PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
    Join.innerJoin(joinedCustomersAndPolicies, all_claims);

This time I get no errors, but I am not able to print the results in the console.

So I made another experiment and again defined an equal trigger for all three collections. I can print the output to the console for the first two PCollections, but the second join again fails with an IllegalStateException. Triggering definition for all three collections:

/**PARSE CLAIM**/
PCollection claimInput = pipeline
    .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(claimTopic))
        .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
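In the fixed-window scenario, `FixedWindows.of(Duration.standardSeconds(100))` places each element in a 100-second window based on its event timestamp, and `AfterWatermark.pastEndOfWindow()` fires a window only once the watermark passes that window's end. The plain-Java `FixedWindowModel` below is a hypothetical sketch of the assignment step only, assuming whole-second timestamps and the default zero offset; it shows which window an element lands in, and therefore which window end the watermark must pass before the element can appear in a joined result.

```java
/** Hypothetical model of FixedWindows.of(size) window assignment with offset 0, in seconds. */
final class FixedWindowModel {
  private FixedWindowModel() {}

  /** Returns {start, end} of the half-open window [start, end) that contains the timestamp. */
  static long[] assign(long timestampSec, long sizeSec) {
    // floorMod keeps the start aligned to multiples of the size, even for negative timestamps
    long start = timestampSec - Math.floorMod(timestampSec, sizeSec);
    return new long[] {start, start + sizeSec};
  }
}
```

For example, timestamps 0 and 99 both fall in [0, 100), while 100 starts the next window, [100, 200).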
Re: IllegalStateException when combining 3 streams?
Hi Artur,

When you join the PCollections, they will be flattened and go through a GroupByKey together. Since the trigger governs when the GroupByKey can emit output, the triggers have to be equal, or the GroupByKey doesn't have a clear guide as to when it should output. If you can make the triggering on all the input collections equal, that will resolve this issue. If you still need different triggering elsewhere, that is fine; you just need to make the triggers the same going into the join.

Kenn
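Kenn's explanation can be illustrated with a small plain-Java model of the check applied to Flatten's inputs. The class `FlattenCheckModel` is hypothetical and not Beam code: it compares triggers by their printed descriptions, which is a simplification of Beam's own compatibility check, and it only mirrors the shape of the `IllegalStateException` quoted in this thread.

```java
import java.util.List;

/** Hypothetical model of the trigger check that Flatten applies to its inputs. */
final class FlattenCheckModel {
  private FlattenCheckModel() {}

  /**
   * Throws if any input's trigger differs from the first one. Expects at least one input.
   * Triggers are compared by their printed form, a simplification of the real check.
   */
  static void checkTriggers(List<String> inputTriggers) {
    String first = inputTriggers.get(0);
    for (String other : inputTriggers) {
      if (!first.equals(other)) {
        throw new IllegalStateException(
            "Inputs to Flatten had incompatible triggers: " + first + ", " + other);
      }
    }
  }
}
```

Passing `Repeatedly.forever(...)` for two inputs and the bare `AfterProcessingTime...plusDelayOf(10 seconds)` for the third produces a message with the same shape as the one in Artur's global-window pipeline, while three identical triggers pass.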
Re: IllegalStateException when combining 3 streams?
Hello,

You can try putting the PCollection after the flatten into the same global window, with the same triggers it had before flattening.

Best regards
Aleksandr Gortujev

On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" wrote:

Hi,

I am in the second week of our PoC with Beam, and I am really amazed by the capabilities of the framework and how well engineered it is. Amazed does not mean experienced, so please bear with me.

What we are trying to achieve is to join several streams using windowing and triggers, and that is where I fear we have hit the limits of what can be done.

In case A we run in global windows, and we are able to combine two unbounded PCollections, but when I try to combine the result with a third collection I get the exception below. I tried many different trigger combinations but can't make it work.

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()), AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))

In case B I use fixed windows. Again, I can successfully join two collections and print the output to the console. When I add the third, it runs without errors, but I am not able to materialize any results in the console. I am, however, able to print the result of merging the collections with Flatten, so the error above is no longer an issue there.

Has anyone had experience joining three or more unbounded PCollections? What would be a successful windowing and triggering strategy for global and fixed windows, respectively?

Below are code snippets from the fixed-windows case. Windows are defined in the same manner for all three collections: customer, claim and policy.
The Join class I use comes from https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java

I would be really grateful if any of you would share your knowledge.

Best regards
Artur

PCollection claimInput = pipeline
    .apply(KafkaIO.<String, String>read()
        .withTopics(ImmutableList.of(claimTopic))
        .updateConsumerProperties(consumerProps)
        .withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(100)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(1)));

/**JOIN**/
PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
    Join.innerJoin(all_customers, all_policies);
PCollectionList<KV<Integer, String>> collections =
    PCollectionList.of(all_customers).and(all_policies).and(all_claims);
PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
    Join.innerJoin(joinedCustomersAndPolicies, all_claims);
//PCollectionList<KV<Integer, String>> collections =
//    PCollectionList.of(all_customers).and(all_policies);

PCollection<KV<Integer, String>> merged =
    collections.apply(Flatten.<KV<Integer, String>>pCollections());
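For a chained, two-step join like the one above, Aleksandr's suggestion amounts to re-applying the window and trigger to the intermediate result before the second join. A `GroupByKey` (and therefore `CoGroupByKey`) replaces its input trigger with a continuation trigger. For `Repeatedly.forever(AfterProcessingTime...)` that is the `Repeatedly.forever(AfterSynchronizedProcessingTime...)` visible in the exception above, which no longer matches the raw third input. The sketch below is a hedged illustration, not the thread's code: it uses the core SDK's `CoGroupByKey` instead of the join-library `Join` class, and the class name, `innerJoin`/`sharedWindow` helpers and in-memory inputs are hypothetical.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

public class RewindowBeforeSecondJoin {

  // Hypothetical left/right tags, reused by both two-way joins.
  static final TupleTag<String> LEFT = new TupleTag<String>() {};
  static final TupleTag<String> RIGHT = new TupleTag<String>() {};

  // The window + trigger every join input should carry.
  static Window<KV<Integer, String>> sharedWindow() {
    return Window.<KV<Integer, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .accumulatingFiredPanes();
  }

  // Inner-joins the two sides of a CoGbkResult into "left | right" values.
  static class PairFn extends DoFn<KV<Integer, CoGbkResult>, KV<Integer, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Integer key = c.element().getKey();
      CoGbkResult row = c.element().getValue();
      for (String left : row.getAll(LEFT)) {
        for (String right : row.getAll(RIGHT)) {
          c.output(KV.of(key, left + " | " + right));
        }
      }
    }
  }

  // Prints each final joined row and passes it through.
  static class PrintFn extends DoFn<KV<Integer, String>, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      String line = c.element().getKey() + ": " + c.element().getValue();
      System.out.println(line);
      c.output(line);
    }
  }

  // Hypothetical two-way inner join built on CoGroupByKey.
  static PCollection<KV<Integer, String>> innerJoin(
      String name, PCollection<KV<Integer, String>> left, PCollection<KV<Integer, String>> right) {
    return KeyedPCollectionTuple.of(LEFT, left)
        .and(RIGHT, right)
        .apply(name + "CoGbk", CoGroupByKey.<Integer>create())
        .apply(name + "Pairs", ParDo.of(new PairFn()));
  }

  static Pipeline buildPipeline(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<KV<Integer, String>> customers = p
        .apply("Customers", Create.of(KV.of(1, "alice"), KV.of(2, "bob")))
        .apply("WindowCustomers", sharedWindow());
    PCollection<KV<Integer, String>> policies = p
        .apply("Policies", Create.of(KV.of(1, "home"), KV.of(2, "car")))
        .apply("WindowPolicies", sharedWindow());
    PCollection<KV<Integer, String>> claims = p
        .apply("Claims", Create.of(KV.of(1, "water damage")))
        .apply("WindowClaims", sharedWindow());

    // First join: its output carries GroupByKey's continuation trigger.
    PCollection<KV<Integer, String>> customersAndPolicies =
        innerJoin("CustomersPolicies", customers, policies);
    // Re-apply the original window + trigger so it matches `claims` again.
    PCollection<KV<Integer, String>> rewindowed =
        customersAndPolicies.apply("Rewindow", sharedWindow());
    innerJoin("AllThree", rewindowed, claims).apply("Print", ParDo.of(new PrintFn()));
    return p;
  }

  public static void main(String[] args) {
    buildPipeline(args).run().waitUntilFinish();
  }
}
```

Without the `Rewindow` step, the second `CoGroupByKey` would receive inputs whose triggers are of different classes, which is the kind of mismatch the `Flatten` check in this thread rejects.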