Hi Antony,

Thanks for sharing your code!

I created a test that uses your exact pipeline. I couldn't find the `Row`
class referred to in your pipeline, so I created it as a POJO and registered
`AvroCoder` as its coder.
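
Roughly, the stand-in POJO is wired up like this (a minimal sketch; the
field names are placeholders I made up, and the full test is in the gist
linked below):

    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.DefaultCoder;

    // Stand-in POJO; AvroCoder is picked up via the @DefaultCoder annotation.
    @DefaultCoder(AvroCoder.class)
    public class Row {
      private String key;   // placeholder field
      private long value;   // placeholder field

      public Row() {}       // AvroCoder (Avro reflection) needs a no-arg constructor

      public Row(String key, long value) {
        this.key = key;
        this.value = value;
      }
    }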

Unfortunately, this test passes, so it does not reproduce the issue you are
experiencing.
Please find the test in the following gist:
https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12

Can you try to tweak it to be more like your use case in which you hit the
exception?

On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi <antonym...@yahoo.com> wrote:

> Sorry, wrong version of the file. Now corrected:
> a.
>
>
> On Friday, 24 March 2017, 13:06, Antony Mayi <antonym...@yahoo.com> wrote:
>
>
> Hi Aviem,
>
> It took me a while to narrow it down to a simple reproducible case, but
> here it is. The problem appears to be related to Combine.globally().
> Attached is my demo code showing the error.
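>
> (Not the attached demo, just a minimal illustration of the shape I mean,
> with a made-up summing function:)
>
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>     import org.apache.beam.sdk.transforms.Combine;
>     import org.apache.beam.sdk.transforms.Create;
>     import org.apache.beam.sdk.transforms.SerializableFunction;
>
>     public class CombineGloballyDemo {
>       public static void main(String[] args) {
>         Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>         // Combine.globally() collapses the whole PCollection into one value.
>         p.apply(Create.of(1, 2, 3))
>             .apply(Combine.globally(
>                 new SerializableFunction<Iterable<Integer>, Integer>() {
>                   @Override
>                   public Integer apply(Iterable<Integer> xs) {
>                     int sum = 0;
>                     for (int x : xs) { sum += x; }
>                     return sum;
>                   }
>                 }));
>         p.run().waitUntilFinish();
>       }
>     }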
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 10:19, Aviem Zur <aviem...@gmail.com> wrote:
>
>
> Hi Antony.
>
> Spark uses its own serializers to serialize data; however, this clashes with
> Beam's concept of coders, so we should be using coders instead of Spark's
> serializer. (Specifically, in our configuration, Kryo is used as Spark's
> serializer.)
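>
> For context, picking Kryo as Spark's serializer is plain Spark
> configuration, roughly like the sketch below (the general mechanism, not
> the runner's exact code; the registrator property is where Kryo behavior
> can be customized):
>
>     import org.apache.spark.SparkConf;
>
>     public class KryoConfSketch {
>       public static void main(String[] args) {
>         // Make Kryo the serializer Spark uses for the JVM objects it ships around.
>         SparkConf conf = new SparkConf()
>             .setAppName("kryo-conf-sketch")
>             .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>             // A registrator can customize how Kryo handles particular classes:
>             .set("spark.kryo.registrator",
>                 "org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator");
>       }
>     }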
>
> From your stack trace, it seems that Kryo is being used to serialize your
> class my.pkg.types.MyType. This shouldn't happen.
> My guess is that we are accidentally using Spark's serializer (Kryo)
> somewhere instead of coders.
>
> If you share your pipeline (feel free to redact anything pertaining to
> your organization), it will help us locate where this issue is happening.
>
>
> On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> OK, after discussing with Aviem: the problem is that Kryo is not able to
> serialize Guava collections (it's a known issue).
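>
> (For reference, the usual Kryo-side workaround for that known issue is to
> register the Guava serializers from de.javakaffee's kryo-serializers
> through a Spark KryoRegistrator, roughly like the sketch below; the class
> name is made up, and this does not answer the coder question above:)
>
>     import com.esotericsoftware.kryo.Kryo;
>     import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
>     import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
>     import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
>     import org.apache.spark.serializer.KryoRegistrator;
>
>     // Teaches Kryo how to (de)serialize Guava's immutable collections.
>     public class GuavaKryoRegistrator implements KryoRegistrator {
>       @Override
>       public void registerClasses(Kryo kryo) {
>         ImmutableListSerializer.registerSerializers(kryo);
>         ImmutableSetSerializer.registerSerializers(kryo);
>         ImmutableMapSerializer.registerSerializers(kryo);
>       }
>     }
>
> It would then be selected via the spark.kryo.registrator Spark property.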
>
> The question is why Kryo wants to serialize the collections (it could be
> related
> to a change in the Windowing code).
>
> Aviem and I are taking a look at that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> >
> >
> > Hi Antony,
> >
> > Which Beam version are you using? We made some improvements to Guava
> > shading recently, and I wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using Guava's collections (immutables from 21.0) in my Beam
> >> pipelines, and when running on the Spark runner it fails because Kryo is
> >> unable to serialize them. I can see there have been some approaches
> >> addressing this using de.javakaffee.kryo-serializers ->
> >> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >> but that's been removed recently.
> >>
> >> How should I solve this?
> >>
> >> The stack trace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted
> >> due to stage failure: Exception while getting task result:
> >> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >> at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >> at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> >> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> >> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> >> at my.pkg.Main.main(Main.java:33)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
> >> at java.lang.Thread.run(Thread.java:745)
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
