Sean Owen created SPARK-4196:
--------------------------------

             Summary: Streaming + checkpointing yields NotSerializableException for Hadoop Configuration from saveAsNewAPIHadoopFiles?
                 Key: SPARK-4196
                 URL: https://issues.apache.org/jira/browse/SPARK-4196
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.1.0
            Reporter: Sean Owen
I am reasonably sure there is some issue here in Streaming and that I'm not missing something basic, but not 100%. I went ahead and posted it as a JIRA to track, since it has come up a few times before without resolution, and right now I can't get checkpointing to work at all.

When Spark Streaming checkpointing is enabled, I see a NotSerializableException thrown for a Hadoop Configuration object, and it seems like it is not one from my user code. Before I post my particular instance, see http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3c1408135046777-12202.p...@n3.nabble.com%3E for another occurrence. I was also on a customer site last week debugging an identical issue with checkpointing in a Scala-based program; they also could not enable checkpointing without hitting exactly this error.

The essence of my code is:

{code}
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() {
  @Override
  public JavaStreamingContext create() {
    return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
  }
};

streamingContext = JavaStreamingContext.getOrCreate(
    checkpointDirString,
    sparkContext.hadoopConfiguration(),
    streamingContextFactory,
    false);
streamingContext.checkpoint(checkpointDirString);
{code}

It yields:

{code}
2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66 org.apache.hadoop.conf.Configuration
- field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
    name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
- object (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", <function2>)
- field (class "org.apache.spark.streaming.dstream.ForEachDStream",
    name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
    type: "interface scala.Function2")
- object (class "org.apache.spark.streaming.dstream.ForEachDStream",
    org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
...
{code}

This looks like it's due to PairDStreamFunctions, as this saveFunc seems to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9:

{code}
def saveAsNewAPIHadoopFiles(
    prefix: String,
    suffix: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = new Configuration
  ) {
  val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
  }
  self.foreachRDD(saveFunc)
}
{code}

Is that not a problem? If it is, I don't see how this method could ever work in Spark. Then again, I don't see why it only becomes an issue when checkpointing is enabled.

Long shot, but I wonder if it is related to closure issues like https://issues.apache.org/jira/browse/SPARK-1866
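For what it's worth, the failure mode can be reproduced outside Spark entirely: serializing any function object whose compiler-generated field holds a non-serializable value fails exactly this way. A minimal standalone sketch (the {{Configuration}} and {{SaveFunc}} types below are local stand-ins for the Hadoop and Scala types, not the real classes):

{code}
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationDemo {

  // Stand-in for org.apache.hadoop.conf.Configuration, which is not Serializable.
  static class Configuration { }

  // Stand-in for the save function built in saveAsNewAPIHadoopFiles: a
  // Serializable function type, like scala.Function2 in the trace above.
  interface SaveFunc extends Serializable {
    void apply(String rdd, long time);
  }

  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();

    // Referencing conf in the body makes the compiler store it in a field of
    // the generated function object -- analogous to the conf$2 field above.
    SaveFunc saveFunc = (rdd, time) ->
        System.out.println("would save " + rdd + " at " + time + " with " + conf);

    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(saveFunc);
      System.out.println("serialized OK");
    } catch (NotSerializableException e) {
      // Serializing the function drags in the captured Configuration and fails.
      System.out.println("NotSerializableException: " + e.getMessage());
    }
  }
}
{code}

This would also explain why the error appears only with checkpointing: checkpointing serializes the whole DStream graph, including the foreachFunc holding the captured conf, whereas without checkpointing that function object is never Java-serialized.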