I started with StatefulNetworkWordCount to maintain a running count of words seen.

I have a file 'stored.count' which contains the word counts:

$ cat stored.count
a 1
b 2

I want to initialize StatefulNetworkWordCount with the values in the 'stored.count' file. How do I do that?

I looked at the paper 'EECS-2012-259.pdf' by Matei et al. Referring to figure 2, it would be useful to have an initial RDD feeding into 'counts' at 't = 1', as below:

                               initial
                                  |
    t = 1: pageView -> ones -> counts
                                  |
    t = 2: pageView -> ones -> counts
    ...

I have also attached the modified figure 2 of
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf .

I managed to hack the Spark code to achieve this, and I am attaching the modified files. Essentially, I added an argument 'initial: RDD[(K, S)]' to the updateStateByKey method:

def updateStateByKey[S: ClassTag](
    initial: RDD[(K, S)],
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)]

If this sounds interesting to a larger crowd, I would be happy to clean up the code and volunteer to contribute it. I don't know the procedure for that, though.

-Soumitra.
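P.S. For concreteness, here is a minimal sketch of how the proposed overload could be used to seed StatefulNetworkWordCount from 'stored.count'. Only the updateStateByKey(initial, updateFunc) signature comes from the attached patch; the object name, the whitespace parsing of the file, the socket source on localhost:9999 and the one-second batch interval are just assumptions for the example.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream functions

object InitializedStatefulNetworkWordCount {
  def main(args: Array[String]) {
    // Running count: sum this batch's ones with the previous (or initial) count.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      Some(values.sum + state.getOrElse(0))
    }

    val ssc = new StreamingContext(
      new SparkConf().setAppName("InitializedStatefulNetworkWordCount"), Seconds(1))
    ssc.checkpoint(".")

    // Parse the "word count" lines of stored.count into the initial state RDD.
    val initial: RDD[(String, Int)] = ssc.sparkContext.textFile("stored.count").map { line =>
      val Array(word, count) = line.split("\\s+")
      (word, count.toInt)
    }

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // Proposed overload: the state at t = 1 starts from 'initial' instead of empty.
    val stateDstream = words.map(x => (x, 1)).updateStateByKey[Int](initial, updateFunc)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}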
---- PairDStreamFunctions.scala (modified) ----

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.dstream

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.{Time, Duration}

/**
 * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
 * these functions.
 */
class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
  extends Serializable {

  private[streaming] def ssc = self.ssc

  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
    new HashPartitioner(numPartitions)
  }

  /**
   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
   * generate the RDDs with Spark's default number of partitions.
   */
  def groupByKey(): DStream[(K, Iterable[V])] = {
    groupByKey(defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
   * generate the RDDs with `numPartitions` partitions.
   */
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
    groupByKey(defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying `groupByKey` on each RDD. The supplied
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   */
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
    val createCombiner = (v: V) => ArrayBuffer[V](v)
    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Iterable[V])]]
  }

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
   * with Spark's default number of partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
    reduceByKey(reduceFunc, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
   * with `numPartitions` partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
    reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
   * the partitioning of each RDD.
   */
  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
  }

  /**
   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
   * combineByKey for RDDs. Please refer to combineByKey in
   * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
   */
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = {
    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
      mapSideCombine)
  }

  /**
   * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
   * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
   * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
   * Spark's default number of partitions.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   */
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
    groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
   * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
   * generate the RDDs with Spark's default number of partitions.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
    : DStream[(K, Iterable[V])] = {
    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param numPartitions  number of partitions of each RDD in the new DStream; if not specified
   *                       then Spark's default number of partitions will be used
   */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
    ): DStream[(K, Iterable[V])] = {
    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
  }

  /**
   * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
   *                       DStream.
   */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
    ): DStream[(K, Iterable[V])] = {
    val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v
    val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v
    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
    self.groupByKey(partitioner)
      .window(windowDuration, slideDuration)
      .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Iterable[V])]]
  }

  /**
   * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
   * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
   * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
   * the RDDs with Spark's default number of partitions.
   * @param reduceFunc     associative reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration
    ): DStream[(K, V)] = {
    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
   * generate the RDDs with Spark's default number of partitions.
   * @param reduceFunc     associative reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[(K, V)] = {
    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
   * generate the RDDs with `numPartitions` partitions.
   * @param reduceFunc     associative reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param numPartitions  number of partitions of each RDD in the new DStream.
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
    ): DStream[(K, V)] = {
    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
      defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
   * `DStream.reduceByKey()`, but applies it over a sliding window.
   * @param reduceFunc     associative reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param partitioner    partitioner for controlling the partitioning of each RDD
   *                       in the new DStream.
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
    ): DStream[(K, V)] = {
    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    self.reduceByKey(cleanedReduceFunc, partitioner)
      .window(windowDuration, slideDuration)
      .reduceByKey(cleanedReduceFunc, partitioner)
  }

  /**
   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of a new window is calculated using the old window's reduced value:
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   *
   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
   * However, it is applicable only to "invertible reduce functions".
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param reduceFunc     associative reduce function
   * @param invReduceFunc  inverse reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param filterFunc     Optional function to filter expired key-value pairs;
   *                       only pairs that satisfy the function are retained
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration = self.slideDuration,
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = {
    reduceByKeyAndWindow(
      reduceFunc, invReduceFunc, windowDuration,
      slideDuration, defaultPartitioner(numPartitions), filterFunc
    )
  }

  /**
   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of a new window is calculated using the old window's reduced value:
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
   * However, it is applicable only to "invertible reduce functions".
   * @param reduceFunc     associative reduce function
   * @param invReduceFunc  inverse reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
   *                       DStream.
   * @param filterFunc     Optional function to filter expired key-value pairs;
   *                       only pairs that satisfy the function are retained
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner,
      filterFunc: ((K, V)) => Boolean
    ): DStream[(K, V)] = {
    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
    new ReducedWindowedDStream[K, V](
      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
      windowDuration, slideDuration, partitioner
    )
  }

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = {
    updateStateByKey(updateFunc, defaultPartitioner())
  }

  /**
   * (Proposed addition) Same as `updateStateByKey(updateFunc)`, but the state is seeded with
   * `initial`, an RDD of (key, state) pairs that is folded into the first batch.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   */
  def updateStateByKey[S: ClassTag](
      initial: RDD[(K, S)],
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = {
    updateStateByKey(initial, updateFunc, defaultPartitioner())
  }

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @param numPartitions Number of partitions of each RDD in the new DStream.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int
    ): DStream[(K, S)] = {
    updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
  }

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of the key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
    ): DStream[(K, S)] = {
    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }
    updateStateByKey(newUpdateFunc, partitioner, true)
  }

  /**
   * (Proposed addition) Same as `updateStateByKey(updateFunc, partitioner)`, but the state is
   * seeded with `initial`, an RDD of (key, state) pairs that is folded into the first batch.
   */
  def updateStateByKey[S: ClassTag](
      initial: RDD[(K, S)],
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
    ): DStream[(K, S)] = {
    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }
    updateStateByKey(initial, newUpdateFunc, partitioner, true)
  }

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated. Note that
   *                   this function may generate a tuple with a different key
   *                   than the input key. It is up to the developer to decide whether to
   *                   remember the partitioner despite the key being changed.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)] = {
    new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
  }

  /**
   * (Proposed addition) Same as the variant above, but the returned StateDStream is seeded
   * with `initial`; StateDStream cogroups the initial RDD with the first batch.
   */
  def updateStateByKey[S: ClassTag](
      initial: RDD[(K, S)],
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)] = {
    val tmp = new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
    tmp.setInitial(initial)
    tmp
  }

  /**
   * Return a new DStream by applying a map function to the value of each key-value pairs in
   * 'this' DStream without changing the key.
   */
  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
    new MapValuedDStream[K, V, U](self, mapValuesFunc)
  }

  /**
   * Return a new DStream by applying a flatmap function to the value of each key-value pairs in
   * 'this' DStream without changing the key.
   */
  def flatMapValues[U: ClassTag](
      flatMapValuesFunc: V => TraversableOnce[U]
    ): DStream[(K, U)] = {
    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
  }

  /**
   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with Spark's default number
   * of partitions.
   */
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
    cogroup(other, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   */
  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
    : DStream[(K, (Iterable[V], Iterable[W]))] = {
    cogroup(other, defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
   * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs.
   */
  def cogroup[W: ClassTag](
      other: DStream[(K, W)],
      partitioner: Partitioner
    ): DStream[(K, (Iterable[V], Iterable[W]))] = {
    self.transformWith(
      other,
      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
    )
  }

  /**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   */
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
    join[W](other, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   */
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
    join[W](other, defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   */
  def join[W: ClassTag](
      other: DStream[(K, W)],
      partitioner: Partitioner
    ): DStream[(K, (V, W))] = {
    self.transformWith(
      other,
      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
    )
  }

  /**
   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
   * number of partitions.
   */
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
    leftOuterJoin[W](other, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
   * partitions.
   */
  def leftOuterJoin[W: ClassTag](
      other: DStream[(K, W)],
      numPartitions: Int
    ): DStream[(K, (V, Option[W]))] = {
    leftOuterJoin[W](other, defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
   * the partitioning of each RDD.
   */
  def leftOuterJoin[W: ClassTag](
      other: DStream[(K, W)],
      partitioner: Partitioner
    ): DStream[(K, (V, Option[W]))] = {
    self.transformWith(
      other,
      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
    )
  }

  /**
   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
   * number of partitions.
   */
  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
    rightOuterJoin[W](other, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
   * partitions.
   */
  def rightOuterJoin[W: ClassTag](
      other: DStream[(K, W)],
      numPartitions: Int
    ): DStream[(K, (Option[V], W))] = {
    rightOuterJoin[W](other, defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
   * the partitioning of each RDD.
   */
  def rightOuterJoin[W: ClassTag](
      other: DStream[(K, W)],
      partitioner: Partitioner
    ): DStream[(K, (Option[V], W))] = {
    self.transformWith(
      other,
      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
    )
  }

  /**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
   */
  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
      prefix: String,
      suffix: String
    )(implicit fm: ClassTag[F]) {
    saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }

  /**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
   */
  def saveAsHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf
    ) {
    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
    }
    self.foreachRDD(saveFunc)
  }

  /**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
      prefix: String,
      suffix: String
    )(implicit fm: ClassTag[F]) {
    saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
      fm.runtimeClass.asInstanceOf[Class[F]])
  }

  /**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsNewAPIHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = new Configuration
    ) {
    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
    }
    self.foreachRDD(saveFunc)
  }

  private def keyClass: Class[_] = kt.runtimeClass

  private def valueClass: Class[_] = vt.runtimeClass
}
---- StateDStream.scala (modified) ----

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.dstream

import org.apache.spark.rdd.RDD
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Time}

import scala.reflect.ClassTag

private[streaming]
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER)

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override val mustCheckpoint = true

  // Proposed addition: optional initial state RDD, set via setInitial() before the first batch.
  private var initial: Option[RDD[(K, S)]] = None

  def setInitial(i: RDD[(K, S)]) = {
    initial = Some(i)
    this
  }

  override def compute(validTime: Time): Option[RDD[(K, S)]] = {

    // Try to get the previous state RDD
    getOrCompute(validTime - slideDuration) match {

      case Some(prevStateRDD) => {    // If previous state RDD exists

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual

            // Define the function for the mapPartition operation on cogrouped RDD;
            // first map the cogrouped tuple to tuples of required type,
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
              val i = iterator.map(t => {
                val itr = t._2._2.iterator
                val headOption = itr.hasNext match {
                  case true => Some(itr.next())
                  case false => None
                }
                (t._1, t._2._1.toSeq, headOption)
              })
              updateFuncLocal(i)
            }
            val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
            val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
          case None => {    // If parent RDD does not exist

            // Re-apply the update function to the old state RDD
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, S)]) => {
              val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
              updateFuncLocal(i)
            }
            val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
        }
      }

      case None => {    // If previous session RDD does not exist (first input data)

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
            initial match {
              case None =>
                // Define the function for the mapPartition operation on grouped RDD;
                // first map the grouped tuple to tuples of required type,
                // and then apply the update function
                val updateFuncLocal = updateFunc
                val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
                  updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
                }
                val groupedRDD = parentRDD.groupByKey(partitioner)
                val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
                // logDebug("Generating state RDD for time " + validTime + " (first)")
                Some(sessionRDD)

              case Some(initialRDD) =>
                // Proposed addition: cogroup the first batch with the initial state RDD and
                // apply the update function, exactly as for subsequent batches.
                val updateFuncLocal = updateFunc
                val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
                  val i = iterator.map(t => {
                    val itr = t._2._2.iterator
                    val headOption = itr.hasNext match {
                      case true => Some(itr.next())
                      case false => None
                    }
                    (t._1, t._2._1.toSeq, headOption)
                  })
                  updateFuncLocal(i)
                }
                val cogroupedRDD = parentRDD.cogroup(initialRDD, partitioner)
                val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
                Some(stateRDD)
            }
          }
          case None => {    // If parent RDD does not exist, then nothing to do!
            // logDebug("Not generating state RDD (no previous state, no parent)")
            None
          }
        }
      }
    }
  }
}
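A note on the StateDStream change: at t = 1 the initial RDD is folded in with the same cogroup-then-update pattern that is already used for later batches. The following self-contained sketch (made-up keys and counts, hypothetical object name) shows that merge on plain RDDs:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // implicit pair-RDD functions

object InitialStateMergeSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("merge-sketch").setMaster("local[2]"))

    val firstBatch = sc.parallelize(Seq(("a", 3), ("b", 1), ("c", 2)))  // counts from batch 1
    val initial    = sc.parallelize(Seq(("a", 1), ("b", 2)))            // state from stored.count

    // cogroup pairs each key's new values with its (zero or one) initial state;
    // the update function then folds them together, as in StateDStream.compute.
    val merged = firstBatch.cogroup(initial).map { case (key, (values, states)) =>
      (key, values.sum + states.headOption.getOrElse(0))
    }

    merged.collect().sorted.foreach(println)  // prints (a,4), (b,3), (c,2)
    sc.stop()
  }
}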