updateStateByKey can be supplied an initialRDD to seed the initial state. Per the source (
https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445
), provided here for your convenience:
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of the key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream.
 * @param initialRDD initial state value of each key.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner,
    initialRDD: RDD[(K, S)]
  ): DStream[(K, S)] = ssc.withScope {
  val cleanedUpdateF = sparkContext.clean(updateFunc)
  val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
    iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
  }
  updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
}
A simple example by Aniket Bhatnagar, from an earlier thread on this list:
def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long] = {
  val prevCount = prevStateOpt.getOrElse(0L)
  val newCount = prevCount + events.size
  Some(newCount)
}

val interval = 60 * 1000
val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L))
  .map(_ * interval)
  .map(n => (n % interval, n / interval))
val counts = eventsStream
  .map(event => (event.timestamp - event.timestamp % interval, event))
  .updateStateByKey[Long](PrintEventCountsByInterval.counter _,
    new HashPartitioner(3), initialRDD = initialRDD)
counts.print()
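
For the Hive-backed dictionary case below, this same overload lets you load the table once at startup and pass it as the initialRDD, so no per-batch join is needed. A rough sketch, assuming Spark 1.4's HiveContext; the table name "dict_table", its column order, and the input stream are placeholders, not your actual schema:

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

val hiveContext = new HiveContext(sparkContext)

// Load the historical key/value pairs once, at startup.
// "dict_table" and its column order are placeholders for illustration.
val initialDict = hiveContext
  .sql("SELECT key, value FROM dict_table")
  .rdd
  .map(row => (row.getString(0), row.getString(1)))

// Keep the latest value seen for a key in this batch; otherwise carry the prior state.
def update(newValues: Seq[String], state: Option[String]): Option[String] =
  newValues.lastOption.orElse(state)

def process(input: DStream[(String, String)]): DStream[(String, String)] =
  input.updateStateByKey[String](update _, new HashPartitioner(3), initialDict)

Note that updateStateByKey requires checkpointing to be enabled on the StreamingContext (ssc.checkpoint(...)).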
HTH.
-Todd
On Thu, Mar 10, 2016 at 1:35 AM, Zalzberg, Idan (Agoda) <
[email protected]> wrote:
> Hi,
>
> I have a spark-streaming application that basically keeps track of a
> string->string dictionary.
>
> So I have messages coming in with updates, like:
>
> "A" -> "B"
>
> And I need to update the dictionary.
>
> This seems like a simple use case for the updateStateByKey method.
>
> However, my issue is that when the app starts I need to "initialize" the
> dictionary with data from a Hive table that holds all the historical
> key/value pairs for the dictionary.
>
> The only way I could think of is doing something like:
>
> val rdd = ... // get data from Hive
>
> def process(input: DStream[(String, String)]) = {
>   input.transform(_.join(rdd)).updateStateByKey(update)
> }
>
> So the join operation will run on every incoming batch, when in fact I
> only need it once, at initialization.
>
> Any idea how to achieve that?
>
> Thanks