There is a separate setting for serializing closures, "spark.closure.serializer" (listed here: http://spark.incubator.apache.org/docs/latest/configuration.html). It is used to serialize whatever is referenced by the functions you pass to RDD operations, e.g., map, filter, and lookup. Those closures include referenced variables, like your TileIdWritable. So you need to either change that setting to use Kryo, or make your object serializable to Java. A rough sketch of both options follows.
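Something like this, as a minimal untested sketch pieced together from the snippets in your mail (the class body is a placeholder, and whether Kryo is fully supported as the closure serializer depends on your Spark version):

// Option 1: make the captured key Java-serializable, so the default
// (Java) closure serializer can handle it when lookup() captures it.
class TileIdWritable extends java.io.Serializable {
  // ...
}

object TileIdWritable {
  def apply(value: Long) = new TileIdWritable
}

// Option 2: switch the closure serializer to Kryo as well, next to the
// properties you already set before creating the SparkContext.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")

Option 1 is the smaller change: it only affects how the key travels with the task closure, while your RDD data still goes through Kryo via "spark.serializer".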
On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <[email protected]> wrote:
>
> I'm getting the below NotSerializableException despite using Kryo to
> serialize that class (TileIdWritable).
>
> The offending line: awtestRdd.lookup(TileIdWritable(200))
>
> Initially I thought Kryo is not being registered properly, so I tried
> running operations over awtestRDD which force a shuffle (e.g., groupByKey),
> and that seemed to work fine. So it seems to be specific to the
> "TileIdWritable(200)" argument to lookup(). Is there anything unique about
> companion objects and Kryo serialization? I even replaced
> "TileIdWritable(200)" by "new TileIdWritable" but still see that exception
>
> class TileIdWritable {
>   //
> }
>
> object TileIdWritable {
>   def apply(value: Long) = new TileIdWritable
> }
>
> My Kryo registrator:
> class KryoRegistrator extends SparkKryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     println("Called KryoRegistrator") // I see this printed during shuffle operations
>     val r = kryo.register(classOf[TileIdWritable])
>     val s = kryo.register(classOf[ArgWritable])
>   }
> }
>
> Then just before creating a Spark Context, I have these two lines:
> System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
>
> The exception itself:
> Exception in thread "main" org.apache.spark.SparkException: Job failed:
> Task not serializable: java.io.NotSerializableException: geotrellis.spark.formats.TileIdWritable
>   - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", name: "key$1", type: "class java.lang.Object")
>   - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
>   - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name: "func$1", type: "interface scala.Function1")
>   - root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>
> Regards,
> Ameet
