I'm trying to create a simple twitter word counter with spark-streaming and
I would like to store the word counts in redis. The program looks like this:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import org.sedis._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1]
[filter2] ... [filterN]")
      System.exit(1)
    }

    val (master, filters) = (args.head, args.tail)

    val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
6379, 2000))
    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters,
StorageLevel.MEMORY_ONLY_SER)

    val words = stream.flatMap(status => status.getText.toLowerCase.split("
")).map(word => (word, 1l))

    val cntWords = words.reduceByKey(_ + _)

    cntWords.foreach(rdd =>
      pool.withJedisClient { client =>
        val pipeline = client.pipelined()
        rdd.foreach {
          case (word, count) =>
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
      }
    )

    ssc.start()
  }
}

Everytime I run this program, I get this error:

[error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job
streaming job 1391354180000 ms.0
[error] org.apache.spark.SparkException: Job aborted: Task not
serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
[error] at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
[error] at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at scala.Option.foreach(Option.scala:236)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
[error] at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
[error] at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
[error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
[error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
[error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
[error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
[error] at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
[error] at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[error] at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[error] at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[error] at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have tried to not use redis pipelines and then I get the same error but
related to the Jedis client.

Have anybody done something similar?

Kind regards,

Luis

P.S. I have attached my build.sbt and the scala source code to this file.

Attachment: build.sbt
Description: Binary data

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import org.sedis._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
      System.exit(1)
    }

    val (master, filters) = (args.head, args.tail)

    val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)

    val words = stream.flatMap(status ⇒ status.getText.toLowerCase.split(" ")).map(word ⇒ (word, 1l))

    val cntWords = words.reduceByKey(_ + _)

    cntWords.foreach(rdd ⇒
      pool.withJedisClient { client ⇒
        val pipeline = client.pipelined()
        rdd.foreach {
          case (word, count) ⇒
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
      }
    )

    ssc.start()
  }
}

Reply via email to