I am running out of options. In my Spark Streaming application I want to
keep state for some keys. I receive events from Kafka and extract a key from
each event, say userID. When no events are coming from Kafka, I still want to
update a counter for each userID every 3 seconds, since I configured the
batch duration of my StreamingContext to 3 seconds.

Now the way I am doing it might be ugly, but at least it works: I have an
accumulableCollection like this:

/val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String,Long]())/

Then I create a "fake" event and keep pushing it to my Spark Streaming
context as follows:

/val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
for ( i <- 1 to  100) {
  rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
  Thread.sleep(3000)
}
val inputStream = ssc.queueStream(rddQueue)

inputStream.foreachRDD( UPDATE_MY_ACCUMULATOR )/

This lets me access my accumulableCollection and update the counters of all
userIDs. Up to now everything works fine. However, when I change my loop
from:

/for (i <- 1 to 100) {} // This is for testing/

To:

/while (true) {} // This is to let me access and update my accumulator through
// the whole application life cycle/

Then, when I run ./spark-submit, my application gets stuck at this stage:

/15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager
slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1,
slave1.cluster.example, 38959)/
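
A likely explanation, assuming the loop runs on the driver's main thread
before ssc.start() is called: while (true) never returns, so the streaming
context is never started and nothing is scheduled once the executors have
registered. Below is a minimal sketch of feeding the queue from a daemon
thread instead. The object name FakeEventFeeder, the app name, and the
println body standing in for UPDATE_MY_ACCUMULATOR are hypothetical, and the
sketch assumes the Spark 1.x / Scala 2.10-2.11 APIs used above
(SynchronizedQueue is deprecated in later Scala versions).

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver program; names are illustrative only.
object FakeEventFeeder {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FakeEventFeeder")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Register the stream and its output action before starting the context.
    val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.foreachRDD { rdd =>
      // Placeholder for UPDATE_MY_ACCUMULATOR.
      println(s"batch with ${rdd.count()} fake message(s)")
    }

    // Feed the queue in the background so the main thread can reach ssc.start().
    val feeder = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
          Thread.sleep(3000)
        }
      }
    })
    feeder.setDaemon(true) // do not keep the JVM alive once the driver exits
    feeder.start()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the for (i <- 1 to 100) version the loop eventually finishes, which would
explain why that variant works after a delay while the infinite loop does not.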

Any clue on how to resolve this? Is there a straightforward way to update
the values for my userIDs, rather than creating a useless RDD and pushing it
periodically to the queueStream?
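
One possible direction, sketched under assumptions rather than tested against
this application: updateStateByKey calls its update function once per batch
for every key already held in the state, passing an empty sequence when that
key received no new events. That may remove the need for fake events
altogether. In the sketch below the object name PerUserCounter, the
checkpoint path, the socket source standing in for the Kafka stream, and the
"reset on a real event" rule are all hypothetical; it assumes Spark 1.3 or
later so that pair-DStream operations are available without extra imports.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver program; names are illustrative only.
object PerUserCounter {
  // Invoked once per batch for every userID already in the state,
  // with an empty newEvents when that user sent nothing in the batch.
  def tick(newEvents: Seq[Long], counter: Option[Long]): Option[Long] =
    if (newEvents.nonEmpty) Some(0L)        // assumption: a real event resets the counter
    else Some(counter.getOrElse(0L) + 1L)   // no event: one more idle batch

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PerUserCounter")
    val ssc = new StreamingContext(conf, Seconds(3))
    // Hypothetical path; stateful streams require a checkpoint directory.
    ssc.checkpoint("/tmp/per-user-counter")

    // Stand-in source: one userID per line on a local socket.
    // In the real application this would be the stream of keys extracted from Kafka.
    val userIds = ssc.socketTextStream("localhost", 9999)
    val counters = userIds.map(id => (id, 1L)).updateStateByKey(tick _)
    counters.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping the counter in the DStream state rather than in an accumulableCollection
also means the values survive in checkpoints, at the cost of reading them
through the resulting stream instead of a driver-side map.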



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Replaying-an-RDD-in-spark-streaming-to-update-an-accumulator-tp25672.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
