Ah, sorry, I read too quickly and missed that this was for Spark Streaming. In that case you will get RDDs from foreach, so I guess you want to use RDD.foreachPartition inside the call to DStream.foreach.
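
Roughly, and untested, that nesting might look like the sketch below. It assumes cntWords is the DStream[(String, Long)] from your program and reuses the Sedis Pool from your code; RedisSink and save are names made up for illustration. I believe newer Spark releases call this method foreachRDD rather than foreach, so check the API docs for your version.

```scala
import org.apache.spark.streaming.dstream.DStream
import org.sedis._
import redis.clients.jedis._

// Hypothetical helper; the object and method names are illustrative only.
object RedisSink {
  def save(cntWords: DStream[(String, Long)]): Unit =
    // DStream.foreach hands over one RDD per batch interval.
    cntWords.foreach { rdd =>
      // foreachPartition runs on the workers, once per partition.
      rdd.foreachPartition { it =>
        // Built here, on the worker, so no Redis object is captured from the driver.
        val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
        pool.withJedisClient { client =>
          val pipeline = client.pipelined()
          it.foreach { case (word, count) => pipeline.incrBy(word, count) }
          pipeline.sync() // flush the queued increments for this partition
        }
      }
    }
}
```

Note that this builds a new pool for every partition of every batch and never shuts it down, so treat it as a starting point rather than something to run as-is.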

-Ewen

Luis Ángel Vicente Sánchez wrote:
Thanks Ewen! Now I understand why I was getting the error message. It seems that foreachPartition doesn't exist as part of the DStream class :-\ I will check the API docs to find other alternatives.




2014-02-02 Ewen Cheslack-Postava wrote:
If you use anything created on the driver program within functions run on workers, it needs to be serializable, but your pool of Redis connections is not. Normally, the simple way to fix this is to use the *With methods of RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to instantiate a connection to Redis on a per-partition basis.

But your logic for outputting the data doesn't look right. cntWords.foreach's function parameter would be getting each (word,count) element, *not the whole RDD*. You probably want to share a single Redis pipeline for each RDD partition, which you can accomplish with foreachPartition. It gives you an iterator over all elements in each partition. It would look something like this:

cntWords.foreachPartition { it =>
  // Build the pool on the worker so nothing non-serializable is captured from the driver.
  val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
  pool.withJedisClient { client =>
    val pipeline = client.pipelined()
    it.foreach { case (word, count) => pipeline.incrBy(word, count) }
    pipeline.sync()
  }
}

Of course you wouldn't actually need the pool in that case, but I'm not familiar with the Jedis library so I'm not sure how you'd create just a single connection.
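
For what it's worth, here is a guess at the single-connection version. It assumes the plain Jedis(host, port) constructor and disconnect() are available in your Jedis version; RedisPartitionWriter and writePartition are names made up for illustration, and I haven't run this.

```scala
import redis.clients.jedis.Jedis

// Hypothetical helper; the object and method names are illustrative only.
object RedisPartitionWriter {
  // Writes one partition's (word, count) pairs over a single connection.
  def writePartition(it: Iterator[(String, Long)]): Unit = {
    val client = new Jedis("localhost", 6379)
    try {
      val pipeline = client.pipelined()
      it.foreach { case (word, count) => pipeline.incrBy(word, count) }
      pipeline.sync() // send the queued increments in one round trip
    } finally {
      client.disconnect() // release the connection even if a write fails
    }
  }
}
```

You would then hand it to foreachPartition, e.g. foreachPartition(RedisPartitionWriter.writePartition _).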

-Ewen
February 2, 2014 at 7:18 AM
I'm trying to create a simple Twitter word counter with Spark Streaming, and I would like to store the word counts in Redis. The program looks like this:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import org.sedis._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
      System.exit(1)
    }

    val (master, filters) = (args.head, args.tail)

    val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)

    val words = stream.flatMap(status ⇒ status.getText.toLowerCase.split(" ")).map(word ⇒ (word, 1L))

    val cntWords = words.reduceByKey(_ + _)

    cntWords.foreach(rdd ⇒
      pool.withJedisClient { client ⇒
        val pipeline = client.pipelined()
        rdd.foreach {
          case (word, count) ⇒
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
      }
    )

    ssc.start()
  }
}

Every time I run this program, I get this error:

[error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job streaming job 1391354180000 ms.0
[error] org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
[error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
[error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at scala.Option.foreach(Option.scala:236)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
[error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
[error] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
[error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
[error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
[error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
[error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
[error] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
[error] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[error] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[error] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[error] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have also tried it without Redis pipelines, and I get the same error, but for the Jedis client instead.

Has anybody done something similar?

Kind regards,

Luis

P.S. I have attached my build.sbt and the Scala source code to this email.


