GitHub user uncleGen opened a pull request:

    https://github.com/apache/spark/pull/15992

    [SPARK-18560][CORE][STREAMING] Receiver data can not be dataSerialized 
properly.

    ## What changes were proposed in this pull request?
    
    My spark streaming job can run correctly on Spark 1.6.1, but it can not run 
properly on Spark 2.0.1, with following exception:
    
    `
    16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 
6.0 (TID 87)
    com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
13994
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243)
        at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    `
    
    Go deep into  relevant implementation, I find the type of data received by 
{{Receiver}} is erased. And in Spark2.x, framework can choose a appropriate 
{{Serializer}} from {{JavaSerializer}} and {{KryoSerializer}} base on the type 
of data. 
    
    At the {{Receiver}} side, the type of data is erased to be {{Object}}, so 
framework will choose {{JavaSerializer}}, with following code:
    
    `
    def canUseKryo(ct: ClassTag[_]): Boolean = {
        primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
      }
    
      def getSerializer(ct: ClassTag[_]): Serializer = {
        if (canUseKryo(ct)) {
          kryoSerializer
        } else {
          defaultSerializer
        }
      }
    `
    
    At task side, we can get correct data type, and framework will choose 
{{KryoSerializer}} if possible, with following supported type:
    
    `
    private[this] val stringClassTag: ClassTag[String] = 
implicitly[ClassTag[String]]
    private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
        val primitiveClassTags = Set[ClassTag[_]](
          ClassTag.Boolean,
          ClassTag.Byte,
          ClassTag.Char,
          ClassTag.Double,
          ClassTag.Float,
          ClassTag.Int,
          ClassTag.Long,
          ClassTag.Null,
          ClassTag.Short
        )
        val arrayClassTags = primitiveClassTags.map(_.wrap)
        primitiveClassTags ++ arrayClassTags
      }
    `
    
    In my case, the type of data is Byte Array.
    
    This problem stems from #SPARK-13990, a patch to have Spark automatically 
pick the "best" serializer when caching RDDs.
    ## How was this patch tested?
    
    update existing unit test


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uncleGen/spark SPARK-18560

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15992
    
----
commit 0c92493819276b7e1b83cdd3d6bfa6631d3dbb89
Author: uncleGen <[email protected]>
Date:   2016-11-22T14:36:23Z

    SPARK-18560: Receiver data can not be dataSerialized properly.

commit 0a68047a20adfbb8252b4da2e46aa84b8631a48b
Author: uncleGen <[email protected]>
Date:   2016-11-23T09:32:14Z

    cp

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to