Hey Aniket,

Great thoughts! I understand the use case. But as you have realized yourself,
it is not trivial to cleanly stream an RDD as a DStream. Since RDD
operations are defined to be scan based, it is not efficient to define an RDD
based on slices of data within a partition of another RDD using pure RDD
transformations. What you have done is a decent, and probably the only
feasible, solution, though it has its limitations.
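To make the scan-based point concrete, here is a minimal plain-Scala sketch. It uses no Spark: an ordinary `Iterator` stands in for a partition's iterator, which is an assumption for illustration. Fetching chunk `k` from a partition means pulling every element before it, so fetching all chunks with one scan each costs roughly quadratic work instead of a single pass. `PartitionScanCost`, `sliceByScan` and `totalPulled` are hypothetical names.

```scala
object PartitionScanCost {
  // Returns chunk `k` (0-based) of size `chunkSize` from a freshly created
  // partition iterator, plus how many elements had to be pulled to reach it.
  // Creating a new iterator per call mirrors re-running a scan-based RDD op.
  def sliceByScan[T](partition: () => Iterator[T], k: Int, chunkSize: Int): (Seq[T], Int) = {
    var pulled = 0
    val it = partition().map { x => pulled += 1; x } // count every element pulled
    val chunk = it.drop(k * chunkSize).take(chunkSize).toVector
    (chunk, pulled)
  }

  // Total elements pulled when each of the n / chunkSize chunks of a
  // partition holding 0 until n is fetched by its own scan.
  def totalPulled(n: Int, chunkSize: Int): Int = {
    val chunks = (n + chunkSize - 1) / chunkSize
    (0 until chunks).map(k => sliceByScan(() => Iterator.range(0, n), k, chunkSize)._2).sum
  }
}
```

For 10 elements in chunks of 3, the four separate scans pull 3 + 6 + 9 + 10 = 28 elements, versus 10 for a single pass.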

Also, the requirements for converting a batch of data into a stream of data can
be pretty diverse: at what rate, how many events per batch, how many batches,
and how efficiently? Hence, it is not trivial to define a good, clean public
API for that. If anyone has thoughts or ideas on this, you are more
than welcome to share them.
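As a rough illustration of how those knobs interact, here is a hypothetical plain-Scala sketch. The `ReplayPlan` name and its fields are invented for illustration; nothing like it exists in Spark. Fixing the events per batch and the batch interval determines both the nominal rate and how long replaying a fixed dataset takes.

```scala
object ReplayPlanning {
  // Hypothetical description of a batch-to-stream replay; these names are
  // not a Spark API, they only make the trade-offs explicit.
  final case class ReplayPlan(totalEvents: Long, eventsPerBatch: Long, batchIntervalMs: Long) {
    require(totalEvents >= 0 && eventsPerBatch > 0 && batchIntervalMs > 0)

    // Number of batches needed; the last one may be only partially filled.
    def numBatches: Long = (totalEvents + eventsPerBatch - 1) / eventsPerBatch

    // Wall-clock duration of the replay if every batch fires on schedule.
    def durationMs: Long = numBatches * batchIntervalMs

    // Nominal replay rate in events per second.
    def eventsPerSecond: Double = eventsPerBatch * 1000.0 / batchIntervalMs
  }
}
```

For example, 1,000,000 events at 5,000 per 2-second batch needs 200 batches, takes about 400 seconds, and runs at a nominal 2,500 events per second.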


On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> The use case for converting an RDD into a DStream is that I want to simulate a
> stream from already persisted data for testing analytics. It is trivial
> to create an RDD from any persisted data, but not so much a DStream.
> Hence my idea to create a DStream from an RDD. For example, let's say you
> are trying to implement analytics on time series data using the Lambda
> architecture. This means you would have to implement the same analytics on
> streaming data (in streaming mode) as well as on persisted data (in batch
> mode). The workflow for implementing the analytics would be to first
> implement it in batch mode using RDD operations and then simulate a stream to
> test the analytics in streaming mode. The simulated stream should produce
> elements at a specified rate. So the solution may be to read the data into an RDD,
> split (chunk) it into multiple RDDs, each containing the number of
> elements that need to be streamed per time unit, and then finally stream
> each RDD using the compute function.
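The testing idea can be sketched in plain Scala. This is not Spark code: `countByKey`, `merge` and `streamed` are hypothetical names, and a per-key count stands in for the real analytics. The batch path computes the result over all the data at once, the simulated stream computes it chunk by chunk and folds each partial result into running state, and the test is that both agree.

```scala
object LambdaEquivalence {
  // The analytics under test: event counts per key.
  def countByKey(events: Seq[String]): Map[String, Long] =
    events.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

  // Merge a chunk's partial result into the running state (streaming path).
  def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

  // Simulated stream: split the persisted data into fixed-size chunks and
  // fold each chunk's result into the running state.
  def streamed(events: Seq[String], chunkSize: Int): Map[String, Long] =
    events.grouped(chunkSize).map(countByKey).foldLeft(Map.empty[String, Long])(merge)
}
```

In Spark the batch side would be RDD operations and the streaming side a stateful DStream operation; the sketch only shows the property being tested, namely that `streamed(data, n)` equals `countByKey(data)`.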
> The problem with using QueueInputDStream is that it will stream data as
> per the batch duration specified in the streaming context, and one cannot
> specify a custom slide duration. Moreover, the class QueueInputDStream is
> private to the streaming package, so I can't really use or extend it from an
> external package. Also, I could not find a good way to split an RDD into
> equal-sized smaller RDDs that can be fed into an extended version of
> QueueInputDStream.
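One way to get equal-sized chunks is to bucket elements by a global index rather than grouping inside each partition. The plain-Scala sketch below models an RDD as a list of partitions (an assumption; it is not Spark code): per-partition grouping leaves a short chunk at every partition boundary, while putting element `i` into chunk `i / n` leaves only the final chunk short. In Spark the global index could come from `rdd.zipWithIndex()`, and chunk `k` would be a `filter` on that index; each such filter still scans the whole RDD, so caching the zipped RDD would likely matter. `EqualChunks`, `perPartition` and `byGlobalIndex` are hypothetical names.

```scala
object EqualChunks {
  // Model an RDD as its partitions; each inner Seq is one partition.
  type Partitions[T] = Seq[Seq[T]]

  // Per-partition grouping (like mapPartitions(_.grouped(n))):
  // the last chunk of every partition may be short.
  def perPartition[T](parts: Partitions[T], n: Int): Seq[Seq[T]] =
    parts.flatMap(_.grouped(n).toList)

  // Global-index bucketing: number elements across partitions in order
  // (partition by partition, as zipWithIndex does) and put element i into
  // chunk i / n. Only the final chunk can be short.
  def byGlobalIndex[T](parts: Partitions[T], n: Int): Seq[Seq[T]] =
    parts.flatten.zipWithIndex
      .groupBy { case (_, i) => i / n }
      .toSeq
      .sortBy(_._1)
      .map { case (_, xs) => xs.map(_._1) }
}
```

With two partitions of five elements and `n = 3`, per-partition grouping yields chunk sizes 3, 2, 3, 2, while global-index bucketing yields 3, 3, 3, 1.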
> Finally, here is what I came up with:
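> import scala.reflect.ClassTag
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Duration, StreamingContext, Time}
> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
>
> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>     new InputDStream[T](streamingContext) {
>       // WARNING: toLocalIterator pulls one partition at a time to the
>       // driver, so each partition must fit in the driver's memory.
>       // The iterator is not serializable, so checkpointing will likely fail.
>       private val iterator = rdd.toLocalIterator
>       // One chunk of up to chunkSize elements per generated batch.
>       private val grouped = iterator.grouped(chunkSize)
>       override def start(): Unit = {}
>       override def stop(): Unit = {}
>       override def compute(validTime: Time): Option[RDD[T]] = {
>         if (grouped.hasNext) {
>           Some(rdd.sparkContext.parallelize(grouped.next()))
>         } else {
>           None // source exhausted: no more data to emit
>         }
>       }
>       // Custom slide duration, falling back to the context's batch duration.
>       override def slideDuration: Duration = {
>         slideDurationMilli.map(duration => new Duration(duration))
>           .getOrElse(super.slideDuration)
>       }
>     }
>   }
> }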
> This aims to stream chunkSize elements every slideDurationMilli
> milliseconds (defaulting to the batch duration of the streaming context). It's
> still not perfect (for example, the emission timing is not precise), but given
> that this will only be used for testing purposes, I'm not looking for ways to
> optimize it further.
> Thanks,
> Aniket
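The "not precise" part can be sketched with a small plain-Scala model (no Spark). It assumes that batches are generated every batch interval and that an input stream is only asked to compute at batch times that are multiples of its slide duration. Under that assumption, a slide duration that is not a multiple of the batch duration fires less often than requested, and a pull-style `compute` hands out one chunk per firing until the source runs dry. `EmissionSchedule`, `emissionTimes` and `replay` are hypothetical names.

```scala
object EmissionSchedule {
  // Times (ms after start) at which a stream would emit, assuming batches
  // fire every batchMs and the stream only computes at batch times that
  // are multiples of its slide duration.
  def emissionTimes(batchMs: Long, slideMs: Long, horizonMs: Long): Seq[Long] =
    (batchMs to horizonMs by batchMs).filter(_ % slideMs == 0)

  // Pull model of compute(): one chunk per emission time until the source
  // is exhausted, then None.
  def replay[T](data: Seq[T], chunkSize: Int, times: Seq[Long]): Seq[(Long, Option[Seq[T]])] = {
    val chunks = data.grouped(chunkSize)
    times.map(t => t -> (if (chunks.hasNext) Some(chunks.next()) else None))
  }
}
```

With a 1000 ms batch interval and a 2500 ms slide, the model emits only at 5000 ms and 10000 ms within the first 10 seconds, which suggests keeping `slideDurationMilli` a multiple of the batch duration.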
> On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>> Nice question :)
>> Ideally you should use the queueStream interface to push RDDs into a queue,
>> and then Spark Streaming can handle the rest.
>> Out of curiosity, why are you looking to convert an RDD to a DStream? Another
>> workaround folks use is to source the DStream from a folder and move the files
>> they need reprocessed back into that folder; it's a hack, but much less of a headache.
>> Mayur Rustagi
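For comparison with the queueStream suggestion, here is a plain-Scala model of what `StreamingContext.queueStream(queue, oneAtATime)` hands to each batch, assuming the behaviour of Spark 1.x's `QueueInputDStream`. It is not Spark code: `Seq[T]` stands in for `RDD[T]`, and `QueueStreamModel`/`nextBatch` are hypothetical names. With `oneAtATime = true` each batch interval dequeues exactly one RDD, which is why the replay rate is tied to the batch duration; with `false` everything queued so far goes into a single batch. When the queue is empty the model returns `None`; depending on the Spark version, the real stream may produce an empty or default RDD instead.

```scala
import scala.collection.mutable

object QueueStreamModel {
  // What one batch receives from the queue under each mode.
  def nextBatch[T](queue: mutable.Queue[Seq[T]], oneAtATime: Boolean): Option[Seq[T]] =
    if (queue.isEmpty) None
    else if (oneAtATime) Some(queue.dequeue()) // exactly one queued item per batch
    else {
      // Drain everything queued so far into one batch (Spark unions the RDDs).
      val all = queue.toList.flatten
      queue.clear()
      Some(all)
    }
}
```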
>> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>> Hi everyone,
>>> I haven't been receiving replies to my queries on the distribution list.
>>> I'm not upset, just curious to know whether my messages are actually
>>> going through. Can someone please confirm that my messages are getting
>>> delivered via this distribution list?
>>> Thanks,
>>> Aniket
>>> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com>
>>> wrote:
>>>> Sometimes it is useful to convert an RDD into a DStream for testing
>>>> purposes (generating DStreams from historical data, etc.). Is there an easy
>>>> way to do this?
>>>> I came up with the following inefficient approach but am not sure if there
>>>> is a better way to achieve this. Thoughts?
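>>>> import scala.reflect.ClassTag
>>>> import org.apache.spark.rdd.RDD
>>>> import org.apache.spark.streaming.{Duration, StreamingContext, Time}
>>>> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
>>>>
>>>> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>>>   // Group each partition into chunks of chunkSize (chunks at partition
>>>>   // boundaries may be smaller).
>>>>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>>>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>>>   }
>>>>   // Drop the first element; zipWithIndex runs an extra job when the RDD
>>>>   // has more than one partition.
>>>>   def skipFirst(): RDD[T] = {
>>>>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>>>   }
>>>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>>>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>>>>     new InputDStream[T](streamingContext) {
>>>>       @volatile private var currentRDD: RDD[Seq[T]] = chunked(chunkSize)
>>>>       override def start(): Unit = {}
>>>>       override def stop(): Unit = {}
>>>>       override def compute(validTime: Time): Option[RDD[T]] = {
>>>>         val chunk = currentRDD.take(1) // Array holding at most one Seq[T]
>>>>         // Lineage grows by one zipWithIndex/filter per batch (inefficient).
>>>>         currentRDD = new RDDExtension(currentRDD).skipFirst()
>>>>         Some(rdd.sparkContext.parallelize(chunk.flatten))
>>>>       }
>>>>       override def slideDuration: Duration = {
>>>>         slideDurationMilli.map(duration => new Duration(duration))
>>>>           .getOrElse(super.slideDuration)
>>>>       }
>>>>     }
>>>>   }
>>>> }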
