Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-24 Thread Jins George
Currently org.apache.beam.runners.flink.FlinkPipelineOptions does not
have a way to configure externalized checkpoints. Is that something on
the roadmap for FlinkRunner?


Thanks,
Jins George

On 03/23/2017 10:27 AM, Aljoscha Krettek wrote:
For this you would use externalised checkpoints: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html 



Unfortunately, the doc is still a bit sparse but it’s basically a 
combination of savepoints and checkpoints. Checkpoints are not cleaned 
up when a job fails and you can restore from them as you would from a 
savepoint.
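
On the plain Flink API (Beam's FlinkPipelineOptions does not expose this yet)
it boils down to roughly the following - a minimal sketch against the Flink
1.3 APIs:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// take a checkpoint every 60 seconds
env.enableCheckpointing(60_000L);
// retain the latest checkpoint even on failure/cancellation, so it can be
// used for recovery much like a savepoint; 'state.checkpoints.dir' in
// flink-conf.yaml controls where the checkpoint metadata is written
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Restoring then works the same way as restoring from a savepoint, i.e. by
passing the retained checkpoint path when resubmitting the job.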


Best,
Aljoscha

On 22 Mar 2017, at 21:25, Jins George wrote:



Thanks Aljoscha for the clarification. Savepoints work fine in the case
of a controlled stop and restart. In the case of a failure (say the entire
job failed due to a node crash or an application software bug), is there a
way to resume from the checkpoint when restarting the application?
The checkpoint location is configured on HDFS.


Thanks,
Jins George

On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
As I mentioned before, when running a Flink job and simply
cancelling it, all state about that job is discarded (with some
exceptions, such as externalised checkpoints). If you want the state
of a job to survive a cancellation, you have to perform a savepoint
[1], and then when restarting the job you have to specify the savepoint
from which you want to restore.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
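
Concretely, with the standard Flink CLI that is roughly (job IDs and paths
below are placeholders):

  bin/flink savepoint <jobID> hdfs:///flink/savepoints   # trigger a savepoint (or: bin/flink cancel -s ... <jobID>)
  bin/flink run -s <savepointPath> ...                   # resubmit the job, restoring from that savepoint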


On 22 Mar 2017, at 01:43, Raghu Angadi wrote:


Expanding a bit more on what Dan wrote:

  - In Dataflow, there are two modes of restarting a job: a regular stop
    and then start, and an *update*. The checkpoint is carried over only
    in the case of an update.
  - Update is the only way to keep 'exactly-once' semantics.
  - If the requirements are not very strict, you can enable offset commits
    in Kafka itself. KafkaIO lets you configure this (a sketch follows
    below). Here the pipeline would start reading from approximately where
    it left off in the previous run.
      - When offset commits are enabled, KafkaIO could do this by
        implementing the 'finalize()' API on KafkaCheckpointMark [1].
      - This is runner-independent.
      - The compromise is that this might skip a few records or read a few
        old records when the pipeline is restarted.
      - This does not override 'resume from checkpoint' support when the
        runner provides the KafkaCheckpointMark. Externally committed
        offsets are used only when KafkaIO's own CheckpointMark is not
        available.

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
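
As a rough illustration only (the exact KafkaIO builder-method names vary
between Beam versions, so treat the names below as assumptions and check the
javadoc for your release), enabling Kafka-side offset commits looks roughly
like this, where 'p' is the Pipeline:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;

p.apply(KafkaIO.read()
    .withBootstrapServers("broker-1:9092")
    .withTopics(ImmutableList.of("my-topic"))
    // a stable group.id lets the Kafka brokers track the committed offsets
    // between runs; enable.auto.commit turns on Kafka-side offset commits
    .updateConsumerProperties(ImmutableMap.<String, Object>of(
        "group.id", "my-beam-pipeline",
        "enable.auto.commit", true)));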

On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin  
wrote:



[We should keep user list involved if that's where the discussion
originally was :)]

Jins George's original question was a good one. The right way to resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed) work.
If a user disables checkpointing, then they are explicitly opting into
"redo all work" on restart.

--> If checkpointing is enabled but the KafkaCheckpointMark is not being
provided, then I'm inclined to agree with Amit that there may simply be a
bug in the FlinkRunner. (+aljoscha)

For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it restarts with
no checkpoint this will automatically go find the latest offset. That
would mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.

Dan

On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu  
wrote:


In the SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
Can it restore during a job restart? (I haven't tested the runner in
streaming mode for some time.)

Regarding data completeness, I would use at-most-once when losing a little
data (mostly on task-node failure) is tolerated, compared to the performance
cost introduced by 'state'/'checkpoint'.
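
(For reference, the knob I mean - a rough sketch, with option names as I
recall them from SparkPipelineOptions, so please double-check against your
version:)

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
// point checkpoints at durable storage instead of the local tmp default
options.setCheckpointDir("hdfs:///beam/checkpoints/my-job");
Pipeline p = Pipeline.create(options);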

On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela  
wrote:


On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu  
wrote:



Move discussion to dev-list.

Savepoint in Flink, also checkpoint in Spark, should be good enough to
handle this case.

When people don't enable these features, for example when they only need
at-most-once

The Spark runner forces checkpointing on any streaming (Beam) application,
mostly because it uses mapWithState for reading from UnboundedSource and
updateStateByKey for GroupByKey - so by design, the Spark runner is
at-least-once. Generally, I always thought that applications that


Slack Channel Request

2017-03-24 Thread Prabeesh K.
Hi,

Can someone please add me to the Apache Beam slack channel?

Regards,

Prabeesh K.


Re: guava collections and kryo under spark runner

2017-03-24 Thread Aviem Zur
Oh yes I see your second version now, that indeed reproduces the issue,
thanks!
I'll update the gist to include this change.

On Fri, Mar 24, 2017 at 3:42 PM Antony Mayi  wrote:

> Hi Aviem,
>
> Apologies for the confusion - did you see my second version of the file I
> sent shortly after the first one? That second one had the Row class
> included (using just "implements Serializable").
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 13:36, Aviem Zur  wrote:
>
>
> Hi Antony,
>
> Thanks for sharing your code!
>
> I created a test that uses the exact pipeline. I couldn't find the `Row`
> class referred to in your pipeline so I created it as a POJO and registered
> its coder as `AvroCoder`.
>
> Unfortunately this test passes so it does not reproduce the issue you are
> experiencing.
> Please find the test in the following gist
> https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12
>
> Can you try to tweak it to be more like your use case in which you hit the
> exception?
>
> On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi  wrote:
>
> sorry, wrong version of the file. now corrected:
> a.
>
>
> On Friday, 24 March 2017, 13:06, Antony Mayi  wrote:
>
>
> Hi Aviem,
>
> it took me a while to narrow it down to a simple reproducible case but
> here it is. The problem appears to be related to Combine.globally().
> Attached is my demo code showing the error.
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 10:19, Aviem Zur  wrote:
>
>
> Hi Antony.
>
> Spark uses serializers to serialize data, however this clashes with Beam's
> concept of coders, so we should be using coders instead of Spark's
> serializer (Specifically, in our configuration, Kryo is used as Spark's
> serializer).
>
> From your stack trace it seems that Kryo is being used to serialize your
> class my.pkg.types.MyType . This shouldn't happen.
> My guess is we are accidentally using Spark's serializer (Kryo) somewhere
> instead of coders.
>
> If you share your pipeline (feel free to redact anything pertaining to
> your organization) it will help us locate where this issue is happening.
>
>
> On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré 
> wrote:
>
> OK, discussing with Aviem, the problem is that Kryo is not able to
> serialize
> Guava collections (it's a known issue).
>
> The question is why Kryo wants to serialize the collections (it could be
> related
> to a change in the Windowing code).
>
> Aviem and I are taking a look on that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré 
> wrote:
> >
> >
> > Hi Antony,
> >
> > which Beam version are you using ? We did some improvement about guava
> shading
> > recently, wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using guava's collections (immutables from 21.0) in my beam
> pipelines and
> >> when running on spark runner it fails due to kryo unable to serialize
> those. I
> >> can see there have been some approaches addressing this using
> >> de.javakaffee.kryo-serializers
> >> ->
> >
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> > <
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >but
> >> that's been removed recently.
> >>
> >> how should I solve this?
> >>
> >> the stacktrace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job
> aborted due to
> >> stage failure: Exception while getting task result:
> >> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> >> at my.pkg.Main.main(Main.java:33)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at 

Re: guava collections and kryo under spark runner

2017-03-24 Thread Antony Mayi
Hi Aviem,
Apologies for the confusion - did you see my second version of the file I sent 
shortly after the first one? That second one had the Row class included (using 
just "implements Serializable").
Thanks, a.

On Friday, 24 March 2017, 13:36, Aviem Zur  wrote:
 

 Hi Antony,
Thanks for sharing your code!
I created a test that uses the exact pipeline. I couldn't find the `Row` class 
referred to in your pipeline so I created it as a POJO and registered its coder 
as `AvroCoder`.
Unfortunately this test passes so it does not reproduce the issue you are
experiencing. Please find the test in the following gist:
https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12
Can you try to tweak it to be more like your use case in which you hit the 
exception?
On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi  wrote:

Sorry, wrong version of the file. Now corrected. a.

On Friday, 24 March 2017, 13:06, Antony Mayi  wrote:
 

 Hi Aviem,
it took me a while to narrow it down to a simple reproducible case but here it 
is. The problem appears to be related to Combine.globally(). Attached is my 
demo code showing the error.
Thanks, a.

On Friday, 24 March 2017, 10:19, Aviem Zur  wrote:
 

 Hi Antony.
Spark uses serializers to serialize data, however this clashes with Beam's 
concept of coders, so we should be using coders instead of Spark's serializer 
(Specifically, in our configuration, Kryo is used as Spark's serializer).
From your stack trace it seems that Kryo is being used to serialize your class
my.pkg.types.MyType. This shouldn't happen. My guess is we are accidentally
using Spark's serializer (Kryo) somewhere instead of coders.
If you share your pipeline (feel free to redact anything pertaining to your 
organization) it will help us locate where this issue is happening.

On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré  wrote:

OK, discussing with Aviem, the problem is that Kryo is not able to serialize
Guava collections (it's a known issue).

The question is why Kryo wants to serialize the collections (it could be related
to a change in the Windowing code).

Aviem and I are taking a look on that.

Regards
JB

On 03/24/2017 09:10 AM, Antony Mayi wrote:
> I am on 0.6.0
>
> thx,
> a.
>
>
> On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré  
> wrote:
>
>
> Hi Antony,
>
> which Beam version are you using ? We did some improvement about guava shading
> recently, wanted to check if it's related.
>
> Regards
> JB
>
> On 03/24/2017 08:03 AM, Antony Mayi wrote:
>> Hi,
>>
>> I am using guava's collections (immutables from 21.0) in my beam pipelines 
>> and
>> when running on spark runner it fails due to kryo unable to serialize those. 
>> I
>> can see there have been some approaches addressing this using
>> de.javakaffee.kryo-serializers
>> ->
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> but
>> that's been removed recently.
>>
>> how should I solve this?
>>
>> the stacktrace is below.
>>
>> thanks,
>> antony.
>>
>>
>> [WARNING]
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
>> to
>> stage failure: Exception while getting task result:
>> com.esotericsoftware.kryo.KryoException: 
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> fields (my.pkg.types.MyType)
>> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
>> at my.pkg.Main.main(Main.java:33)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>> at java.lang.Thread.run(Thread.java:745)
>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org 
> http://blog.nanthrax.net 
> Talend - http://www.talend.com 
>
>
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



Re: guava collections and kryo under spark runner

2017-03-24 Thread Aviem Zur
Hi Antony,

Thanks for sharing your code!

I created a test that uses the exact pipeline. I couldn't find the `Row`
class referred to in your pipeline so I created it as a POJO and registered
its coder as `AvroCoder`.

Unfortunately this test passes so it does not reproduce the issue you are
experiencing.
Please find the test in the following gist
https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12

Can you try to tweak it to be more like your use case in which you hit the
exception?

On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi  wrote:

> sorry, wrong version of the file. now corrected:
> a.
>
>
> On Friday, 24 March 2017, 13:06, Antony Mayi  wrote:
>
>
> Hi Aviem,
>
> it took me a while to narrow it down to a simple reproducible case but
> here it is. The problem appears to be related to Combine.globally().
> Attached is my demo code showing the error.
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 10:19, Aviem Zur  wrote:
>
>
> Hi Antony.
>
> Spark uses serializers to serialize data, however this clashes with Beam's
> concept of coders, so we should be using coders instead of Spark's
> serializer (Specifically, in our configuration, Kryo is used as Spark's
> serializer).
>
> From your stack trace it seems that Kryo is being used to serialize your
> class my.pkg.types.MyType . This shouldn't happen.
> My guess is we are accidentally using Spark's serializer (Kryo) somewhere
> instead of coders.
>
> If you share your pipeline (feel free to redact anything pertaining to
> your organization) it will help us locate where this issue is happening.
>
>
> On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré 
> wrote:
>
> OK, discussing with Aviem, the problem is that Kryo is not able to
> serialize
> Guava collections (it's a known issue).
>
> The question is why Kryo wants to serialize the collections (it could be
> related
> to a change in the Windowing code).
>
> Aviem and I are taking a look on that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré 
> wrote:
> >
> >
> > Hi Antony,
> >
> > which Beam version are you using ? We did some improvement about guava
> shading
> > recently, wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using guava's collections (immutables from 21.0) in my beam
> pipelines and
> >> when running on spark runner it fails due to kryo unable to serialize
> those. I
> >> can see there have been some approaches addressing this using
> >> de.javakaffee.kryo-serializers
> >> ->
> >
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> > <
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >but
> >> that's been removed recently.
> >>
> >> how should I solve this?
> >>
> >> the stacktrace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job
> aborted due to
> >> stage failure: Exception while getting task result:
> >> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> >> at my.pkg.Main.main(Main.java:33)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
> >> at java.lang.Thread.run(Thread.java:745)
> >
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org 
> > http://blog.nanthrax.net 
> > Talend - http://www.talend.com 
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>
>
>


Re: guava collections and kryo under spark runner

2017-03-24 Thread Antony Mayi
Sorry, wrong version of the file. Now corrected. a.

On Friday, 24 March 2017, 13:06, Antony Mayi  wrote:
 

 Hi Aviem,
it took me a while to narrow it down to a simple reproducible case but here it 
is. The problem appears to be related to Combine.globally(). Attached is my 
demo code showing the error.
Thanks, a.

On Friday, 24 March 2017, 10:19, Aviem Zur  wrote:
 

 Hi Antony.
Spark uses serializers to serialize data, however this clashes with Beam's 
concept of coders, so we should be using coders instead of Spark's serializer 
(Specifically, in our configuration, Kryo is used as Spark's serializer).
From your stack trace it seems that Kryo is being used to serialize your class
my.pkg.types.MyType. This shouldn't happen. My guess is we are accidentally
using Spark's serializer (Kryo) somewhere instead of coders.
If you share your pipeline (feel free to redact anything pertaining to your 
organization) it will help us locate where this issue is happening.

On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré  wrote:

OK, discussing with Aviem, the problem is that Kryo is not able to serialize
Guava collections (it's a known issue).

The question is why Kryo wants to serialize the collections (it could be related
to a change in the Windowing code).

Aviem and I are taking a look on that.

Regards
JB

On 03/24/2017 09:10 AM, Antony Mayi wrote:
> I am on 0.6.0
>
> thx,
> a.
>
>
> On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré  
> wrote:
>
>
> Hi Antony,
>
> which Beam version are you using ? We did some improvement about guava shading
> recently, wanted to check if it's related.
>
> Regards
> JB
>
> On 03/24/2017 08:03 AM, Antony Mayi wrote:
>> Hi,
>>
>> I am using guava's collections (immutables from 21.0) in my beam pipelines 
>> and
>> when running on spark runner it fails due to kryo unable to serialize those. 
>> I
>> can see there have been some approaches addressing this using
>> de.javakaffee.kryo-serializers
>> ->
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> but
>> that's been removed recently.
>>
>> how should I solve this?
>>
>> the stacktrace is below.
>>
>> thanks,
>> antony.
>>
>>
>> [WARNING]
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
>> to
>> stage failure: Exception while getting task result:
>> com.esotericsoftware.kryo.KryoException: 
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> fields (my.pkg.types.MyType)
>> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
>> at my.pkg.Main.main(Main.java:33)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>> at java.lang.Thread.run(Thread.java:745)
>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org 
> http://blog.nanthrax.net 
> Talend - http://www.talend.com 
>
>
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




   package amayi;

import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class KryoTest {
    public static class Row implements Serializable {
        public final ImmutableList<String> fields;

        public Row(String... fields) {
            this.fields = ImmutableList.copyOf(fields);
        }
    }

    public static class CoalesceFn<InputT> extends Combine.CombineFn<InputT, List<InputT>, List<InputT>> {
        @Override public List<InputT> createAccumulator() {
            return new ArrayList<>();
        }

        @Override public List<InputT> addInput(List<InputT> accumulator, InputT item) {
            accumulator.add(item);
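            // [the archive truncates the attachment here; everything below is a
            //  hedged guess at the obvious remaining boilerplate, not the original file]
            return accumulator;
        }

        @Override public List<InputT> mergeAccumulators(Iterable<List<InputT>> accumulators) {
            List<InputT> merged = new ArrayList<>();
            for (List<InputT> accumulator : accumulators) {
                merged.addAll(accumulator);
            }
            return merged;
        }

        @Override public List<InputT> extractOutput(List<InputT> accumulator) {
            return accumulator;
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        options.setRunner(SparkRunner.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(Create.of(new Row("a", "b"), new Row("c")))
            .apply(Combine.globally(new CoalesceFn<Row>()));
        pipeline.run().waitUntilFinish();
    }
}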

Re: HDFS IO - Sink implemented or not?

2017-03-24 Thread Jean-Baptiste Onofré

Hi Jonas,

We made good improvements recently on the HDFS sink. It's waiting on the
IOChannelFactory refactoring, which should happen pretty soon.


So, basically, you can already use the HDFS sink, but be aware that it's going
to change pretty soon (and it will be flagged as "deprecated", as it is right now).


Regards
JB

On 03/24/2017 10:17 AM, [TA] Jonas Grabber wrote:

Hello,

in the Git repo I can see HDFSFileSink.java, and it looks to me like it's
functional (I might test it out later today).

However, io/HDFS/README.md says 'Currently, only the read path is
implemented.'

Can this statement either be removed or extended to note the ongoing
implementation efforts (that are visible in HDFSFileSink.java)?



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: guava collections and kryo under spark runner

2017-03-24 Thread Aviem Zur
Hi Antony.

Spark uses serializers to serialize data; however, this clashes with Beam's
concept of coders, so we should be using coders instead of Spark's
serializer (specifically, in our configuration, Kryo is used as Spark's
serializer).

From your stack trace it seems that Kryo is being used to serialize your
class my.pkg.types.MyType. This shouldn't happen.
My guess is we are accidentally using Spark's serializer (Kryo) somewhere
instead of coders.

If you share your pipeline (feel free to redact anything pertaining to your
organization) it will help us locate where this issue is happening.
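
As a generic sketch in the meantime (the @DefaultCoder/AvroCoder annotation is
the standard Beam mechanism, but the Row class below is only an illustrative
stand-in, not your actual class):

import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// Encode this type with AvroCoder instead of letting it fall through to
// Java/Kryo serialization. AvroCoder wants a no-arg constructor and
// non-final, Avro-friendly fields (e.g. List rather than ImmutableList).
@DefaultCoder(AvroCoder.class)
public class Row {
    public List<String> fields;

    public Row() {}

    public Row(List<String> fields) {
        this.fields = fields;
    }
}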


On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré 
wrote:

> OK, discussing with Aviem, the problem is that Kryo is not able to
> serialize
> Guava collections (it's a known issue).
>
> The question is why Kryo wants to serialize the collections (it could be
> related
> to a change in the Windowing code).
>
> Aviem and I are taking a look on that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré 
> wrote:
> >
> >
> > Hi Antony,
> >
> > which Beam version are you using ? We did some improvement about guava
> shading
> > recently, wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using guava's collections (immutables from 21.0) in my beam
> pipelines and
> >> when running on spark runner it fails due to kryo unable to serialize
> those. I
> >> can see there have been some approaches addressing this using
> >> de.javakaffee.kryo-serializers
> >> ->
> >
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> > <
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >but
> >> that's been removed recently.
> >>
> >> how should I solve this?
> >>
> >> the stacktrace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job
> aborted due to
> >> stage failure: Exception while getting task result:
> >> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> >> at my.pkg.Main.main(Main.java:33)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
> >> at java.lang.Thread.run(Thread.java:745)
> >
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org 
> > http://blog.nanthrax.net 
> > Talend - http://www.talend.com 
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: guava collections and kryo under spark runner

2017-03-24 Thread Antony Mayi
I am on 0.6.0
thx, a.

On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré  
wrote:
 

 Hi Antony,

which Beam version are you using ? We did some improvement about guava shading 
recently, wanted to check if it's related.

Regards
JB

On 03/24/2017 08:03 AM, Antony Mayi wrote:
> Hi,
>
> I am using guava's collections (immutables from 21.0) in my beam pipelines and
> when running on spark runner it fails due to kryo unable to serialize those. I
> can see there have been some approaches addressing this using
> de.javakaffee.kryo-serializers
> -> 
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
>  but
> that's been removed recently.
>
> how should I solve this?
>
> the stacktrace is below.
>
> thanks,
> antony.
>
>
> [WARNING]
> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
> to
> stage failure: Exception while getting task result:
> com.esotericsoftware.kryo.KryoException: 
> java.lang.UnsupportedOperationException
> Serialization trace:
> fields (my.pkg.types.MyType)
> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> at my.pkg.Main.main(Main.java:33)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
> at java.lang.Thread.run(Thread.java:745)
>

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com