Hi Aviem,
it took me a while to narrow it down to a simple reproducible case but here it 
is. The problem appears to be related to Combine.globally(). Attached is my 
demo code showing the error.
Thanks,a. 

    On Friday, 24 March 2017, 10:19, Aviem Zur <[email protected]> wrote:
 

 Hi Antony.
Spark uses serializers to serialize data, however this clashes with Beam's 
concept of coders, so we should be using coders instead of Spark's serializer 
(Specifically, in our configuration, Kryo is used as Spark's serializer).
>From your stack trace it seems that Kryo is being used to serialize your class 
>my.pkg.types.MyType . This shouldn't happen.My guess is we are accidentally 
>using Spark's serializer (Kryo) somewhere instead of coders.
If you share your pipeline (feel free to redact anything pertaining to your 
organization) it will help us locate where this issue is happening.

On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré <[email protected]> wrote:

OK, discussing with Aviem, the problem is that Kryo is not able to serialize
Guava collections (it's a known issue).

The question is why Kryo wants to serialize the collections (it could be related
to a change in the Windowing code).

Aviem and I are taking a look on that.

Regards
JB

On 03/24/2017 09:10 AM, Antony Mayi wrote:
> I am on 0.6.0
>
> thx,
> a.
>
>
> On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré <[email protected]> 
> wrote:
>
>
> Hi Antony,
>
> which Beam version are you using ? We did some improvement about guava shading
> recently, wanted to check if it's related.
>
> Regards
> JB
>
> On 03/24/2017 08:03 AM, Antony Mayi wrote:
>> Hi,
>>
>> I am using guava's collections (immutables from 21.0) in my beam pipelines 
>> and
>> when running on spark runner it fails due to kryo unable to serialize those. 
>> I
>> can see there have been some approaches addressing this using
>> de.javakaffee.kryo-serializers
>> ->
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> <https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java>but
>> that's been removed recently.
>>
>> how should I solve this?
>>
>> the stacktrace is bellow.
>>
>> thanks,
>> antony.
>>
>>
>> [WARNING]
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
>> to
>> stage failure: Exception while getting task result:
>> com.esotericsoftware.kryo.KryoException: 
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> fields (my.pkg.types.MyType)
>> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
>> at
>>
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
>> at my.pkg.Main.main(Main.java:33)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>> at java.lang.Thread.run(Thread.java:745)
>
>>
>
> --
> Jean-Baptiste Onofré
> [email protected] <mailto:[email protected]>
> http://blog.nanthrax.net <http://blog.nanthrax.net/>
> Talend - http://www.talend.com <http://www.talend.com/>
>
>
>

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com



   
package amayi;

import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class KryoTest {
    public static class CoalesceFn<InputT> extends Combine.CombineFn<InputT, List<InputT>, List<InputT>> {
        @Override public List<InputT> createAccumulator() {
            return new ArrayList<>();
        }

        @Override public List<InputT> addInput(List<InputT> accumulator, InputT item) {
            accumulator.add(item);
            return accumulator;
        }

        @Override public List<InputT> mergeAccumulators(Iterable<List<InputT>> accumulators) {
            Iterator<List<InputT>> iter = accumulators.iterator();
            List<InputT> merged = iter.next();
            while (iter.hasNext()) {
                merged.addAll(iter.next());
            }
            return merged;
        }

        @Override public List<InputT> extractOutput(List<InputT> accumulator) {
            return accumulator;
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
        options.setRunner(SparkRunner.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(Create.of(new Row("A"), new Row("B")))
                .apply(Combine.globally(new CoalesceFn<Row>()).asSingletonView());
        pipeline.run().waitUntilFinish();
    }
}

Reply via email to