I'm a little confused as to why you have fake events rather than just doing foreachRDD or foreachPartition on your kafka stream and updating the accumulator there. I'd expect that to run each batch even if the batch had 0 kafka messages in it.
On Thu, Dec 10, 2015 at 2:05 PM, AliGouta <ali.go...@gmail.com> wrote: > I am actually running out of options. In my spark streaming application. I > want to keep a state on some keys. I am getting events from Kafka. Then I > extract keys from the event, say userID. When there is no events coming > from > Kafka I want to keep updating a counter relative to each user ID each 3 > seconds, since I configured the batchduration of my StreamingContext with 3 > seconds. > > Now the way I am doing it might be ugly, but at least it works: I have an > accumulableCollection like this: > > /al userID = ssc.sparkContext.accumulableCollection(new > mutable.HashMap[String,Long]())/ > > Then I create a "fake" event and keep pushing it to my spark streaming > context as the following: > > /val rddQueue = new mutable.SynchronizedQueue[RDD[String]]() > for ( i <- 1 to 100) { > rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE")) > Thread.sleep(3000) > } > val inputStream = ssc.queueStream(rddQueue) > > inputStream.foreachRDD( UPDATE_MY_ACCUMULATOR )/ > > This would let me access to my accumulatorCollection and update all the > counters of all userIDs. Up to now everything works fine, however when I > change my loop from: > > /for ( i <- 1 to 100) {} #This is for test/ > > To: > > /while (true) {} #This is to let me access and update my accumulator > through > the whole application life cycle/ > > Then when I run my ./spark-submit, my application gets stuck on this stage: > > /15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager > slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1, > slave1.cluster.example, 38959)/ > > Any clue on how to resolve this ? Is there a pretty straightforward way > that > would allow me updating the values of my userIDs (rather than creating an > unuseful RDD and pushing it periodically to the queuestream)? > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Replaying-an-RDD-in-spark-streaming-to-update-an-accumulator-tp25672.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >