Hm, now I am also seeing this problem. The essence of my code is:
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS)); } }; streamingContext = JavaStreamingContext.getOrCreate( checkpointDirString, sparkContext.hadoopConfiguration(), streamingContextFactory, false); streamingContext.checkpoint(checkpointDirString); yields: 2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66 org.apache.hadoop.conf.Configuration - field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", name: "conf$2", type: "class org.apache.hadoop.conf.Configuration") - object (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", <function2>) - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2") - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@cb8016a) ... This looks like it's due to PairRDDFunctions, as this saveFunc seems to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9 : def saveAsNewAPIHadoopFiles( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = new Configuration ) { val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } self.foreachRDD(saveFunc) } conf indeed is serialized to make it part of saveFunc, no? but it can't be serialized. But surely this doesn't fail all the time or someone would have noticed by now... It could be a particular closure problem again. Any ideas on whether this is a problem, or if there's a workaround? checkpointing does not work at all for me as a result. On Fri, Aug 15, 2014 at 10:37 PM, salemi <alireza.sal...@udo.edu> wrote: > Hi All, > > I am just trying to save the kafka dstream to hadoop as followed > > val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap) > dStream.saveAsHadoopFiles(hdfsDataUrl, "data") > > It throws the following exception. What am I doing wrong? > > 14/08/15 14:30:09 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf > java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at > org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185) > at > org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ^C14/08/15 14:30:10 ERROR OneForOneStrategy: > org.apache.hadoop.mapred.JobConf > java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at > org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185) > at > org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-saving-kafka-DStream-into-hadoop-throws-exception-tp12202.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org