Thanks, Ewen! Now I understand why I was getting the error message. It seems that foreachPartition doesn't exist on the DStream class :-\ I will check the API docs for other alternatives.
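(For reference, the workaround I'm looking at seems to be going through foreachRDD on the DStream and then calling foreachPartition on each RDD, creating the Jedis connection inside the partition function so nothing non-serializable is captured from the driver. A rough sketch only, assuming the same Sedis/Jedis setup and imports as in the original program below; on older Spark versions the DStream method may be called foreach rather than foreachRDD:)

    // assumes the imports from the original TwitterWordCount
    // (org.sedis._, redis.clients.jedis._, spark streaming imports)
    cntWords.foreachRDD { rdd =>
      rdd.foreachPartition { it =>
        // created on the worker, once per partition, so it never has to be serialized
        val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
        pool.withJedisClient { client =>
          val pipeline = client.pipelined()
          it.foreach { case (word, count) => pipeline.incrBy(word, count) }
          pipeline.sync()
        }
      }
    }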
2014-02-02 Ewen Cheslack-Postava <[email protected]>:

> If you use anything created on the driver program within functions run on
> workers, it needs to be serializable, but your pool of Redis connections is
> not. Normally, the simple way to fix this is to use the *With methods of
> RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to
> instantiate a connection to Redis on a per-partition basis.
>
> But your logic for outputting the data doesn't look right.
> cntWords.foreach's function parameter would be getting each (word, count)
> element, *not the whole RDD*. You probably want to share a single Redis
> pipeline for each RDD partition, which you can accomplish with
> foreachPartition. It gives you an iterator over all elements in each
> partition. It would look something like this:
>
>     cntWords.foreachPartition(it =>
>       val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
>       pool.withJedisClient { client =>
>         val pipeline = client.pipeline()
>         it.foreach { case (word, count) => pipeline.incrBy(word, count) }
>         pipeline.sync()
>       }
>     )
>
> Of course you wouldn't actually need the pool in that case, but I'm not
> familiar with the Jedis library so I'm not sure how you'd create just a
> single connection.
>
> -Ewen
>
> Luis Ángel Vicente Sánchez <[email protected]>
> February 2, 2014 at 7:18 AM
>
> I'm trying to create a simple twitter word counter with spark-streaming
> and I would like to store the word counts in redis. The program looks like
> this:
>
>     import org.apache.spark.storage.StorageLevel
>     import org.apache.spark.streaming.{ Seconds, StreamingContext }
>     import org.apache.spark.streaming.StreamingContext._
>     import org.apache.spark.streaming.twitter._
>     import org.sedis._
>     import redis.clients.jedis._
>
>     object TwitterWordCount {
>       def main(args: Array[String]) {
>         if (args.length < 1) {
>           System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
>           System.exit(1)
>         }
>
>         val (master, filters) = (args.head, args.tail)
>
>         val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
>         val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
>           System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>         val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
>
>         val words = stream.flatMap(status => status.getText.toLowerCase.split(" ")).map(word => (word, 1l))
>
>         val cntWords = words.reduceByKey(_ + _)
>
>         cntWords.foreach(rdd =>
>           pool.withJedisClient { client =>
>             val pipeline = client.pipelined()
>             rdd.foreach {
>               case (word, count) =>
>                 pipeline.incrBy(word, count)
>             }
>             pipeline.sync()
>           }
>         )
>
>         ssc.start()
>       }
>     }
>
> Everytime I run this program, I get this error:
>
> [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job streaming job 1391354180000 ms.0
> [error] org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> [error]     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error]     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error]     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> [error]     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error]     at scala.Option.foreach(Option.scala:236)
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
> [error]     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error]     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error]     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
> [error]     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
> [error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> [error]     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> [error]     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> [error]     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> [error]     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> [error]     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> [error]     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [error]     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [error]     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [error]     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I have tried to not use redis pipelines and then I get the same error but
> related to the Jedis client.
>
> Have anybody done something similar?
>
> Kind regards,
>
> Luis
>
> P.S. I have attached my build.sbt and the scala source code to this file.
