Yes, I found that after sending my response; the final program is:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1]
[filter2] ... [filterN]")
      System.exit(1)
    }

    System.setProperty("spark.cleaner.ttl", "600")

    val (master, filters) = (args.head, args.tail)

    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters,
StorageLevel.MEMORY_ONLY_SER)
    val words = stream.flatMap(status => status.getText.toLowerCase.split("
")).countByValue()
    val countWords = words.reduceByKey(_ + _)

    countWords.foreachRDD(rdd =>
      rdd.foreachPartition { iterator =>
        val client = new Jedis("localhost")
        val pipeline = client.pipelined()
        iterator.foreach {
          case (word, count) =>
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
        client.quit()
      }
    )

    ssc.start()
  }
}



2014-02-02 Ewen Cheslack-Postava <[email protected]>:

> Ah, sorry, I read too quickly and missed that this was for Spark
> Streaming. In that case you will get RDDs from foreach, so I guess you want
> to use RDD.foreachPartition inside the call to DStream.foreach.
>
> -Ewen
>
>
> Luis Ángel Vicente Sánchez wrote:
>
> Thank Ewen! Now I understand why I was getting the error message. It seems
> that foreachPartition doesn't exists as part of the DStream class :-\ I
> will check API docs to find other alternatives.
>
>
>
>
> 2014-02-02 Ewen Cheslack-Postava <[email protected]>:
>
>> If you use anything created on the driver program within functions run on
>> workers, it needs to be serializable, but your pool of Redis connections is
>> not. Normally, the simple way to fix this is to use the *With methods of
>> RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to
>> instantiate a connection to Redis on a per-partition basis.
>>
>> But your logic for outputting the data doesn't look right.
>> cntWords.foreach's function parameter would be getting each (word,count)
>> element, *not the whole RDD*. You probably want to share a single Redis
>> pipeline for each RDD partition, which you can accomplish with
>> foreachPartition. It gives you an iterator over all elements in each
>> partition. It would look something like this:
>>
>> cntWords.foreachPartition(it =>
>>
>>   val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
>> 6379, 2000))
>>   pool.withJedisClient { client =>
>>     val pipeline = client.pipeline()
>>     it.foreach { case (word,count) => pipeline.incrBy(word,count) }
>>     pipeline.sync()
>>   }
>> )
>>
>> Of course you wouldn't actually need the pool in that case, but I'm not
>> familiar with the Jedis library so I'm not sure how you'd create just a
>> single connection.
>>
>> -Ewen
>>
>>  Luis Ángel Vicente Sánchez <[email protected]>
>>  February 2, 2014 at 7:18 AM
>> I'm trying to create a simple twitter word counter with spark-streaming
>> and I would like to store the word counts in redis. The program looks like
>> this:
>>
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{ Seconds, StreamingContext }
>> import org.apache.spark.streaming.StreamingContext._
>> import org.apache.spark.streaming.twitter._
>> import org.sedis._
>> import redis.clients.jedis._
>>
>> object TwitterWordCount {
>>   def main(args: Array[String]) {
>>     if (args.length < 1) {
>>       System.err.println("Usage: TwitterWordCount <master> [filter1]
>> [filter2] ... [filterN]")
>>       System.exit(1)
>>     }
>>
>>     val (master, filters) = (args.head, args.tail)
>>
>>     val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
>> 6379, 2000))
>>     val ssc = new StreamingContext(master, "TwitterWordCount",
>> Seconds(5), System.getenv("SPARK_HOME"),
>> StreamingContext.jarOfClass(this.getClass))
>>     val stream = TwitterUtils.createStream(ssc, None, filters,
>> StorageLevel.MEMORY_ONLY_SER)
>>
>>     val words = stream.flatMap(status =>
>> status.getText.toLowerCase.split(" ")).map(word => (word, 1l))
>>
>>     val cntWords = words.reduceByKey(_ + _)
>>
>>     cntWords.foreach(rdd =>
>>       pool.withJedisClient { client =>
>>         val pipeline = client.pipelined()
>>         rdd.foreach {
>>           case (word, count) =>
>>             pipeline.incrBy(word, count)
>>         }
>>         pipeline.sync()
>>       }
>>     )
>>
>>     ssc.start()
>>   }
>> }
>>
>> Everytime I run this program, I get this error:
>>
>> [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job
>> streaming job 1391354180000 ms.0
>> [error] org.apache.spark.SparkException: Job aborted: Task not
>> serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>> [error] at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> [error] at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> [error] at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>> [error] at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>> [error] at scala.Option.foreach(Option.scala:236)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
>> [error] at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> [error] at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>> [error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> [error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> [error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> [error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> [error] at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I have tried to not use redis pipelines and then I get the same error but
>> related to the Jedis client.
>>
>> Have anybody done something similar?
>>
>> Kind regards,
>>
>> Luis
>>
>> P.S. I have attached my build.sbt and the scala source code to this file.
>>
>>
>>
>

<<image.jpg>>

Reply via email to