Thank Ewen! Now I understand why I was getting the error message. It seems
that foreachPartition doesn't exists as part of the DStream class :-\ I
will check API docs to find other alternatives.




2014-02-02 Ewen Cheslack-Postava <[email protected]>:

> If you use anything created on the driver program within functions run on
> workers, it needs to be serializable, but your pool of Redis connections is
> not. Normally, the simple way to fix this is to use the *With methods of
> RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to
> instantiate a connection to Redis on a per-partition basis.
>
> But your logic for outputting the data doesn't look right.
> cntWords.foreach's function parameter would be getting each (word,count)
> element, *not the whole RDD*. You probably want to share a single Redis
> pipeline for each RDD partition, which you can accomplish with
> foreachPartition. It gives you an iterator over all elements in each
> partition. It would look something like this:
>
> cntWords.foreachPartition(it =>
>
>   val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
> 6379, 2000))
>   pool.withJedisClient { client =>
>     val pipeline = client.pipeline()
>     it.foreach { case (word,count) => pipeline.incrBy(word,count) }
>     pipeline.sync()
>   }
> )
>
> Of course you wouldn't actually need the pool in that case, but I'm not
> familiar with the Jedis library so I'm not sure how you'd create just a
> single connection.
>
> -Ewen
>
>   Luis Ángel Vicente Sánchez <[email protected]>
>  February 2, 2014 at 7:18 AM
> I'm trying to create a simple twitter word counter with spark-streaming
> and I would like to store the word counts in redis. The program looks like
> this:
>
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{ Seconds, StreamingContext }
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.twitter._
> import org.sedis._
> import redis.clients.jedis._
>
> object TwitterWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 1) {
>       System.err.println("Usage: TwitterWordCount <master> [filter1]
> [filter2] ... [filterN]")
>       System.exit(1)
>     }
>
>     val (master, filters) = (args.head, args.tail)
>
>     val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
> 6379, 2000))
>     val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
> System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>     val stream = TwitterUtils.createStream(ssc, None, filters,
> StorageLevel.MEMORY_ONLY_SER)
>
>     val words = stream.flatMap(status => status.getText.toLowerCase.split("
> ")).map(word => (word, 1l))
>
>     val cntWords = words.reduceByKey(_ + _)
>
>     cntWords.foreach(rdd =>
>       pool.withJedisClient { client =>
>         val pipeline = client.pipelined()
>         rdd.foreach {
>           case (word, count) =>
>             pipeline.incrBy(word, count)
>         }
>         pipeline.sync()
>       }
>     )
>
>     ssc.start()
>   }
> }
>
> Everytime I run this program, I get this error:
>
> [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job
> streaming job 1391354180000 ms.0
> [error] org.apache.spark.SparkException: Job aborted: Task not
> serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> [error] at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error] at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error] at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> [error] at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error] at scala.Option.foreach(Option.scala:236)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
> [error] at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error] at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error] at
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
> [error] at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> [error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> [error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> [error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> [error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> [error] at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> [error] at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [error] at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [error] at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [error] at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I have tried to not use redis pipelines and then I get the same error but
> related to the Jedis client.
>
> Have anybody done something similar?
>
> Kind regards,
>
> Luis
>
> P.S. I have attached my build.sbt and the scala source code to this file.
>
>
>

<<compose-unknown-contact.jpg>>

Reply via email to