Does this help? I can’t figure out anything new from this extra information.

Thanks,
Mahesh

2014-08-07 12:27:00,170 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
        - field (class "com.twc.needle.ep.EventPersister$$anonfun$createStreamingContext$1", name: "ssc$1", type: "class org.apache.spark.streaming.StreamingContext")
        - object (class "com.twc.needle.ep.EventPersister$$anonfun$createStreamingContext$1", <function1>)
        - field (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name: "foreachFunc$1", type: "interface scala.Function1")
        - object (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", <function2>)
        - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
        - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@7d6dcf80)
        - element of array (index: 1)
        - array (class "[Ljava.lang.Object;", size: 16)
        - field (class "scala.collection.mutable.ArrayBuffer", name: "array", type: "class [Ljava.lang.Object;")
        - object (class "scala.collection.mutable.ArrayBuffer", ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@7129162d, org.apache.spark.streaming.dstream.ForEachDStream@7d6dcf80))
        - field (class "org.apache.spark.streaming.DStreamGraph", name: "outputStreams", type: "class scala.collection.mutable.ArrayBuffer")
        - custom writeObject data (class "org.apache.spark.streaming.DStreamGraph")
        - object (class "org.apache.spark.streaming.DStreamGraph", org.apache.spark.streaming.DStreamGraph@23e0520a)
        - field (class "org.apache.spark.streaming.Checkpoint", name: "graph", type: "class org.apache.spark.streaming.DStreamGraph")
        - root object (class "org.apache.spark.streaming.Checkpoint", org.apache.spark.streaming.Checkpoint@73a68f0)
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
        - field (class "com.twc.needle.ep.EventPersister$$anonfun$createStreamingContext$1", name: "ssc$1", type: "class org.apache.spark.streaming.StreamingContext")
        - object (class "com.twc.needle.ep.EventPersister$$anonfun$createStreamingContext$1", <function1>)
        - field (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name: "foreachFunc$1", type: "interface scala.Function1")
        - object (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", <function2>)
        - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
        - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@7d6dcf80)
        - element of array (index: 1)
        - array (class "[Ljava.lang.Object;", size: 16)
        - field (class "scala.collection.mutable.ArrayBuffer", name: "array", type: "class [Ljava.lang.Object;")
        - object (class "scala.collection.mutable.ArrayBuffer", ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@7129162d, org.apache.spark.streaming.dstream.ForEachDStream@7d6dcf80))
        - field (class "org.apache.spark.streaming.DStreamGraph", name: "outputStreams", type: "class scala.collection.mutable.ArrayBuffer")
        - custom writeObject data (class "org.apache.spark.streaming.DStreamGraph")
        - object (class "org.apache.spark.streaming.DStreamGraph", org.apache.spark.streaming.DStreamGraph@23e0520a)
        - field (class "org.apache.spark.streaming.Checkpoint", name: "graph", type: "class org.apache.spark.streaming.DStreamGraph")
        - root object (class "org.apache.spark.streaming.Checkpoint", org.apache.spark.streaming.Checkpoint@73a68f0)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)

From: Tathagata Das <tathagata.das1...@gmail.com>
Date: Thursday, August 7, 2014 at 11:31 AM
To: Mahesh Padmanabhan <mahesh.padmanab...@twc-contractor.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)

Can you enable the java flag -Dsun.io.serialization.extendedDebugInfo=true for the driver in your driver startup script? That should give an indication of the sequence of object references that leads to the StreamingContext being included in the closure.
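
For example, if you launch with spark-submit, something like this should work (the jar name and main class are assumed here; adjust for your actual startup script):

    spark-submit \
      --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" \
      --class com.twc.needle.ep.EventPersister \
      event-persister.jar

Setting spark.driver.extraJavaOptions to the same flag at submit time should work as well.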

TD


On Thu, Aug 7, 2014 at 10:23 AM, Padmanabhan, Mahesh (contractor) <mahesh.padmanab...@twc-contractor.com> wrote:
Thanks TD but unfortunately that did not work.

From: Tathagata Das <tathagata.das1...@gmail.com>
Date: Thursday, August 7, 2014 at 10:55 AM
To: Mahesh Padmanabhan <mahesh.padmanab...@twc-contractor.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)

It could be because of the variable "enableOpStat". Since it's defined outside foreachRDD, referring to it inside the rdd.foreach is probably causing the whole streaming context to be included in the closure. Scala funkiness. Try this and see if it works; copying the flag into a local val means the closure captures only that value, not the enclosing scope:

msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
  // local copy: the task closure captures this value instead of the enclosing scope
  val enable = enableOpStat

  rdd.foreach(item => item match {
    case (key, (oc, dc)) => {
      DebugLogger.log("Original event count = " + oc)
      DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
      if (enable) {
        try {
          val statBody = Array(("batchCount", oc.toString()),
            ("duplicateCount", (oc - dc).toString()))
          OperationalStatProducer.produce(statBody)
        } catch { case e: Exception => DebugLogger.report(e) }
      }
    }
  })
})



On Thu, Aug 7, 2014 at 9:03 AM, Padmanabhan, Mahesh (contractor) <mahesh.padmanab...@twc-contractor.com> wrote:
Hello all,

I am not sure what is going on – I am getting a NotSerializableException, and initially I thought it was due to not registering one of my classes with Kryo, but that doesn't seem to be the case. I am essentially eliminating duplicates in a Spark Streaming application by using a "window" to de-duplicate the current batch, and I am sending de-dup stats.

The strange thing is that my application is not affected by this error at all.

I tried registering with Kryo like this (though it was more out of desperation):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Long])
    kryo.register(classOf[Tuple2[Int, Tuple2[Long, Long]]])
  }
}
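
For reference, a registrator like this only takes effect if Kryo serialization is enabled and pointed at it; a minimal sketch of the SparkConf wiring (the registrator's package name is assumed here):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.twc.needle.ep.MyRegistrator") // package assumed

Note that the checkpoint write in the stack trace below goes through java.io.ObjectOutputStream, i.e. plain Java serialization, so Kryo registration would not affect it anyway.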

Immediately before the exception, a piece of code (at rdd.foreach) seems to be executed, though I am not sure if that is the culprit. Here is the code:

msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
  rdd.foreach(item => item match {
    case (key, (oc, dc)) => {
      DebugLogger.log("Original event count = " + oc)
      DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
      if (enableOpStat) {
        try {
          val statBody = Array(("batchCount", oc.toString()),
            ("duplicateCount", (oc - dc).toString()))
          OperationalStatProducer.produce(statBody)
        } catch { case e: Exception => DebugLogger.report(e) }
      }
    }
  })
})

Here is the exception:

ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
