Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
Yes, indeed. It works just fine with equally defined triggers in the global
window. Sorry about that, and thank you so much.
I can now leave the valley of despair and continue on my learning path ;)

Best Regards
/Artur
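
The fix Aleksandr identified, sketched with illustrative type parameters (the archive stripped the original generics), is to wrap claimInput's trigger in Repeatedly.forever so it matches the other two inputs:

```java
// Sketch based on the code in this thread. The key point: claimInput's trigger
// must also be wrapped in Repeatedly.forever, matching customerInput/policyInput.
// "Claim" and "rawClaims" are illustrative names.
Trigger trigger1 = AfterProcessingTime
    .pastFirstElementInPane()
    .plusDelayOf(Duration.standardSeconds(10));

PCollection<Claim> claimInput = rawClaims
    .apply(Window.<Claim>into(new GlobalWindows())
        .triggering(Repeatedly.forever(trigger1))  // was .triggering(trigger1)
        .accumulatingFiredPanes());
```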

On Sat, Nov 4, 2017 at 10:40 PM, Aleksandr  wrote:

> Hello,
> You are using Repeatedly.forever for policyInput and for customerInput,
> but your trigger for claimInput is without Repeatedly.forever.
>
>
> Best regards
> Aleksandr Gortujev
>
>
> On Sat, Nov 4, 2017 at 20:39, Artur Mrozowski wrote:
>

Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
And the error message:

Exception in thread "main" java.lang.IllegalStateException: Inputs to
Flatten had incompatible triggers:
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)


On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski  wrote:


Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
Sure, here is the url to my repo
https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java


It is fairly simple, and I do no grouping before: just read from Kafka and
then join. I followed your advice and the 3-way join works for the pipeline
with fixed windows. Both classes use the same join class. The logic is the
same, and it always works for the A+B join (CoGroupByKey). That's what makes
me think it could be due to triggers. But I am of course not sure :)

/Artur

Pipeline pipeline = Pipeline.create(options);

  Trigger trigger1 =
  AfterProcessingTime
  .pastFirstElementInPane()
  .plusDelayOf(Duration.standardSeconds(10))
  ;
  /**PARSE CUSTOMER*/
  PCollection customerInput = pipeline

  .apply((KafkaIO.
read().withTopics(ImmutableList.of(customerTopic))

.updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
  .withKeyDeserializer(StringDeserializer.class)
  .withValueDeserializer(StringDeserializer.class))
  .withoutMetadata())
  .apply(Values. create())
  .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
  .apply(Window. into(new
GlobalWindows()).triggering(Repeatedly.forever(
  trigger1 ))
  .accumulatingFiredPanes())
  ;
  /**PARSE POLICY*/
  PCollection policyInput = pipeline
  .apply((KafkaIO.
read().withTopics(ImmutableList.of(policyTopic))

.updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
  .withKeyDeserializer(StringDeserializer.class)
  .withValueDeserializer(StringDeserializer.class))
  // .withWatermarkFn(new AddWatermarkFn())
  //.withTimestampFn2(new AddCustomerTimestampFn())
  .withoutMetadata())
  .apply(Values. create())
  //.apply(ParseJsons.of(Customer.class));
  .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
 .apply(Window. into(new
GlobalWindows()).triggering(Repeatedly.forever(
  trigger1))
  .accumulatingFiredPanes())
  ;
 /**PARSE CLAIM**/
  PCollection claimInput = pipeline
  .apply((KafkaIO.
read().withTopics(ImmutableList.of(claimTopic))

.updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
  .withKeyDeserializer(StringDeserializer.class)
  .withValueDeserializer(StringDeserializer.class))
  .withoutMetadata())
  .apply(Values. create())
  .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
  .apply(Window. into(new GlobalWindows()).triggering(trigger1)
  .accumulatingFiredPanes())
  ;

  /**CUSTOMER  /
  PCollection> all_customers = customerInput
  .apply(new ExtractAndMapCustomerKey())
  ;
  /***POLICY/
  PCollection> all_policies = policyInput
  .apply(new ExtractAndMapPolicyKey())
  ;
  /***CLAIM***/
  PCollection> all_claims = claimInput
  .apply(new ExtractAndMapClaimKey())
  ;
  /**JOIN**/
  /**This join works if I comment out the subsequent join**/
// PCollection>>
joinedCustomersAndPolicies=
Join.innerJoin(all_customers,all_policies);

/**This causes an exception**/
 PCollection>>
joinedCustomersAndPolicies=
Join.innerJoin3Way(all_customers,all_policies,all_claims);

  /**this join will cause IllegalStateException **/
 // PCollection,String>>>
joinedCustomersPoliciesAndClaims =
   // Join.innerJoin(joinedCustomersAndPolicies,all_claims);

   /**This will also cause an exception when used with 3 collections**/
  //PCollectionList> collections =
PCollectionList.of(all_customers).and(all_policies).and(all_claims);
  //PCollection> merged=
collections.apply(Flatten.>pCollections());

And the 3 way join

public static  PCollection>> innerJoin3Way(
   final PCollection> leftCollection,
   final PCollection> rightCollection
   ,final PCollection> thirdCollection)
{

   final TupleTag v1Tuple = new TupleTag<>();
   final TupleTag v2Tuple = new TupleTag<>();
   final TupleTag v3Tuple = new TupleTag<>();

   PCollection> coGbkResultCollection =
   KeyedPCollectionTuple.of(v1Tuple, leftCollection)
   .and(v2Tuple, rightCollection)
   .and(v3Tuple,thirdCollection)
   .apply(CoGroupByKey.create());

   System.out.println(coGbkResultCollection);

   return coGbkResultCollection.apply(ParDo.of(
  

Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Ben Chambers
Can you share the program using global windows and make sure the exception
is the same? Basically, this kind of problem has to do with whether you
have already applied a grouping operation. The triggering (and sometimes the
windowing) before a grouping operation is different from what it is after. So,
if you are having problems applying a GroupByKey somewhere and think the
windowing and triggering should be the same, you need to look upstream to see
whether one of the collections has been grouped but the others haven't.
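
If one of the inputs has already been grouped, one remedy (suggested by Aleksandr elsewhere in this thread) is to re-apply the original windowing after the grouping step, so its rewritten continuation trigger is replaced before the next join. A sketch with illustrative names and type parameters:

```java
// After a GroupByKey, the collection carries a "continuation" trigger that no
// longer equals the trigger on the ungrouped inputs. Re-windowing restores an
// explicit, comparable trigger. Names and types here are illustrative.
PCollection<KV<String, KV<String, String>>> rewindowed =
    joinedCustomersAndPolicies.apply("RestoreTrigger",
        Window.<KV<String, KV<String, String>>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))))
            .accumulatingFiredPanes());
```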

On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski  wrote:


Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
Hej Ben,
thank you for your answer. I think I forgot to mention that the join class
already implements CoGroupByKey and comes from the SDK extensions. I have now
modified it slightly to do a 3-way join (see below). It works for 3
PCollections with fixed windows and provides the output this time, even
when using early and late triggers. That is great!

But when I try to use it with global windows for all three collections I
get the same exception.

Is it not possible to make a 3-way join in a global window? The reason I
want to use the global window is that I want to have all the historic records
available in these collections. I am not sure how that could be achieved using
fixed windows.

/Artur

public static  PCollection,V3>>> innerJoin3Way(
final PCollection> leftCollection,
final PCollection> rightCollection
,final PCollection> thirdCollection)
{
final TupleTag v1Tuple = new TupleTag<>();
final TupleTag v2Tuple = new TupleTag<>();
final TupleTag v3Tuple = new TupleTag<>();

PCollection> coGbkResultCollection =
KeyedPCollectionTuple.of(v1Tuple, leftCollection)
.and(v2Tuple, rightCollection)
.and(v3Tuple,thirdCollection)
.apply(CoGroupByKey.create());


return coGbkResultCollection.apply(ParDo.of(
new DoFn,KV,V3>> >() {

@ProcessElement
public void processElement(ProcessContext c)


On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers  wrote:

Re: IllegalStateException when combining 3 streams?

2017-11-03 Thread Ben Chambers
It looks like this is a problematic interaction between the 2-layer join
and the (unfortunately complicated) continuation triggering. Specifically,
the triggering of Join(A, B) has the continuation trigger, so it isn't
possible to join that with C.

Instead of trying to do Join(Join(A, B), C), consider using a CoGroupByKey.
This will allow you to join all three input collections at the same time,
which should have two benefits. First, it will work since you won't be
trying to merge a continuation trigger with the original trigger. Second,
it should be more efficient, because you are performing a single, three-way
join instead of two, two-way joins.

See https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
for more information.

-- Ben

On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski  wrote:


Re: IllegalStateException when combining 3 streams?

2017-11-03 Thread Artur Mrozowski
Hi Kenneth and Aleksandr and thank you for your prompt answer.

So there are two scenarios that I've been trying out, but the operation is
always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
In scenario 1 I define three PCollections with global windows and exactly
the same triggers. Now I am able to join A+B, but as soon as I try to join
AB+C I get the exception. Here is the code snippet where the window and
triggers are the same for all three PCollections using global windows:

Pipeline pipeline = Pipeline.create(options);

 Trigger trigger1 =
 AfterProcessingTime
 .pastFirstElementInPane()
 .plusDelayOf(Duration.standardSeconds(10))
 ;
 /**PARSE CUSTOMER*/
 PCollection customerInput = pipeline

 .apply((KafkaIO.
read().withTopics(ImmutableList.of(customerTopic))

.updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(StringDeserializer.class))
 .withoutMetadata())
 .apply(Values. create())
 .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
 .apply(Window. into(new
GlobalWindows()).triggering(Repeatedly.forever(
 trigger1 ))
 .accumulatingFiredPanes())
 ;
 /**PARSE POLICY*/
 PCollection policyInput = pipeline
 .apply((KafkaIO.
read().withTopics(ImmutableList.of(policyTopic))

.updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(StringDeserializer.class))
 // .withWatermarkFn(new AddWatermarkFn())
 //.withTimestampFn2(new AddCustomerTimestampFn())
 .withoutMetadata())
 .apply(Values. create())
 //.apply(ParseJsons.of(Customer.class));
 .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
.apply(Window. into(new
GlobalWindows()).triggering(Repeatedly.forever(
 trigger1))
 .accumulatingFiredPanes())
 ;
/**PARSE CLAIM**/
 PCollection claimInput = pipeline
 .apply((KafkaIO.
read().withTopics(ImmutableList.of(claimTopic))

.updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(StringDeserializer.class))
 .withoutMetadata())
 .apply(Values. create())
 .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
 .apply(Window. into(new GlobalWindows()).triggering(trigger1)
 .accumulatingFiredPanes())
 ;

 /**CUSTOMER  /
 PCollection> all_customers = customerInput
 .apply(new ExtractAndMapCustomerKey())
 ;
 /***POLICY/
 PCollection> all_policies = policyInput
 .apply(new ExtractAndMapPolicyKey())
 ;
 /***CLAIM***/
 PCollection> all_claims = claimInput
 .apply(new ExtractAndMapClaimKey())
 ;
 /**JOIN**/
 /**This join works if I comment out the subsequent join**/
 PCollection>>
joinedCustomersAndPolicies=
Join.innerJoin(all_customers,all_policies);
 /**this join will cause IllegalStateException **/
 PCollection,String>>>
joinedCustomersPoliciesAndClaims =
   Join.innerJoin(joinedCustomersAndPolicies,all_claims);



The second scenario uses fixed windows. The logic is the same as above.
Even in this case the triggering is equal for all three collections, like this:

.apply(Window. into(FixedWindows.of(Duration.standardSeconds(100)))
.triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(1)));

/**JOIN**/
/**NO ERROR this time **/
PCollection>> joinedCustomersAndPolicies=
Join.innerJoin(all_customers,all_policies);
PCollection,String>>>
joinedCustomersPoliciesAndClaims =
  Join.innerJoin(joinedCustomersAndPolicies,all_claims);


This time I get no errors, but I am not able to print the results in the
console.
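
One plausible reason the fixed-window pipeline produces no console output is that AfterWatermark.pastEndOfWindow() only fires once the watermark passes the end of the window. The early/late-firing variant that reportedly did produce output earlier in the thread can be sketched like this (delays and lateness are illustrative):

```java
// AfterWatermark alone waits for the watermark to pass the window end; with a
// source whose watermark lags, adding early (and late) firings produces
// speculative panes. Parameters are illustrative.
.apply(Window.<Claim>into(FixedWindows.of(Duration.standardSeconds(100)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(10)))
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.standardSeconds(1))
    .accumulatingFiredPanes())
```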

So, I made another experiment and again defined an equal trigger for all three
collections. I can print the output to the console for the first two
PCollections, but the second join again fails with IllegalStateException.
Triggering definition for all three collections:

/**PARSE CLAIM**/
 PCollection claimInput = pipeline

 .apply((KafkaIO.
read().withTopics(ImmutableList.of(claimTopic))

.updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(StringDeserializer.class))

Re: IllegalStateException when combining 3 streams?

2017-11-03 Thread Kenneth Knowles
Hi Artur,

When you join the PCollections, they will be flattened and go through a
GroupByKey together. Since the trigger governs when the GroupByKey can emit
output, the triggers have to be equal or the GroupByKey doesn't have a
clear guide as to when it should output. If you can make the triggering on
all the input collections equal, that will resolve this issue. If you still
need different triggering elsewhere, that is fine. You just need to make
them the same going into the join.

Kenn
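
One way to make this condition hold by construction is to build the Window transform once and apply the same instance to every input of the join, so the triggers are equal by definition. A sketch with illustrative type parameters and names:

```java
// Define the windowing once; every collection entering the CoGroupByKey then
// carries an identical trigger. Types and variable names are illustrative.
Window<KV<String, String>> joinWindow =
    Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .accumulatingFiredPanes();

PCollection<KV<String, String>> customers = allCustomers.apply("WinCustomers", joinWindow);
PCollection<KV<String, String>> policies  = allPolicies.apply("WinPolicies", joinWindow);
PCollection<KV<String, String>> claims    = allClaims.apply("WinClaims", joinWindow);
```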

On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr  wrote:

> Hello,
> You can try put PCollection after flatten into same global window with
> triggers as it was before flattening.
>
> Best regards
> Aleksandr Gortujev
>
>
> On Nov 3, 2017 at 11:04 AM, "Artur Mrozowski" <
> art...@gmail.com> wrote:
>
> Hi,
> I am in the second week of our PoC with Beam, and I am really amazed by the
> capabilities of the framework and how well engineered it is.
>
> Amazed does not mean experienced, so please bear with me.
>
> What we try to achieve is to join several streams using windowing and
> triggers. And that is where I fear we hit the limitations of what can be
> done.
>
> In case A we run in global windows and we are able to combine two
> unbounded PCollections, but when I try to combine the results with a third
> collection I get the exception below. I tried many different trigger
> combinations, but can't make it work.
>
> Exception in thread "main" java.lang.IllegalStateException: Inputs to
> Flatten had incompatible triggers:
> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>
> In case B I use fixed windows. Again, I can successfully join two
> collections and print the output in the console. When I add the third it
> runs without errors, but I am not able to materialize the results in the
> console. I am, however, able to print the result of the merge using
> Flatten, so the error above is no longer an issue.
>
> Does anyone have experience joining three or more unbounded PCollections?
> What would be a successful windowing and triggering strategy for global or
> fixed windows, respectively?
>
> Below are code snippets from the fixed windows case. Windows are defined
> in the same manner for all three collections: customer, claim and policy.
> The Join class I use comes from
> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>
>
> I would be really grateful if any of you would like to share your
> knowledge.
>
> Best Regards
> Artur
>
> PCollection<KV<Integer, String>> claimInput = pipeline
>
>     .apply(KafkaIO.<String, String>read()
>         .withTopics(ImmutableList.of(claimTopic))
>         .updateConsumerProperties(consumerProps)
>         .withBootstrapServers(bootstrapServers)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>     .apply(Window.<KV<Integer, String>>into(
>             FixedWindows.of(Duration.standardSeconds(100)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .accumulatingFiredPanes()
>         .withAllowedLateness(Duration.standardSeconds(1)));
>
>  /**JOIN**/
>  PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>      Join.innerJoin(all_customers, all_policies);
>  PCollectionList<KV<Integer, String>> collections =
>      PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>  PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>      Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>  // PCollectionList<KV<Integer, String>> collections =
>  //     PCollectionList.of(all_customers).and(all_policies);
>
>  PCollection<KV<Integer, String>> merged =
>      collections.apply(Flatten.<KV<Integer, String>>pCollections());
>
>
>
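
As the follow-ups in this thread confirm, the fix is to give all three inputs
literally identical triggers. A minimal sketch of one way to do that, assuming
Beam's Java SDK and the KV<Integer, String> element type used in the snippet
above (the variable names customerInput/policyInput/claimInput are taken from
the thread; this is an illustration, not the poster's exact code):

```java
// Define the windowing/triggering once and apply the same transform to
// every input of the join, so the Flatten inside CoGroupByKey sees
// identical triggers on all inputs.
Window<KV<Integer, String>> sharedWindowing =
    Window.<KV<Integer, String>>into(
            FixedWindows.of(Duration.standardSeconds(100)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.standardSeconds(1))
        .accumulatingFiredPanes();

PCollection<KV<Integer, String>> all_customers = customerInput.apply(sharedWindowing);
PCollection<KV<Integer, String>> all_policies  = policyInput.apply(sharedWindowing);
PCollection<KV<Integer, String>> all_claims    = claimInput.apply(sharedWindowing);
```

Reusing one Window instance makes it impossible for one input to drift out of
sync with the others (for example, by forgetting the Repeatedly.forever
wrapper on a single input, which is what caused the exception here).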


Re: IllegalStateException when combining 3 streams?

2017-11-03 Thread Aleksandr
Hello,
You can try put PCollection after flatten into same global window with
triggers as it was before flattening.

Best regards
Aleksandr Gortujev

