I'm trying to create a simple Twitter word counter with Spark Streaming, and I would like to store the word counts in Redis. The program looks like this:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import org.sedis._
import redis.clients.jedis._

object TwitterWordCount {

  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
      System.exit(1)
    }
    val (master, filters) = (args.head, args.tail)

    // Sedis pool wrapping a Jedis connection pool to the local Redis instance.
    val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))

    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)

    // Split each status into lowercase words and count them per 5-second batch.
    val words = stream.flatMap(status ⇒ status.getText.toLowerCase.split(" ")).map(word ⇒ (word, 1L))
    val cntWords = words.reduceByKey(_ + _)

    // For every batch, increment the Redis counter of each word through a pipeline.
    cntWords.foreach { rdd ⇒
      pool.withJedisClient { client ⇒
        val pipeline = client.pipelined()
        rdd.foreach {
          case (word, count) ⇒ pipeline.incrBy(word, count)
        }
        pipeline.sync()
      }
    }

    ssc.start()
  }
}
Every time I run this program, I get this error:
[error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job streaming job 1391354180000 ms.0
[error] org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
[error]     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error]     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error]     at scala.Option.foreach(Option.scala:236)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
[error]     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error]     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error]     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
[error]     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
[error]     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
[error]     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
[error]     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
[error]     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
[error]     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
[error]     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[error]     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[error]     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[error]     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I have also tried not using Redis pipelines, and then I get the same error, but related to the Jedis client instead.
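For reference, that variant looked roughly like this (a sketch from memory; the only difference is that incrBy is called directly on the Jedis client instead of going through a pipeline):

    // Non-pipelined variant (sketch): incrBy is called directly on the
    // Jedis client inside rdd.foreach, so the closure still captures the
    // client and fails with a NotSerializableException for it.
    cntWords.foreach { rdd ⇒
      pool.withJedisClient { client ⇒
        rdd.foreach {
          case (word, count) ⇒ client.incrBy(word, count)
        }
      }
    }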
Has anybody done something similar?
Kind regards,
Luis
P.S. I have attached my build.sbt and the Scala source code to this message.