Well I dont see the rdd in the foreachRDD being passed into the A.func1()
so I am not sure what is purpose of the function. Assuming that you do want
to pass on that RDD into that function, and also want to have access to the
sparkContext, you can only pass on the RDD and then access the associated
context as rdd.context. This will avoid referring of the ssc  inside the
foreachRDD.

See if that helps.

TD


On Thu, Aug 7, 2014 at 12:47 PM, Padmanabhan, Mahesh (contractor) <
mahesh.padmanab...@twc-contractor.com> wrote:

>  Thanks TD, Amit.
>
>  I think I figured out where the problem is through the process of
> commenting out individual lines of code one at a time :(
>
>  Can either of you help me find the right solution? I tried creating the
> SparkContext outside the foreachRDD but that didn’t help.
>
>  I have an object (let’s say A) that is passed a SparkContext like this:
>
>  object A {
>   def func1(sc: SparkContext) {
>     //Do something with sc
> }
>
>  In my main object that creates the StreamingContext, I call object A’s
> func1 method like this:
>
>  val ssc = new StreamingContext(spark, Seconds(batchTime))
>
>  ssc.checkpoint(checkPointDir)
>
>  val messageStream = KafkaConsumer.messageStream(ssc)
>
>  messageStream.foreachRDD(rdd => {
>    A.func1(ssc.sparkContext)
> }
>
>  Seems like the call A.func1(ssc.sparkContext) above is the cause of the
> exception.
>
>  Thanks,
> Mahesh
>
>   From: Tathagata Das <tathagata.das1...@gmail.com>
> Date: Thursday, August 7, 2014 at 1:11 PM
> To: amit <amit.codenam...@gmail.com>
> Cc: "u...@spark.incubator.apache.org" <u...@spark.incubator.apache.org>
>
> Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head
> scratcher)
>
>  From the extended info, I see that you have a function called
> createStreamingContext() in your code. Somehow that is getting referenced
> in in the foreach function. Is the whole foreachRDD code inside the
> createStreamingContext() function? Did you try marking the ssc field as
> transient?
>
>  Here is a significantly different approach. Put the whole function to
> apply on each item in an object.
>
>  object MyFunctions {
>   def processItem(enable: Boolean)(item: (Int, (Long, Long)) = {
>      val (key, (oc, dc)) = item
>             DebugLogger.log("Original event count = " + oc)
>             DebugLogger.log("Found "+(oc-dc)+" duplicate(s) in "+oc+"
> events")
>             if (enableOpStat) {
>               try {
>                 val statBody = Array(("batchCount", oc.toString()),
>                   ("duplicateCount", (oc-dc).toString()))
>                 OperationalStatProducer.produce(statBody)
>               } catch { case e: Exception => DebugLogger.report(e) }
>             }
>           }
>   }
> }
>
>
>
>  And then use that
>
>  msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>         val enable = enableOptStat
>         rdd.foreach(MyFunction.processItem(enable) _ )
> }
>
>
>
> On Thu, Aug 7, 2014 at 11:52 AM, amit <amit.codenam...@gmail.com> wrote:
>
>> There is one more configuration option called spark.closure.serializer
>> that
>> can be used to specify serializer for closures.
>>
>> Maybe in the the class you have Streaming Context as a field, so when
>> spark
>> tries to serialize the whole class it uses the spark.closure.serializer to
>> serialize even the streaming context. Classes like StreamingContext may
>> not
>> work if serialized and deserialized in a different JVM(?).
>>
>> So I see two solutions one is to somehow avoid serializing
>> StreamingContext,
>> other is to override the default serialization method to serialize only
>> the
>> params required by streaming context and recreate it in the serialization
>> step from the params
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-NotSerialized-exception-a-bit-of-a-head-scratcher-tp11666p11703.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
> ------------------------------
> This E-mail and any of its attachments may contain Time Warner Cable
> proprietary information, which is privileged, confidential, or subject to
> copyright belonging to Time Warner Cable. This E-mail is intended solely
> for the use of the individual or entity to which it is addressed. If you
> are not the intended recipient of this E-mail, you are hereby notified that
> any dissemination, distribution, copying, or action taken in relation to
> the contents of and attachments to this E-mail is strictly prohibited and
> may be unlawful. If you have received this E-mail in error, please notify
> the sender immediately and permanently delete the original and any copy of
> this E-mail and any printout.
>

Reply via email to