Re: Streams: How do RDDs get Aggregated?
Hi Spark! I found out why my RDDs weren't coming through in my Spark stream. It turns out that onStart() needs to return, i.e. you need to launch the worker part of your start process in a separate thread. For example:

    def onStartMock(): Unit = {
      val future = new Thread(new Runnable() {
        def run() {
          for (x <- 1 until 10) {
            val newMem = Runtime.getRuntime.freeMemory() / 12188091
            if (newMem != lastMem) {
              System.out.println("in thread: " + newMem)
            }
            lastMem = newMem
            store(mockStatus)
          }
        }
      })
      future.start()  // onStart() returns immediately; the thread does the receiving
    }

Hope that helps somebody in the same situation. FYI, it's in the docs :)

 * {{{
 *  class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
 *      def onStart() {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors needs to be handled.
 *
 *          // See corresponding method documentation for more details
 *      }
 *
 *      def onStop() {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *      }
 *  }
 * }}}
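The essential pattern here, stripped of the Spark API, is that the start hook only spawns the worker thread and returns immediately, while the worker calls store(...) as data arrives. A minimal Spark-free sketch of that shape (MockReceiver, its buffer, and the record strings are invented for illustration; in real code you would extend Spark's Receiver class and call its store):

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for a Spark receiver: onStart() must be non-blocking,
// so the actual receiving loop runs on its own thread and hands
// records to store(), which buffers them.
class MockReceiver {
  val buffer = new ConcurrentLinkedQueue[String]()
  @volatile private var worker: Thread = _

  def store(item: String): Unit = buffer.add(item)

  def onStart(): Unit = {
    worker = new Thread(new Runnable {
      def run(): Unit = {
        // The receiving loop lives here, not in onStart() itself.
        for (x <- 1 until 10) store("record-" + x)
      }
    })
    worker.start()  // onStart() returns right away
  }

  def onStop(): Unit = if (worker != null) worker.join()
}
```

Calling onStart() followed by onStop() on an instance leaves nine records in the buffer; the point is that onStart() has already returned while the worker is still filling it.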
Re: Streams: How do RDDs get Aggregated?
Oh, and one other note on this, which appears to be the case: if, in your stream's foreachRDD implementation, you do something expensive (like call rdd.count()), you can get stuck in a situation where your RDD processor blocks indefinitely.

    tweetStream.foreachRDD((rdd, time) => {
      tweetStream.repartition(1)
      numTweetsCollected += 1
      // val count = rdd.count()  // DON'T DO THIS!
    })

And for Twitter-specific stuff, make sure to look at modifying the TwitterInputDStream class so that it implements the fix from SPARK-2464; that issue can lead to infinite stream reopening as well.

On Tue, Oct 21, 2014 at 11:02 AM, jay vyas jayunit100.apa...@gmail.com wrote:

-- jay vyas
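Why the blocking call is dangerous: batches of a stream are handled sequentially, so one handler that blocks (for example on an action that is waiting for data a starved receiver never delivers) holds up every batch queued behind it. A Spark-free sketch of that effect, using a single-threaded executor as a stand-in for the batch loop (BlockingHandlerDemo and the sleep/wait durations are invented for illustration):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for the streaming batch loop: one thread runs the per-batch
// handlers in order, so a handler that blocks delays every later batch.
object BlockingHandlerDemo {
  def run(): Int = {
    val scheduler = Executors.newSingleThreadExecutor()
    val processed = new AtomicInteger(0)
    // A handler that blocks, playing the role of an expensive rdd.count().
    scheduler.submit(new Runnable { def run(): Unit = Thread.sleep(500) })
    // Three later "batches" queued behind it.
    for (_ <- 1 to 3)
      scheduler.submit(new Runnable { def run(): Unit = processed.incrementAndGet() })
    scheduler.shutdown()
    // Wait only briefly: the blocker is still sleeping, so none of the
    // later batches have had a chance to run.
    scheduler.awaitTermination(100, TimeUnit.MILLISECONDS)
    scheduler.shutdownNow() // drop the rest; we only wanted to observe the stall
    processed.get()
  }
}
```

With the blocker in front, run() reports zero later batches processed, which is exactly the "processor blocks and nothing moves" symptom described above.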
Streams: How do RDDs get Aggregated?
Hi Spark! I don't quite understand the semantics of RDDs in a streaming context yet. Are there any examples in the docs of how to implement custom InputDStreams, with corresponding Receivers? I've hacked together a custom stream, which is being opened and is consuming data internally; however, it only produces empty RDDs, even though I am calling store(...) multiple times. I'm relying on the default implementation of store(...), though, which may be a mistake on my end. By making my slide duration small, I can make sure that the job indeed finishes; however, I'm not quite sure how we're supposed to shuttle data from the ReceiverInputDStream into RDDs? Thanks!