I'm getting the NotSerializableException below despite using Kryo to
serialize that class (TileIdWritable).

The offending line: awtestRdd.lookup(TileIdWritable(200))

Initially I thought Kryo was not being registered properly, so I tried
running operations over awtestRdd that force a shuffle (e.g., groupByKey),
and those seemed to work fine. So the problem seems to be specific to the
"TileIdWritable(200)" argument to lookup(). Is there anything unique about
companion objects and Kryo serialization? I even replaced
"TileIdWritable(200)" with "new TileIdWritable", but I still see the
exception.


class TileIdWritable {
  // contents elided
}

object TileIdWritable {
  def apply(value: Long) = new TileIdWritable  // value unused in this stripped-down version
}


My Kryo registrator:
class KryoRegistrator extends SparkKryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      println("Called KryoRegistrator")  // I see this printed during
shuffle operations
      val r = kryo.register(classOf[TileIdWritable])
      val s = kryo.register(classOf[ArgWritable])
    }
}
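(SparkKryoRegistrator here is Spark's org.apache.spark.serializer.KryoRegistrator,
renamed on import to avoid clashing with my own class name.)

In case it helps, below is a minimal standalone round trip through Kryo
(outside Spark) that I can use to sanity-check the registration itself.
This is just a sketch, assuming the usual Kryo 2.x Output/Input API:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}

    val kryo = new Kryo()
    kryo.register(classOf[TileIdWritable])

    // serialize
    val bytes = new ByteArrayOutputStream()
    val out = new Output(bytes)
    kryo.writeObject(out, TileIdWritable(200))
    out.close()

    // deserialize
    val in = new Input(new ByteArrayInputStream(bytes.toByteArray))
    val roundTripped = kryo.readObject(in, classOf[TileIdWritable])
    in.close()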

Then, just before creating a SparkContext, I have these two lines:
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")




The exception itself:
Exception in thread "main" org.apache.spark.SparkException: Job failed: Task not serializable: java.io.NotSerializableException: geotrellis.spark.formats.TileIdWritable
    - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", name: "key$1", type: "class java.lang.Object")
    - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
    - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name: "func$1", type: "interface scala.Function1")
    - root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

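Reading the trace, the key appears to be captured as a field (key$1) of a
closure, and closures may go through Java serialization rather than Kryo.
If that is what's happening, a fallback I could try is making the class
Java-serializable as well (sketch only):

    class TileIdWritable extends java.io.Serializable {
      // contents elided
    }
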
Regards,
Ameet
