I am actually running out of options. In my Spark Streaming application I want to keep state on some keys. I get events from Kafka and extract a key from each event, say a userID. When no events are coming from Kafka, I still want to keep updating a counter for each userID every 3 seconds, since I configured the batch duration of my StreamingContext to 3 seconds.
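For context, my setup is roughly the following (a simplified sketch: the broker, the topic, and the extractUserID helper are placeholders for my real code):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("user-counters")
    val ssc = new StreamingContext(conf, Seconds(3))   // 3-second batch duration

    // direct Kafka stream; broker and topic names are placeholders
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val events = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // extract the key (userID) from each event; extractUserID stands in for my real parsing
    val userIDs = events.map { case (_, message) => extractUserID(message) }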
Now, the way I am doing it might be ugly, but at least it works. I have an accumulableCollection like this:

    val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String, Long]())

Then I create a "fake" event and keep pushing it into my StreamingContext as follows:

    val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
    for (i <- 1 to 100) {
      rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
      Thread.sleep(3000)
    }
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.foreachRDD(UPDATE_MY_ACCUMULATOR)

This lets me access my accumulableCollection and update the counters of all userIDs. Up to now everything works fine. However, when I change my loop from

    for (i <- 1 to 100) { ... }   // this is just for testing

to

    while (true) { ... }          // so I can keep updating my accumulator for the whole application lifetime

and run ./spark-submit, my application gets stuck at this stage:

    15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1, slave1.cluster.example, 38959)

Any clue on how to resolve this? Is there a more straightforward way to update the values of my userIDs, rather than creating a useless RDD and pushing it periodically into the queue stream?
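For completeness, UPDATE_MY_ACCUMULATOR boils down to something like this (a simplified sketch; it just bumps the counter of every userID the accumulator already knows about):

    // simplified stand-in for UPDATE_MY_ACCUMULATOR
    def UPDATE_MY_ACCUMULATOR(rdd: RDD[String]): Unit = {
      // foreachRDD invokes this on the driver once per batch (every 3 seconds),
      // so it is safe to read and mutate the accumulator's current value here
      userID.value.transform((_, count) => count + 1)
    }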