It could be because of the variable "enableOpStat". Since it's defined
outside foreachRDD, referring to it inside rdd.foreach makes the closure
capture the enclosing object, which pulls the whole StreamingContext into
the serialized closure. Scala closure-capture funkiness. Try this and see
if it works:

msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
  // Changed: copy the flag into a local val so the closure captures
  // just a Boolean instead of the enclosing object.
  val enable = enableOpStat

  rdd.foreach(item => item match {
    case (key, (oc, dc)) => {
      DebugLogger.log("Original event count = " + oc)
      DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
      if (enable) { // changed: use the local copy
        try {
          val statBody = Array(("batchCount", oc.toString()),
            ("duplicateCount", (oc - dc).toString()))
          OperationalStatProducer.produce(statBody)
        } catch { case e: Exception => DebugLogger.report(e) }
      }
    }
  })
})
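The local val is the whole trick: with it, the closure only needs to
serialize a Boolean. When the lambda reads enableOpStat directly, it is
really reading this.enableOpStat, so the entire enclosing instance, which
presumably holds the StreamingContext, gets dragged into the closure graph.
That would also explain why the Kryo registration below didn't help: the
trace goes through java.io.ObjectOutputStream inside CheckpointWriter.write,
i.e. this is plain Java serialization at checkpoint time, and Kryo
registrators don't apply to it.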


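If it helps, here is a minimal, Spark-free sketch of the capture mechanics
(all class names here are made up for illustration): a lambda that reads a
field captures the enclosing instance, while a lambda that reads a local
copy does not.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object CaptureDemo {
  class HeavyContext // stand-in for StreamingContext: not Serializable

  class Job(val ctx: HeavyContext) extends Serializable {
    val enableOpStat = true

    // Reads this.enableOpStat, so the lambda captures `this`, and therefore ctx.
    def badClosure: Int => Unit = i => if (enableOpStat) println(i)

    // Copies the flag first, so the lambda captures only a Boolean.
    def goodClosure: Int => Unit = {
      val enable = enableOpStat
      i => if (enable) println(i)
    }
  }

  def check(label: String, f: AnyRef): Unit =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(f)
      println(label + ": serializable")
    } catch {
      case e: NotSerializableException => println(label + ": " + e)
    }

  def main(args: Array[String]): Unit = {
    val job = new Job(new HeavyContext)
    check("field capture", job.badClosure) // fails: HeavyContext is not serializable
    check("local copy", job.goodClosure)   // fine
  }
}

The first check fails the same way your checkpoint does; the second goes
through.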

On Thu, Aug 7, 2014 at 9:03 AM, Padmanabhan, Mahesh (contractor) <
mahesh.padmanab...@twc-contractor.com> wrote:

>  Hello all,
>
>  I am not sure what is going on. I am getting a NotSerializableException,
> and initially I thought it was due to not registering one of my classes
> with Kryo, but that doesn’t seem to be the case. I am essentially
> eliminating duplicates in a Spark Streaming application by using a “window”
> to eliminate duplicates from the current batch and sending de-dup stats.
>
>  The strange thing is that my application is not affected by this error
> at all.
>
>  I tried registering with Kryo like this (though it was more out of
> desperation):
>
>  class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[Long])
>     kryo.register(classOf[Tuple2[Int, Tuple2[Long, Long]]])
>   }
> }
>
>  Immediately before the exception, I have a piece of code that does get
> executed (at rdd.foreach), though I am not sure if that is the culprit:
>
>  Here is the code:
>
>      msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>         rdd.foreach(item => item match {
>           case (key, (oc, dc)) => {
>             DebugLogger.log("Original event count = " + oc)
>             DebugLogger.log("Found "+(oc-dc)+" duplicate(s) in "+oc+" events")
>             if (enableOpStat) {
>               try {
>                 val statBody = Array(("batchCount", oc.toString()),
>                   ("duplicateCount", (oc-dc).toString()))
>                 OperationalStatProducer.produce(statBody)
>               } catch { case e: Exception => DebugLogger.report(e) }
>             }
>           }
>         })
>       })
>
>  Here is the exception:
>
>  ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
> java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>         at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
>         at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
