Yep, here goes! Here are my environment vitals:
- Spark 1.0.0
- EC2 cluster with 1 slave spun up using spark-ec2
- twitter4j 3.0.3
- spark-shell called with the --jars argument to load spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j jars

Now, while I’m in the Spark shell, I enter the following:

    import twitter4j.auth.{Authorization, OAuthAuthorization}
    import twitter4j.conf.ConfigurationBuilder
    import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
    import org.apache.spark.streaming.twitter.TwitterUtils

    def getAuth(): Option[Authorization] = {
      System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
      System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
      System.setProperty("twitter4j.oauth.accessToken", "accessToken")
      System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
      Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
    }

    def noop(a: Any): Any = {
      a
    }

    val ssc = new StreamingContext(sc, Seconds(5))
    val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
    val liveTweets = liveTweetObjects.map(_.getText)

    liveTweets.map(t => noop(t)).print()

    ssc.start()

So basically, I’m just printing Tweets as-is, but first I’m mapping them to themselves via noop().
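A plausible explanation for why the def-defined helpers matter (not confirmed in this thread) is that a def in the shell compiles to a method on the REPL's enclosing line object, so any closure that calls it must capture that whole object — including ssc — when Spark serializes the task, whereas a val holds a free-standing function object. The mechanism can be sketched outside Spark entirely; the Driver class below is a hypothetical stand-in for the REPL wrapper and is deliberately not Serializable:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object DefVsVal {
  // Stand-in for the REPL's enclosing object; like StreamingContext,
  // it does not implement java.io.Serializable.
  class Driver {
    // A def is a method on Driver: a closure that calls it must capture
    // the Driver instance it belongs to.
    def defFunc(s: String): String = s.toUpperCase
    // A val holds a free-standing Function1 that references nothing on Driver.
    val valFunc: String => String = s => s.toUpperCase
  }

  // Attempt Java serialization, reporting success or failure.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val d = new Driver
    val viaDef: String => String = s => d.defFunc(s) // captures d
    println(serializable(viaDef))    // serialization fails: the captured Driver
    println(serializable(d.valFunc)) // serialization succeeds: nothing captured
  }
}
```

This is the same reason the val-instead-of-def workaround TD suggests further down the thread can help.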
The Tweets will start to flow just fine for a minute or so, and then, this:

    14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 1406243610000 ms.0
    org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
            at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
            at akka.actor.ActorCell.invoke(ActorCell.scala:456)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
            at akka.dispatch.Mailbox.run(Mailbox.scala:219)
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
            at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
            at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
            at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
            at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The time-to-first-error is variable. This is the simplest repro I can show at this time. Doing more complex things with liveTweets that involve a KMeansModel, for example, gets interrupted more quickly by this java.io.NotSerializableException. I don’t know if the root cause is the same, but the error certainly is.

By the way, trying to reproduce this on 1.0.1 doesn’t raise the same error, but I can’t dig deeper to make sure this is really resolved (e.g. by trying more complex things that need data) due to SPARK-2471 <https://issues.apache.org/jira/browse/SPARK-2471>. I see that that issue has been resolved, so I’ll try this whole process again using the latest from master and see how it goes.

Nick

On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> I am very curious though. Can you post a concise code example which we can
> run to reproduce this problem?
>
> TD
>
> On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> I am not entirely sure off the top of my head. But a possible (usually
>> works) workaround is to define the function as a val instead of a def.
>> For example
>>
>>     def func(i: Int): Boolean = { true }
>>
>> can be written as
>>
>>     val func = (i: Int) => { true }
>>
>> Hope this helps for now.
>>
>> TD
>>
>> On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>
>>> Hey Diana,
>>>
>>> Did you ever figure this out?
>>>
>>> I’m running into the same exception, except in my case the function I’m
>>> calling is a KMeans model.predict().
>>>
>>> In regular Spark it works, and Spark Streaming without the call to
>>> model.predict() also works, but when put together I get this
>>> serialization exception. I’m on 1.0.0.
>>>
>>> Nick
>>>
>>> On Thu, May 8, 2014 at 6:37 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>
>>>> Hey all, trying to set up a pretty simple streaming app and getting
>>>> some weird behavior.
>>>>
>>>> First, a non-streaming job that works fine: I'm trying to pull out
>>>> lines of a log file that match a regex, for which I've set up a function:
>>>>
>>>>     def getRequestDoc(s: String): String = {
>>>>       "KBDOC-[0-9]*".r.findFirstIn(s).orNull
>>>>     }
>>>>     val logs = sc.textFile(logfiles)
>>>>     logs.map(getRequestDoc).take(10)
>>>>
>>>> That works, but I want to run that on the same data, but streaming, so
>>>> I tried this:
>>>>
>>>>     val logs = ssc.socketTextStream("localhost", 4444)
>>>>     logs.map(getRequestDoc).print()
>>>>     ssc.start()
>>>>
>>>> From this code, I get:
>>>>
>>>>     14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job 1399545128000 ms.0
>>>>     org.apache.spark.SparkException: Job aborted: Task not serializable:
>>>>     java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>>>>
>>>> But if I do the map function inline instead of calling a separate
>>>> function, it works:
>>>>
>>>>     logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()
>>>>
>>>> So why is it able to serialize my little function in regular Spark, but
>>>> not in streaming?
>>>>
>>>> Thanks,
>>>> Diana
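For what it's worth, TD's val-instead-of-def workaround applied to Diana's helper would look roughly like this. This is a sketch: only the regex extraction is exercised here, not the streaming call, and the surrounding object is just scaffolding to make it self-contained outside the shell.

```scala
object GetRequestDocVal {
  // val instead of def: getRequestDoc is a standalone, serializable Function1,
  // so logs.map(getRequestDoc) need not serialize the enclosing shell object
  // (which holds the non-serializable StreamingContext).
  val getRequestDoc: String => String =
    s => "KBDOC-[0-9]*".r.findFirstIn(s).orNull

  def main(args: Array[String]): Unit = {
    println(getRequestDoc("GET /kb/KBDOC-00123 HTTP/1.1")) // KBDOC-00123
    println(getRequestDoc("no doc id in this line"))       // null
  }
}
```

In the streaming shell this would then be used exactly as before — logs.map(getRequestDoc).print() — matching the inline version that already works.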