Hi everyone, I haven't been receiving replies to my queries on the distribution list. I'm not annoyed, just curious whether my messages are actually going through. Can someone please confirm that my messages are being delivered via this distribution list?
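Thanks,
Aniket

On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
> Sometimes it is useful to convert an RDD into a DStream for testing
> purposes (generating DStreams from historical data, etc.). Is there an easy
> way to do this?
>
> I came up with the following approach, which is inefficient, but I'm not
> sure whether there is a better way to achieve this. Thoughts?
>
> import scala.language.implicitConversions
> import scala.reflect.ClassTag
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Duration, StreamingContext, Time}
> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
>
> object RDDExtension {
>   // Makes rdd.chunked(...), rdd.skipFirst() and rdd.toStream(...) resolve.
>   implicit def toRDDExtension[T: ClassTag](rdd: RDD[T]): RDDExtension[T] =
>     new RDDExtension(rdd)
> }
>
> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>   import RDDExtension._
>
>   // Group each partition's elements into Seqs of at most chunkSize items.
>   def chunked(chunkSize: Int): RDD[Seq[T]] =
>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>
>   // Drop the element at index 0 (costly: zipWithIndex adds a pass over the data).
>   def skipFirst(): RDD[T] =
>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>                slideDurationMilli: Option[Long] = None): DStream[T] = {
>     new InputDStream[T](streamingContext) {
>
>       @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)
>
>       override def start(): Unit = {}
>
>       override def stop(): Unit = {}
>
>       // Emit one chunk per batch, then drop it from the remaining data.
>       override def compute(validTime: Time): Option[RDD[T]] = {
>         // take(1) yields Array[Seq[T]]; flatten it so the batch is an RDD[T].
>         val chunk = currentRDD.take(1).flatten.toSeq
>         currentRDD = currentRDD.skipFirst()
>         Some(rdd.sparkContext.parallelize(chunk))
>       }
>
>       override def slideDuration: Duration =
>         slideDurationMilli.map(duration => new Duration(duration))
>           .getOrElse(super.slideDuration)
>     }
>   }
> }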
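A possibly simpler route, assuming your Spark version's StreamingContext provides queueStream (it takes a scala.collection.mutable.Queue of RDDs and, with oneAtATime = true, emits one queued RDD per batch), is to pre-split the RDD into fixed-size chunks and enqueue them. This is only a minimal sketch: RDDToStream, chunksOf and toQueueStream are hypothetical names, and the split relies on zipWithIndex ordering, so each chunk is a filter over one cached, indexed copy of the data rather than a lineage that grows with every batch.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical helper object; not part of Spark.
object RDDToStream {
  // Split `rdd` into consecutive chunks of at most `chunkSize` elements,
  // preserving the order defined by zipWithIndex (partition order).
  def chunksOf[T: ClassTag](rdd: RDD[T], chunkSize: Int): Seq[RDD[T]] = {
    require(chunkSize > 0, "chunkSize must be positive")
    // Cache the indexed RDD so each chunk's filter does not recompute the source.
    val indexed = rdd.zipWithIndex().cache()
    val total = indexed.count()
    val numChunks = ((total + chunkSize - 1) / chunkSize).toInt
    (0 until numChunks).map { i =>
      indexed
        .filter { case (_, idx) => idx / chunkSize == i }
        .map(_._1)
    }
  }

  // Feed each chunk to the streaming job as one batch via queueStream.
  def toQueueStream[T: ClassTag](ssc: StreamingContext, rdd: RDD[T],
                                 chunkSize: Int): InputDStream[T] = {
    val queue = mutable.Queue[RDD[T]](chunksOf(rdd, chunkSize): _*)
    ssc.queueStream(queue, oneAtATime = true)
  }
}
```

Usage would be something like `RDDToStream.toQueueStream(ssc, historicalRdd, 1000).print()` followed by `ssc.start()`; once the queue is drained, later batches are simply empty.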