Hi Aviem,
Apologies for the confusion - did you see my second version of the file I sent
shortly after the first one? That second one had the Row class included (using
just "implements Serializable").
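For reference, here is a minimal JDK-only sketch of what "just implements Serializable" means in practice: the class is round-tripped through standard Java serialization, which is the mechanism Beam's SerializableCoder builds on. The real Row fields aren't shown in this thread, so `key` and `count` (and the helper class `RowRoundTrip`) are hypothetical, for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for the Row class; "key" and "count" are assumed fields.
class Row implements Serializable {
    private static final long serialVersionUID = 1L;
    final String key;
    final long count;

    Row(String key, long count) {
        this.key = key;
        this.count = count;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Row)) return false;
        Row other = (Row) o;
        return count == other.count && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, count);
    }
}

class RowRoundTrip {
    // Writes the value with ObjectOutputStream and reads it back with ObjectInputStream.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Row original = new Row("a", 3L);
        Row copy = roundTrip(original);
        // The decoded object is a new instance that compares equal to the original.
        System.out.println(copy.equals(original) && copy != original); // true
    }
}
```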
Thanks,
a.
On Friday, 24 March 2017, 13:36, Aviem Zur <[email protected]> wrote:
Hi Antony,
Thanks for sharing your code!
I created a test that uses the exact pipeline. I couldn't find the `Row` class
referred to in your pipeline so I created it as a POJO and registered its coder
as `AvroCoder`.
Unfortunately, this test passes, so it does not reproduce the issue you are
experiencing. Please find the test in the following gist:
https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12
Can you try to tweak it to be more like your use case in which you hit the
exception?
On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi <[email protected]> wrote:
Sorry, wrong version of the file. Now corrected.
a.
On Friday, 24 March 2017, 13:06, Antony Mayi <[email protected]> wrote:
Hi Aviem,
it took me a while to narrow it down to a simple reproducible case, but here it
is. The problem appears to be related to Combine.globally(). Attached is my
demo code showing the error.
Thanks,
a.
On Friday, 24 March 2017, 10:19, Aviem Zur <[email protected]> wrote:
Hi Antony.
Spark uses serializers to serialize data; however, this clashes with Beam's
concept of coders, so we should be using coders instead of Spark's serializer
(specifically, in our configuration, Kryo is used as Spark's serializer).
From your stack trace, it seems that Kryo is being used to serialize your class
my.pkg.types.MyType. This shouldn't happen. My guess is that we are accidentally
using Spark's serializer (Kryo) somewhere instead of coders.
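The distinction between a coder and a generic serializer can be sketched with the JDK alone. `SimpleCoder` below is a hypothetical, simplified stand-in for a Beam coder (it is not Beam's actual Coder API), and `MyType` is only a guess at the shape of my.pkg.types.MyType, based on the `fields` entry in the serialization trace. The idea it illustrates: a coder owns both the byte layout and the construction of the decoded value, so an immutable field is rebuilt through the constructor rather than mutated in place.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified coder contract (not Beam's real Coder class).
interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
}

// Hypothetical stand-in for my.pkg.types.MyType with an immutable "fields" list.
final class MyType {
    final List<String> fields;

    MyType(List<String> fields) {
        // Keep an unmodifiable copy, behaving like a Guava immutable list would.
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    }
}

final class MyTypeCoder implements SimpleCoder<MyType> {
    @Override
    public void encode(MyType value, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(value.fields.size()); // length prefix
        for (String field : value.fields) {
            data.writeUTF(field);
        }
        data.flush();
    }

    @Override
    public MyType decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int size = data.readInt();
        List<String> fields = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            fields.add(data.readUTF());
        }
        // Build the value through its constructor; no immutable collection is mutated.
        return new MyType(fields);
    }

    static MyType roundTrip(MyType value) {
        try {
            MyTypeCoder coder = new MyTypeCoder();
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            coder.encode(value, bytes);
            return coder.decode(new ByteArrayInputStream(bytes.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MyType decoded = roundTrip(new MyType(Arrays.asList("a", "b")));
        System.out.println(decoded.fields); // [a, b]
    }
}
```

The length-prefixed layout is just one possible encoding chosen for the sketch; what matters is that decoding goes through a path the type itself supports.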
If you share your pipeline (feel free to redact anything pertaining to your
organization) it will help us locate where this issue is happening.
On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré <[email protected]> wrote:
OK, after discussing with Aviem: the problem is that Kryo is not able to
serialize Guava collections (it's a known issue).
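A simplified model of why this surfaces as java.lang.UnsupportedOperationException, assuming Kryo's generic collection handling obtains an empty instance of the recorded collection class and then calls add() for each element. Guava's immutable collections always reject add(); since Guava isn't assumed on the classpath here, Collections.unmodifiableList stands in for ImmutableList. `NaiveCollectionReader` is a hypothetical name and is not Kryo code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the assumed "empty instance, then add()" pattern; NOT Kryo's code.
final class NaiveCollectionReader {
    static <E> Collection<E> read(Supplier<Collection<E>> newInstance, List<E> decoded) {
        Collection<E> target = newInstance.get(); // empty instance of the recorded type
        for (E element : decoded) {
            target.add(element); // immutable collections throw here
        }
        return target;
    }

    public static void main(String[] args) {
        List<String> decoded = Arrays.asList("a", "b");

        // A mutable collection accepts add(), so this succeeds.
        System.out.println(read(ArrayList::new, decoded)); // [a, b]

        // An unmodifiable list (standing in for Guava's ImmutableList) rejects add()
        // with the same exception type that appears in the stack trace.
        try {
            read(() -> Collections.unmodifiableList(new ArrayList<String>()), decoded);
        } catch (UnsupportedOperationException e) {
            System.out.println("UnsupportedOperationException");
        }
    }
}
```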
The question is why Kryo wants to serialize the collections (it could be related
to a change in the Windowing code).
Aviem and I are taking a look at it.
Regards
JB
On 03/24/2017 09:10 AM, Antony Mayi wrote:
> I am on 0.6.0
>
> thx,
> a.
>
>
> On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré <[email protected]> wrote:
>
>
> Hi Antony,
>
> Which Beam version are you using? We made some improvements to Guava shading
> recently, and I wanted to check whether it's related.
>
> Regards
> JB
>
> On 03/24/2017 08:03 AM, Antony Mayi wrote:
>> Hi,
>>
>> I am using Guava's collections (immutables from 21.0) in my Beam pipelines,
>> and when running on the Spark runner the pipeline fails because Kryo is unable
>> to serialize them. I can see there have been some approaches addressing this
>> using de.javakaffee.kryo-serializers
>> -> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
>> but that's been removed recently.
>>
>> How should I solve this?
>>
>> The stack trace is below.
>>
>> thanks,
>> antony.
>>
>>
>> [WARNING]
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>> Serialization trace:
>> fields (my.pkg.types.MyType)
>> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
>>     at my.pkg.Main.main(Main.java:33)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>>     at java.lang.Thread.run(Thread.java:745)
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com