Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
Yes, indeed. It works just fine with equally defined triggers in the global
window. Sorry about that, and thank you so much.
I can now leave the valley of despair and continue on my learning path ;)

Best Regards
/Artur
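
For readers skimming the archive, a minimal sketch of the fix (names taken from the pipeline quoted below): the claim stream's windowing must wrap trigger1 in Repeatedly.forever, exactly like the customer and policy streams, so that all three inputs to the CoGroupByKey carry identical triggers.

  // Sketch of the corrected claim windowing; was .triggering(trigger1).
  .apply(Window.<Claim>into(new GlobalWindows())
      .triggering(Repeatedly.forever(trigger1))
      .accumulatingFiredPanes())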

On Sat, Nov 4, 2017 at 10:40 PM, Aleksandr  wrote:

> Hello,
> You are using Repeatedly.forever for policyInput and for customerInput, but
> for claimInput your trigger is without Repeatedly.forever.
>
>
> Best regards
> Aleksandr Gortujev
>
>
> On Sat, 4 Nov 2017 at 20:39, Artur Mrozowski  wrote:
>
>> And the error message:
>>
>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>> Flatten had incompatible triggers:
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)),
>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
>> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
>> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>> at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
>> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
>> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>> at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
>> at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
>> at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
>>
>>
>> On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski  wrote:
>>
>>> Sure, here is the URL to my repo:
>>> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>>>
>>> It is fairly simple, and I do no grouping before. Just read from Kafka
>>> and then join. I followed your advice and the 3-way join works for the
>>> pipeline with fixed windows. Both classes use the same join class. The
>>> logic is the same, and it always works for the A+B join (CoGroupByKey).
>>> That's what makes me think it could be due to triggers. But I am of
>>> course not sure :)
>>>
>>> /Artur
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>>   Trigger trigger1 =
>>>       AfterProcessingTime
>>>           .pastFirstElementInPane()
>>>           .plusDelayOf(Duration.standardSeconds(10));
>>>
>>>   /**PARSE CUSTOMER*/
>>>   PCollection<Customer> customerInput = pipeline
>>>       .apply(KafkaIO.<String, String>read()
>>>           .withTopics(ImmutableList.of(customerTopic))
>>>           .updateConsumerProperties(consumerProps)
>>>           .withBootstrapServers(bootstrapServers)
>>>           .withKeyDeserializer(StringDeserializer.class)
>>>           .withValueDeserializer(StringDeserializer.class)
>>>           .withoutMetadata())
>>>       .apply(Values.<String>create())
>>>       .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>       .apply(Window.<Customer>into(new GlobalWindows())
>>>           .triggering(Repeatedly.forever(trigger1))
>>>           .accumulatingFiredPanes());
>>>
>>>   /**PARSE POLICY*/
>>>   PCollection<Policy> policyInput = pipeline
>>>       .apply(KafkaIO.<String, String>read()
>>>           .withTopics(ImmutableList.of(policyTopic))
>>>           .updateConsumerProperties(consumerProps)
>>>           .withBootstrapServers(bootstrapServers)
>>>           .withKeyDeserializer(StringDeserializer.class)
>>>           .withValueDeserializer(StringDeserializer.class)
>>>           // .withWatermarkFn(new AddWatermarkFn())
>>>           // .withTimestampFn2(new AddCustomerTimestampFn())
>>>           .withoutMetadata())
>>>       .apply(Values.<String>create())
>>>       // .apply(ParseJsons.of(Customer.class));
>>>       .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>       .apply(Window.<Policy>into(new GlobalWindows())
>>>           .triggering(Repeatedly.forever(trigger1))
>>>           .accumulatingFiredPanes());
>>>
>>>   /**PARSE CLAIM**/
>>>   PCollection<Claim> claimInput = pipeline
>>>       .apply(KafkaIO.<String, String>read()
>>>           .withTopics(ImmutableList.of(claimTopic))
>>>           .updateConsumerProperties(consumerProps)
>>>           .withBootstrapServers(bootstrapServers)
>>>           .withKeyDeserializer(StringDeserializer.class)
>>>           .withValueDeserializer(StringDeserializer.class)
>>>           .withoutMetadata())
>>>       .apply(Values.<String>create())
>>>       .apply("ParseJsonEventFn2",

Re: Beam Slack Channel Invitation Request

2017-11-04 Thread Mingmin Xu
Sent, welcome!

On Sat, Nov 4, 2017 at 4:42 PM, Tristan Shephard  wrote:

> Hello,
>
> Can someone please add me to the Beam slack channel?
>
> Thanks in advance,
> Tristan
>



-- 

Mingmin


Re: Request to add to Beam Slack Channel

2017-11-04 Thread Mingmin Xu
Sent, welcome to Beam.

On Sat, Nov 4, 2017 at 7:44 PM, Ananth G  wrote:

> Hello,
>
> Could someone please add me to the Beam slack channel?
>
> Regards,
> Ananth
>



-- 

Mingmin


Beam Slack Channel Invitation Request

2017-11-04 Thread Tristan Shephard
Hello,

Can someone please add me to the Beam slack channel?

Thanks in advance,
Tristan


Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
And the error message:

Exception in thread "main" java.lang.IllegalStateException: Inputs to
Flatten had incompatible triggers:
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)


On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski  wrote:

> Sure, here is the URL to my repo:
> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>
> It is fairly simple, and I do no grouping before. Just read from Kafka and
> then join. I followed your advice and the 3-way join works for the pipeline
> with fixed windows. Both classes use the same join class. The logic is the
> same, and it always works for the A+B join (CoGroupByKey). That's what makes
> me think it could be due to triggers. But I am of course not sure :)
>
> /Artur
>
> Pipeline pipeline = Pipeline.create(options);
>
>   Trigger trigger1 =
>       AfterProcessingTime
>           .pastFirstElementInPane()
>           .plusDelayOf(Duration.standardSeconds(10));
>
>   /**PARSE CUSTOMER*/
>   PCollection<Customer> customerInput = pipeline
>       .apply(KafkaIO.<String, String>read()
>           .withTopics(ImmutableList.of(customerTopic))
>           .updateConsumerProperties(consumerProps)
>           .withBootstrapServers(bootstrapServers)
>           .withKeyDeserializer(StringDeserializer.class)
>           .withValueDeserializer(StringDeserializer.class)
>           .withoutMetadata())
>       .apply(Values.<String>create())
>       .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>       .apply(Window.<Customer>into(new GlobalWindows())
>           .triggering(Repeatedly.forever(trigger1))
>           .accumulatingFiredPanes());
>
>   /**PARSE POLICY*/
>   PCollection<Policy> policyInput = pipeline
>       .apply(KafkaIO.<String, String>read()
>           .withTopics(ImmutableList.of(policyTopic))
>           .updateConsumerProperties(consumerProps)
>           .withBootstrapServers(bootstrapServers)
>           .withKeyDeserializer(StringDeserializer.class)
>           .withValueDeserializer(StringDeserializer.class)
>           // .withWatermarkFn(new AddWatermarkFn())
>           // .withTimestampFn2(new AddCustomerTimestampFn())
>           .withoutMetadata())
>       .apply(Values.<String>create())
>       // .apply(ParseJsons.of(Customer.class));
>       .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>       .apply(Window.<Policy>into(new GlobalWindows())
>           .triggering(Repeatedly.forever(trigger1))
>           .accumulatingFiredPanes());
>
>   /**PARSE CLAIM**/
>   PCollection<Claim> claimInput = pipeline
>       .apply(KafkaIO.<String, String>read()
>           .withTopics(ImmutableList.of(claimTopic))
>           .updateConsumerProperties(consumerProps)
>           .withBootstrapServers(bootstrapServers)
>           .withKeyDeserializer(StringDeserializer.class)
>           .withValueDeserializer(StringDeserializer.class)
>           .withoutMetadata())
>       .apply(Values.<String>create())
>       .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>       .apply(Window.<Claim>into(new GlobalWindows())
>           .triggering(trigger1)
>           .accumulatingFiredPanes());
>
>   /**CUSTOMER**/
>   PCollection<KV<String, String>> all_customers = customerInput
>       .apply(new ExtractAndMapCustomerKey());
>   /***POLICY***/
>   PCollection<KV<String, String>> all_policies = policyInput
>       .apply(new ExtractAndMapPolicyKey());
>   /***CLAIM***/
>   PCollection<KV<String, String>> all_claims = claimInput
>       .apply(new ExtractAndMapClaimKey());
>   /**JOIN**/
>   /**This join works if I comment out the subsequent join**/
> // PCollection<KV<String, KV<String, String>>> joinedCustomersAndPolicies =
>

Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
Sure, here is the URL to my repo:
https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java


It is fairly simple, and I do no grouping before. Just read from Kafka and
then join. I followed your advice and the 3-way join works for the pipeline
with fixed windows. Both classes use the same join class. The logic is the
same, and it always works for the A+B join (CoGroupByKey). That's what makes
me think it could be due to triggers. But I am of course not sure :)

/Artur

Pipeline pipeline = Pipeline.create(options);

  Trigger trigger1 =
      AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(10));

  /**PARSE CUSTOMER*/
  PCollection<Customer> customerInput = pipeline
      .apply(KafkaIO.<String, String>read()
          .withTopics(ImmutableList.of(customerTopic))
          .updateConsumerProperties(consumerProps)
          .withBootstrapServers(bootstrapServers)
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .withoutMetadata())
      .apply(Values.<String>create())
      .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
      .apply(Window.<Customer>into(new GlobalWindows())
          .triggering(Repeatedly.forever(trigger1))
          .accumulatingFiredPanes());

  /**PARSE POLICY*/
  PCollection<Policy> policyInput = pipeline
      .apply(KafkaIO.<String, String>read()
          .withTopics(ImmutableList.of(policyTopic))
          .updateConsumerProperties(consumerProps)
          .withBootstrapServers(bootstrapServers)
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          // .withWatermarkFn(new AddWatermarkFn())
          // .withTimestampFn2(new AddCustomerTimestampFn())
          .withoutMetadata())
      .apply(Values.<String>create())
      // .apply(ParseJsons.of(Customer.class));
      .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
      .apply(Window.<Policy>into(new GlobalWindows())
          .triggering(Repeatedly.forever(trigger1))
          .accumulatingFiredPanes());

  /**PARSE CLAIM**/
  PCollection<Claim> claimInput = pipeline
      .apply(KafkaIO.<String, String>read()
          .withTopics(ImmutableList.of(claimTopic))
          .updateConsumerProperties(consumerProps)
          .withBootstrapServers(bootstrapServers)
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .withoutMetadata())
      .apply(Values.<String>create())
      .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
      .apply(Window.<Claim>into(new GlobalWindows())
          .triggering(trigger1)
          .accumulatingFiredPanes());

  /**CUSTOMER**/
  PCollection<KV<String, String>> all_customers = customerInput
      .apply(new ExtractAndMapCustomerKey());
  /***POLICY***/
  PCollection<KV<String, String>> all_policies = policyInput
      .apply(new ExtractAndMapPolicyKey());
  /***CLAIM***/
  PCollection<KV<String, String>> all_claims = claimInput
      .apply(new ExtractAndMapClaimKey());
  /**JOIN**/
  /**This join works if I comment out the subsequent join**/
// PCollection<KV<String, KV<String, String>>> joinedCustomersAndPolicies =
//     Join.innerJoin(all_customers, all_policies);

/**This causes an exception**/
 PCollection<KV<String, KV<KV<String, String>, String>>> joinedCustomersAndPolicies =
     Join.innerJoin3Way(all_customers, all_policies, all_claims);

  /**this join will cause IllegalStateException **/
 // PCollection<KV<String, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
 //     Join.innerJoin(joinedCustomersAndPolicies, all_claims);

   /**This will also cause an exception when used with 3 collections**/
  // PCollectionList<KV<String, String>> collections =
  //     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
  // PCollection<KV<String, String>> merged =
  //     collections.apply(Flatten.<KV<String, String>>pCollections());

And the 3-way join:

public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
   final PCollection<KV<K, V1>> leftCollection,
   final PCollection<KV<K, V2>> rightCollection,
   final PCollection<KV<K, V3>> thirdCollection)
{

   final TupleTag<V1> v1Tuple = new TupleTag<>();
   final TupleTag<V2> v2Tuple = new TupleTag<>();
   final TupleTag<V3> v3Tuple = new TupleTag<>();

   PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
       KeyedPCollectionTuple.of(v1Tuple, leftCollection)
           .and(v2Tuple, rightCollection)
           .and(v3Tuple, thirdCollection)
           .apply(CoGroupByKey.<K>create());

   System.out.println(coGbkResultCollection);

   return coGbkResultCollection.apply(ParDo.of(

Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Ben Chambers
Can you share the program using global windows and make sure the exception
is the same? Basically, this kind of problem has to do with whether you
have applied a grouping operation already. The triggering (and sometimes
windowing) before a grouping operation is different from what it is after.
So, if you are having problems applying a GroupByKey somewhere and think the
windowing and triggering should be the same, you need to look before that to
see if one of the collections has been grouped but the others haven't.
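
To make that concrete: a hedged sketch, not from the thread's repo, of what "looking before the grouping" can reveal. After a GroupByKey/CoGroupByKey, a collection carries the continuation trigger of its input, so flattening it with a collection that still has the original trigger fails. Re-applying the original windowing to the grouped output restores a matching trigger (JoinedValue is a hypothetical stand-in for the real value type):

  // joinedCustomersAndPolicies has been through a CoGroupByKey, so its
  // trigger is now the continuation trigger. Re-windowing with the
  // original trigger makes it compatible with the ungrouped all_claims
  // before a second join. Sketch only.
  PCollection<KV<String, JoinedValue>> rewindowed = joinedCustomersAndPolicies
      .apply(Window.<KV<String, JoinedValue>>into(new GlobalWindows())
          .triggering(Repeatedly.forever(
              AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(10))))
          .accumulatingFiredPanes());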

On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski  wrote:

> Hej Ben,
> thank you for your answer. I think I forgot to mention that the join class
> already implements CoGroupByKey and comes from the SDK extensions. I have now
> modified it slightly to do a 3-way join (see below). It works for 3
> PCollections with fixed windows and provides the output this time, even
> when using early and late triggers. That is great!
>
> But when I try to use it with global windows for all three collections I
> get the same exception.
>
> Is it not possible to make a 3-way join in a global window? The reason why I
> want to use a global window is that I want to have all the historic records
> available in these collections. Not sure how that could be achieved using
> fixed windows.
>
> /Artur
>
> public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>>
> innerJoin3Way(
> final PCollection<KV<K, V1>> leftCollection,
> final PCollection<KV<K, V2>> rightCollection,
> final PCollection<KV<K, V3>> thirdCollection)
> {
> final TupleTag<V1> v1Tuple = new TupleTag<>();
> final TupleTag<V2> v2Tuple = new TupleTag<>();
> final TupleTag<V3> v3Tuple = new TupleTag<>();
>
> PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
> KeyedPCollectionTuple.of(v1Tuple, leftCollection)
> .and(v2Tuple, rightCollection)
> .and(v3Tuple, thirdCollection)
> .apply(CoGroupByKey.<K>create());
>
>
> return coGbkResultCollection.apply(ParDo.of(
> new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
>
> @ProcessElement
> public void processElement(ProcessContext c)
>
>
> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers  wrote:
>
>> It looks like this is a problematic interaction between the 2-layer join
>> and the (unfortunately complicated) continuation triggering. Specifically,
>> the triggering of Join(A, B) has the continuation trigger, so it isn't
>> possible to join that with C.
>>
>> Instead of trying to do Join(Join(A, B), C), consider using a
>> CoGroupByKey. This will allow you to join all three input collections at
>> the same time, which should have two benefits. First, it will work since
>> you won't be trying to merge a continuation trigger with the original
>> trigger. Second, it should be more efficient, because you are performing a
>> single, three-way join instead of two, two-way joins.
>>
>>
>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>> for more information.
>>
>> -- Ben
>>
>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski  wrote:
>>
>>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>>
>>> So there are two scenarios that I've been trying out, but the operation is
>>> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
>>> In scenario 1 I define three PCollections with global windows and
>>> exactly the same triggers. Now I am able to join A+B, but as soon as I try to
>>> join AB+C I get the exception.
>>> Here is the code snippet where the window and triggers are the same
>>> for all three PCollections using global windows:
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>>  Trigger trigger1 =
>>>      AfterProcessingTime
>>>          .pastFirstElementInPane()
>>>          .plusDelayOf(Duration.standardSeconds(10));
>>>
>>>  /**PARSE CUSTOMER*/
>>>  PCollection<Customer> customerInput = pipeline
>>>      .apply(KafkaIO.<String, String>read()
>>>          .withTopics(ImmutableList.of(customerTopic))
>>>          .updateConsumerProperties(consumerProps)
>>>          .withBootstrapServers(bootstrapServers)
>>>          .withKeyDeserializer(StringDeserializer.class)
>>>          .withValueDeserializer(StringDeserializer.class)
>>>          .withoutMetadata())
>>>      .apply(Values.<String>create())
>>>      .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>      .apply(Window.<Customer>into(new GlobalWindows())
>>>          .triggering(Repeatedly.forever(trigger1))
>>>          .accumulatingFiredPanes());
>>>
>>>  /**PARSE POLICY*/
>>>  PCollection<Policy> policyInput = pipeline
>>>      .apply(KafkaIO.<String, String>read()
>>>          .withTopics(ImmutableList.of(policyTopic))
>>>

Re: PipelineTest with TestStreams: unable to serialize

2017-11-04 Thread Matthias Baetens
Hi Aleksandr,

Awesome, that did the trick. Did bump into this relevant SO answer,
but overlooked those TupleTags. Good spot!

@Eugene: thanks a lot for the tip. Will give it a try soon ;)

Cheers,
Matthias
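
For anyone landing here from a search: a minimal sketch of the fix Aleksandr describes, assuming the tags were being created as anonymous classes inside the test. An anonymous TupleTag subclass declared in an instance context keeps a reference to the enclosing test instance, so serializing the transform drags the (non-serializable) TestPipeline along; declaring the tags in a static context breaks that chain. Names below are illustrative, not from the original code:

  import org.apache.beam.sdk.values.TupleTag;

  public class IngestionPipeLineTest {
    // A static field has no enclosing-instance reference, so a DoFn or
    // PTransform that captures this tag no longer pulls in the TestPipeline
    // when it is serialized.
    // (Per Eugene's tip below, running the JVM with
    // -Dsun.io.serialization.extendedDebugInfo=true prints the full
    // reference chain when chasing such failures.)
    private static final TupleTag<String> EVENTS_TAG = new TupleTag<String>() {};

    // ... test methods then use EVENTS_TAG instead of a new anonymous tag inline
  }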

On Fri, Nov 3, 2017 at 9:00 PM, Eugene Kirpichov 
wrote:

> When debugging serialization exceptions, I always find it very helpful to
> use -Dsun.io.serialization.extendedDebugInfo=true.
>
> On Fri, Nov 3, 2017 at 9:21 AM Aleksandr  wrote:
>
>> Hello,
>> The error is probably in your tuple tag classes, which are anonymous classes.
>> It means that your test is trying to serialise the TestPipeline.
>>
>> Best regards
>> Aleksandr Gortujev
>>
>>
>>
>> On 3 Nov 2017 at 3:33 PM, "Matthias Baetens" <
>> matthias.baet...@datatonic.com> wrote:
>>
>> Hi all,
>>
>> I'm currently trying to write a TestStream to validate the windowing
>> logic in a Beam pipeline.
>>
>> I'm creating a TestStream of Strings and applying the different
>> PTransforms to the stream, ending with a PAssert on some of the events I
>> created.
>>
>> TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
>>  .addElements("", "")
>>  .advanceWatermarkToInfinity();
>>
>> PCollection> eventsSessionised = 
>> p.apply(events)
>>
>>  .apply(new Processing(new 
>> TupleTag() {
>>  }, new TupleTag() {
>>  }, new TupleTag() {
>>  }, eventsEnrichedKeyedTag, "", "", 
>> "")).get(eventsEnrichedKeyedTag)
>>  .apply(new 
>> Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF, 
>> ALLOWED_LATENESS_MINUTES))
>>  .apply(new Aggregation(uniqueEventsTag, new 
>> TupleTag() {
>>  })).get(uniqueEventsTag).apply(ParDo.of(new 
>> EventToKV()));
>>
>>
>> PAssert.that(eventsSessionised)
>>     .inOnTimePane(new IntervalWindow(baseTime, endWindow1))
>>     .containsInAnyOrder(e1, e2);
>>
>> Running the test function from within a main method (new
>> IngestionPipeLineTest().testOnTimeEvents();) causes the following error:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize
>>
>> pointing at a custom DoFn which runs fine when running the main pipeline.
>>
>>
>> Not sure why this error gets thrown all of a sudden; any pointers / help
>> would be greatly appreciated.
>>
>> Full stacktrace:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
>>  at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>  at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>  at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
>>  at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
>>  at xxx.transforms.Processing.expand(Processing.java:52)
>>  at xxx.transforms.Processing.expand(Processing.java:1)
>>  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
>>  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
>>  at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
>>  at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
>>  at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>  at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>  ... 10 more
>>

Re: IllegalStateException when combining 3 streams?

2017-11-04 Thread Artur Mrozowski
Hej Ben,
thank you for your answer. I think I forgot to mention that the join class
already implements CoGroupByKey and comes from the SDK extensions. I have now
modified it slightly to do a 3-way join (see below). It works for 3
PCollections with fixed windows and provides the output this time, even
when using early and late triggers. That is great!

But when I try to use it with global windows for all three collections I
get the same exception.

Is it not possible to make a 3-way join in a global window? The reason why I
want to use a global window is that I want to have all the historic records
available in these collections. Not sure how that could be achieved using
fixed windows.

/Artur

public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
    final PCollection<KV<K, V1>> leftCollection,
    final PCollection<KV<K, V2>> rightCollection,
    final PCollection<KV<K, V3>> thirdCollection)
{
    final TupleTag<V1> v1Tuple = new TupleTag<>();
    final TupleTag<V2> v2Tuple = new TupleTag<>();
    final TupleTag<V3> v3Tuple = new TupleTag<>();

    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
        KeyedPCollectionTuple.of(v1Tuple, leftCollection)
            .and(v2Tuple, rightCollection)
            .and(v3Tuple, thirdCollection)
            .apply(CoGroupByKey.<K>create());


    return coGbkResultCollection.apply(ParDo.of(
        new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {

            @ProcessElement
            public void processElement(ProcessContext c)
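
The message is truncated here. For completeness, a hedged sketch of what such a @ProcessElement body typically looks like against the standard CoGbkResult API; the cross-product iteration below is an assumption, not code copied from the repo:

            // Hypothetical completion: emit the cross-product of the three
            // tagged iterables for each key, i.e. an inner join.
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<K, CoGbkResult> e = c.element();
                for (V1 v1 : e.getValue().getAll(v1Tuple)) {
                    for (V2 v2 : e.getValue().getAll(v2Tuple)) {
                        for (V3 v3 : e.getValue().getAll(v3Tuple)) {
                            c.output(KV.of(e.getKey(), KV.of(KV.of(v1, v2), v3)));
                        }
                    }
                }
            }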


On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers  wrote:

> It looks like this is a problematic interaction between the 2-layer join
> and the (unfortunately complicated) continuation triggering. Specifically,
> the triggering of Join(A, B) has the continuation trigger, so it isn't
> possible to join that with C.
>
> Instead of trying to do Join(Join(A, B), C), consider using a
> CoGroupByKey. This will allow you to join all three input collections at
> the same time, which should have two benefits. First, it will work since
> you won't be trying to merge a continuation trigger with the original
> trigger. Second, it should be more efficient, because you are performing a
> single, three-way join instead of two, two-way joins.
>
> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
> for more information.
>
> -- Ben
>
> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski  wrote:
>
>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>
>> So there are two scenarios that I've been trying out, but the operation is
>> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
>> In scenario 1 I define three PCollections with global windows and exactly
>> the same triggers. Now I am able to join A+B, but as soon as I try to join AB+C
>> I get the exception.
>> Here is the code snippet where the window and triggers are the same for
>> all three PCollections using global windows:
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>>  Trigger trigger1 =
>>      AfterProcessingTime
>>          .pastFirstElementInPane()
>>          .plusDelayOf(Duration.standardSeconds(10));
>>
>>  /**PARSE CUSTOMER*/
>>  PCollection<Customer> customerInput = pipeline
>>      .apply(KafkaIO.<String, String>read()
>>          .withTopics(ImmutableList.of(customerTopic))
>>          .updateConsumerProperties(consumerProps)
>>          .withBootstrapServers(bootstrapServers)
>>          .withKeyDeserializer(StringDeserializer.class)
>>          .withValueDeserializer(StringDeserializer.class)
>>          .withoutMetadata())
>>      .apply(Values.<String>create())
>>      .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>      .apply(Window.<Customer>into(new GlobalWindows())
>>          .triggering(Repeatedly.forever(trigger1))
>>          .accumulatingFiredPanes());
>>
>>  /**PARSE POLICY*/
>>  PCollection<Policy> policyInput = pipeline
>>      .apply(KafkaIO.<String, String>read()
>>          .withTopics(ImmutableList.of(policyTopic))
>>          .updateConsumerProperties(consumerProps)
>>          .withBootstrapServers(bootstrapServers)
>>          .withKeyDeserializer(StringDeserializer.class)
>>          .withValueDeserializer(StringDeserializer.class)
>>          // .withWatermarkFn(new AddWatermarkFn())
>>          // .withTimestampFn2(new AddCustomerTimestampFn())
>>          .withoutMetadata())
>>      .apply(Values.<String>create())
>>      // .apply(ParseJsons.of(Customer.class));
>>      .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>      .apply(Window.<Policy>into(new GlobalWindows())
>>          .triggering(Repeatedly.forever(trigger1))
>>          .accumulatingFiredPanes());
>>
>> /**PARSE CLAIM**/
>>
>>  PCollection<Claim> claimInput = pipeline