Hm, now I am also seeing this problem.

The essence of my code is:

    final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

    JavaStreamingContextFactory streamingContextFactory =
        new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return new JavaStreamingContext(sparkContext,
            new Duration(batchDurationMS));
      }
    };

    streamingContext = JavaStreamingContext.getOrCreate(
        checkpointDirString, sparkContext.hadoopConfiguration(),
        streamingContextFactory, false);
    streamingContext.checkpoint(checkpointDirString);

yields:

2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
org.apache.hadoop.conf.Configuration

- field (class 
"org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")

- object (class
"org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
<function2>)

- field (class "org.apache.spark.streaming.dstream.ForEachDStream",
name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
type: "interface scala.Function2")

- object (class "org.apache.spark.streaming.dstream.ForEachDStream",
org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
...


This looks like it's due to PairDStreamFunctions, since the saveFunc below
seems to be the org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9
from the error:

def saveAsNewAPIHadoopFiles(
    prefix: String,
    suffix: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = new Configuration
  ) {
  val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass,
      outputFormatClass, conf)
  }
  self.foreachRDD(saveFunc)
}


conf is indeed captured by saveFunc, so it has to be serialized along with
the closure when the DStream graph is checkpointed, no? But Configuration
isn't serializable. Surely this doesn't fail every time, though, or someone
would have noticed by now...
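
As a quick sanity check outside Spark (my own standalone sketch, not anything
from the Spark codebase): serializing any Serializable object that holds a
Configuration field reproduces exactly this failure, which is roughly what the
checkpoint writer is doing with the closure.

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import org.apache.hadoop.conf.Configuration;

    public class ConfCaptureDemo {
      // Roughly the shape of the generated closure class: a Serializable
      // function object holding a Configuration field (the "conf$2" above).
      static class CapturesConf implements Serializable {
        final Configuration conf = new Configuration();
      }

      public static void main(String[] args) throws Exception {
        // Plain Java serialization, as the checkpoint writer uses, fails with
        // java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
        new ObjectOutputStream(new ByteArrayOutputStream())
            .writeObject(new CapturesConf());
      }
    }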

It could be another closure-capture problem.

Any ideas on whether this is a real problem, or whether there's a workaround?
As it stands, checkpointing does not work for me at all.
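
The workaround I'm considering (untested sketch; "pairDStream" and
"outputPrefix" are placeholder names, not my actual code) is to skip
saveAsNewAPIHadoopFiles entirely and do the save myself in foreachRDD,
creating the Configuration inside the function so the closure that gets
checkpointed only captures strings:

    // Needs: org.apache.spark.api.java.JavaPairRDD,
    // org.apache.spark.api.java.function.Function2,
    // org.apache.spark.streaming.Time,
    // org.apache.hadoop.conf.Configuration,
    // org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    final String outputPrefix = "hdfs:///tmp/out";  // placeholder path

    // Define this inside a static method (or as a static nested class) so the
    // anonymous function doesn't also capture a non-serializable enclosing
    // instance.
    pairDStream.foreachRDD(new Function2<JavaPairRDD<String, String>, Time, Void>() {
      @Override
      public Void call(JavaPairRDD<String, String> rdd, Time time) {
        // Built fresh for each batch on the driver; never captured by the
        // closure that is serialized into the checkpoint.
        Configuration hadoopConf = new Configuration();
        rdd.saveAsNewAPIHadoopFile(
            outputPrefix + "-" + time.milliseconds(),
            String.class, String.class, TextOutputFormat.class, hadoopConf);
        return null;
      }
    });

If that works, it would at least confirm that the captured Configuration is
the culprit.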

On Fri, Aug 15, 2014 at 10:37 PM, salemi <alireza.sal...@udo.edu> wrote:
> Hi All,
>
> I am just trying to save the Kafka DStream to Hadoop as follows
>
>       val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>       dStream.saveAsHadoopFiles(hdfsDataUrl, "data")
>
> It throws the following exception. What am I doing wrong?
>
> 14/08/15 14:30:09 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf
> java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>         at
> org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at
> org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ^C14/08/15 14:30:10 ERROR OneForOneStrategy:
> org.apache.hadoop.mapred.JobConf
> java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
>         ... (identical stack trace repeated)
>
