[ 
https://issues.apache.org/jira/browse/SPARK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14519005#comment-14519005
 ] 

Nicolas PHUNG commented on SPARK-3601:
--------------------------------------

For Avro's {{GenericData.Array}}, I use the following snippet from
[Flink|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java]:

{code}
// Avoid issue with Avro array serialization:
// https://issues.apache.org/jira/browse/FLINK-1391
import java.io.Serializable;
import java.util.Collection;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;

public class Serializers {
    /**
     * Special serializer for Java collections enforcing certain instance types.
     * Avro serializes collections with a "GenericData.Array" type. Kryo is not
     * able to handle this type, so we use ArrayLists instead.
     */
    public static class SpecificInstanceCollectionSerializer<T extends java.util.ArrayList<?>>
            extends CollectionSerializer implements Serializable {
        private Class<T> type;

        public SpecificInstanceCollectionSerializer(Class<T> type) {
            this.type = type;
        }

        @Override
        protected Collection create(Kryo kryo, Input input, Class<Collection> type) {
            // Instantiate the enforced type (e.g. ArrayList) instead of GenericData.Array
            return kryo.newInstance(this.type);
        }

        @Override
        protected Collection createCopy(Kryo kryo, Collection original) {
            return kryo.newInstance(this.type);
        }
    }
}
{code}

And I registered it with Kryo using the following Scala code:

{code}
kryo.register(classOf[GenericData.Array[_]],
  new SpecificInstanceCollectionSerializer(classOf[java.util.ArrayList[_]]));
{code}

I hope this helps.

> Kryo NPE for output operations on Avro complex Objects even after registering.
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-3601
>                 URL: https://issues.apache.org/jira/browse/SPARK-3601
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: local, standalone cluster
>            Reporter: mohan gaddam
>
> The Kryo serializer works well when Avro objects contain simple data, but when the same Avro objects contain complex data (like unions/arrays), Kryo fails during output operations. Mappings, however, work fine. Note that I have registered all of the Avro-generated classes with Kryo. I am using Java as the programming language.
> When a complex message is used, an NPE is thrown with the following stack trace:
> ==================================================
> ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> value (xyz.Datum)
> data (xyz.ResMsg)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> In the above exception, Datum and ResMsg are project-specific classes generated by Avro from the avdl snippet below:
> ======================
> record KeyValueObject {
>     union {boolean, int, long, float, double, bytes, string} name;
>     union {boolean, int, long, float, double, bytes, string,
>            array<union {boolean, int, long, float, double, bytes, string, KeyValueObject}>,
>            KeyValueObject} value;
> }
> record Datum {
>     union {boolean, int, long, float, double, bytes, string,
>            array<union {boolean, int, long, float, double, bytes, string, KeyValueObject}>,
>            KeyValueObject} value;
> }
> record ResMsg {
>     string version;
>     string sequence;
>     string resourceGUID;
>     string GWID;
>     string GWTimestamp;
>     union {Datum, array<Datum>} data;
> }
> Avro message samples are as follows:
> ============================
> 1)
> {"version": "01", "sequence": "00001", "resourceGUID": "001", "GWID": "002", "GWTimestamp": "1409823150737", "data": {"value": "30"}}
> 2)
> {"version": "01", "sequence": "00001", "resource": "sensor-001", "controller": "002", "controllerTimestamp": "1411038710358", "data": {"value": [{"name": "Temperature", "value": "30"}, {"name": "Speed", "value": "60"}, {"name": "Location", "value": ["+401213.1", "-0750015.1"]}, {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}]}}
> Both 1 and 2 adhere to the Avro schema, so the decoder is able to convert them into Avro objects in the Spark Streaming API.
> BTW, the messages were pulled from a Kafka source and decoded using a Kafka decoder.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
