Hi Antony, Thanks for sharing your code!
I created a test that uses the exact pipeline. I couldn't find the `Row` class referred to in your pipeline so I created it as a POJO and registered its coder as `AvroCoder`.

Unfortunately this test passes so it does not reproduce the issue you are experiencing.

Please find the test in the following gist
https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12

Can you try to tweak it to be more like your use case in which you hit the exception?

On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi <antonym...@yahoo.com> wrote:

> sorry, wrong version of the file. now corrected:
> a.
>
>
> On Friday, 24 March 2017, 13:06, Antony Mayi <antonym...@yahoo.com> wrote:
>
>
> Hi Aviem,
>
> it took me a while to narrow it down to a simple reproducible case but here it is. The problem appears to be related to Combine.globally().
> Attached is my demo code showing the error.
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 10:19, Aviem Zur <aviem...@gmail.com> wrote:
>
>
> Hi Antony.
>
> Spark uses serializers to serialize data, however this clashes with Beam's concept of coders, so we should be using coders instead of Spark's serializer (Specifically, in our configuration, Kryo is used as Spark's serializer).
>
> From your stack trace it seems that Kryo is being used to serialize your class my.pkg.types.MyType . This shouldn't happen.
> My guess is we are accidentally using Spark's serializer (Kryo) somewhere instead of coders.
>
> If you share your pipeline (feel free to redact anything pertaining to your organization) it will help us locate where this issue is happening.
>
>
> On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> OK, discussing with Aviem, the problem is that Kryo is not able to serialize Guava collections (it's a known issue).
>
> The question is why Kryo wants to serialize the collections (it could be related to a change in the Windowing code).
>
> Aviem and I are taking a look on that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >
> >
> > Hi Antony,
> >
> > which Beam version are you using? We did some improvement about guava shading recently, wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using guava's collections (immutables from 21.0) in my beam pipelines and when running on spark runner it fails due to kryo unable to serialize those. I can see there have been some approaches addressing this using de.javakaffee.kryo-serializers
> >> -> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >> but that's been removed recently.
> >>
> >> how should I solve this?
> >>
> >> the stacktrace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >>     at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> >>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> >>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> >>     at my.pkg.Main.main(Main.java:33)
> >>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
> >>     at java.lang.Thread.run(Thread.java:745)
> >
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
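
A side note on the `java.lang.UnsupportedOperationException` in the quoted stack trace, for anyone reading the thread later. The sketch below is only an illustration under an assumption: that a generic collection serializer rebuilds a collection on read by taking an empty instance of the target type and calling `add()` for each element. It uses neither Kryo nor Guava; `ImmutableRefillDemo` and `refill` are hypothetical names, and a JDK unmodifiable list stands in for a Guava immutable collection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class ImmutableRefillDemo {

    // Hypothetical stand-in for a generic collection deserializer (assumption,
    // not Kryo's actual code): it is handed an empty instance of the target
    // type and add()s each decoded element to it.
    static <T> boolean refill(Collection<T> target, List<T> decoded) {
        try {
            for (T element : decoded) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Immutable collections reject add(); this is the same exception
            // type that appears in the quoted stack trace.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> decoded = Arrays.asList("a", "b");

        Collection<String> mutable = new ArrayList<>();
        // JDK unmodifiable wrapper used here in place of a Guava immutable.
        Collection<String> immutable =
                Collections.unmodifiableList(new ArrayList<String>());

        System.out.println("mutable target refilled: " + refill(mutable, decoded));
        System.out.println("immutable target refilled: " + refill(immutable, decoded));
    }
}
```

The mutable target is refilled and the immutable one is not. That is consistent with the point made earlier in the thread that values should go through Beam coders rather than Spark's serializer, since a coder controls how the value is reconstructed.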