Yes, I found that out after sending my response. The final program is:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
      System.exit(1)
    }

    // Periodically clean old metadata so the long-running streaming job doesn't leak memory.
    System.setProperty("spark.cleaner.ttl", "600")

    val (master, filters) = (args.head, args.tail)

    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)

    // countByValue() yields per-batch (word, count) pairs.
    val words = stream.flatMap(status => status.getText.toLowerCase.split(" ")).countByValue()
    val countWords = words.reduceByKey(_ + _)

    // One Jedis connection per partition, created on the worker, so nothing has to be serialized.
    countWords.foreachRDD(rdd =>
      rdd.foreachPartition { iterator =>
        val client = new Jedis("localhost")
        val pipeline = client.pipelined()
        iterator.foreach {
          case (word, count) =>
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
        client.quit()
      }
    )

    ssc.start()
  }
}
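
For reference, here is a quick way to sanity-check the counters afterwards with a plain Jedis connection (a small hedged sketch; the keys "spark" and "scala" are just example words, not anything taken from the job itself):

import redis.clients.jedis.Jedis

object CheckCounts {
  def main(args: Array[String]) {
    val client = new Jedis("localhost")
    // Every word is stored as a plain Redis counter, so GET returns its running total.
    Seq("spark", "scala").foreach { word =>
      println(word + " -> " + Option(client.get(word)).getOrElse("0"))
    }
    client.quit()
  }
}
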
2014-02-02 Ewen Cheslack-Postava <[email protected]>:
> Ah, sorry, I read too quickly and missed that this was for Spark
> Streaming. In that case you will get RDDs from foreach, so I guess you want
> to use RDD.foreachPartition inside the call to DStream.foreach.
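>
> Roughly, the nesting would look something like this (an untested sketch that
> just opens a plain Jedis connection per partition):
>
> cntWords.foreachRDD { rdd =>
>   rdd.foreachPartition { it =>
>     // Created on the worker, once per partition, so nothing from the driver
>     // needs to be serialized.
>     val client = new Jedis("localhost")
>     val pipeline = client.pipelined()
>     it.foreach { case (word, count) => pipeline.incrBy(word, count) }
>     pipeline.sync()
>     client.quit()
>   }
> }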
>
> -Ewen
>
>
> Luis Ángel Vicente Sánchez wrote:
>
> Thanks, Ewen! Now I understand why I was getting the error message. It seems
> that foreachPartition doesn't exist as part of the DStream class :-\ I will
> check the API docs to find other alternatives.
>
>
>
>
> 2014-02-02 Ewen Cheslack-Postava <[email protected]>:
>
>> If you use anything created on the driver program within functions run on
>> workers, it needs to be serializable, but your pool of Redis connections is
>> not. Normally, the simple way to fix this is to use the *With methods of
>> RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to
>> instantiate a connection to Redis on a per-partition basis.
>>
>> But your logic for outputting the data doesn't look right.
>> cntWords.foreach's function parameter would be getting each (word,count)
>> element, *not the whole RDD*. You probably want to share a single Redis
>> pipeline for each RDD partition, which you can accomplish with
>> foreachPartition. It gives you an iterator over all elements in each
>> partition. It would look something like this:
>>
>> cntWords.foreachPartition { it =>
>>   val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
>>   pool.withJedisClient { client =>
>>     val pipeline = client.pipelined()
>>     it.foreach { case (word, count) => pipeline.incrBy(word, count) }
>>     pipeline.sync()
>>   }
>> }
>>
>> Of course you wouldn't actually need the pool in that case, but I'm not
>> familiar with the Jedis library so I'm not sure how you'd create just a
>> single connection.
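>> Maybe just constructing a Jedis directly would work in place of the pool,
>> something along these lines (unverified, worth checking against the Jedis
>> docs):
>>
>> val client = new Jedis("localhost", 6379)
>> val pipeline = client.pipelined()
>> it.foreach { case (word, count) => pipeline.incrBy(word, count) }
>> pipeline.sync()
>> client.quit()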
>>
>> -Ewen
>>
>> Luis Ángel Vicente Sánchez <[email protected]>
>> February 2, 2014 at 7:18 AM
>> I'm trying to create a simple Twitter word counter with Spark Streaming,
>> and I would like to store the word counts in Redis. The program looks like
>> this:
>>
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{ Seconds, StreamingContext }
>> import org.apache.spark.streaming.StreamingContext._
>> import org.apache.spark.streaming.twitter._
>> import org.sedis._
>> import redis.clients.jedis._
>>
>> object TwitterWordCount {
>>   def main(args: Array[String]) {
>>     if (args.length < 1) {
>>       System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
>>       System.exit(1)
>>     }
>>
>>     val (master, filters) = (args.head, args.tail)
>>
>>     val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
>>     val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
>>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>>     val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
>>
>>     val words = stream.flatMap(status => status.getText.toLowerCase.split(" ")).map(word => (word, 1L))
>>
>>     val cntWords = words.reduceByKey(_ + _)
>>
>>     cntWords.foreach(rdd =>
>>       pool.withJedisClient { client =>
>>         val pipeline = client.pipelined()
>>         rdd.foreach {
>>           case (word, count) =>
>>             pipeline.incrBy(word, count)
>>         }
>>         pipeline.sync()
>>       }
>>     )
>>
>>     ssc.start()
>>   }
>> }
>>
>> Every time I run this program, I get this error:
>>
>> [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job streaming job 1391354180000 ms.0
>> [error] org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>> [error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> [error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> [error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>> [error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>> [error] at scala.Option.foreach(Option.scala:236)
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
>> [error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> [error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> [error] at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
>> [error] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
>> [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>> [error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> [error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> [error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> [error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> [error] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> [error] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [error] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [error] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [error] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I have tried not using Redis pipelines; I then get the same error, but
>> related to the Jedis client.
>>
>> Have anybody done something similar?
>>
>> Kind regards,
>>
>> Luis
>>
>> P.S. I have attached my build.sbt and the Scala source code to this email.
>>
>>
>>
>
