Hi all,
I'm a complete newbie to Spark and Spark Streaming, so the question may seem
obvious; sorry for that.
Is it okay to store a Seq[Data] in state when using 'updateStateByKey'? I have
a function with the signature
def saveState(values: Seq[Msg], value: Option[Iterable[Msg]]):
Option[Iterable[Msg]] that stores about 200k records in the iterable. Most
examples I've seen keep some kind of accumulator in state, so I'm wondering
whether holding a whole collection is a normal use case.
Maybe you can suggest how to solve my task without this mutable state. I
have a Kafka topic that produces about 20k messages/sec. I need to group
messages by some key and send the groups to another topic. A group should
be sent when its message count exceeds some count N, OR when a predefined time
T has passed since the first message in the group arrived, even if the
group has fewer than N messages. Window functions come to mind first, but
the problem is that I need to send a group as soon as N messages have
arrived, not wait until the window duration has passed.
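To make the rule concrete, here is a rough sketch of the trigger predicate I have in mind; the Msg class and its fields here are simplified placeholders, not my actual types:

```scala
// Simplified sketch of the flush rule: send a group once it has at least
// n messages OR any message in it has been waiting longer than tMs.
case class Msg(key: Int, payload: String, arrivalMs: Long)

def shouldFlush(group: Seq[Msg], n: Int, tMs: Long, nowMs: Long): Boolean =
  group.size >= n || group.exists(m => nowMs - m.arrivalMs >= tMs)
```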
I decided to set the batch size to 0.5 sec, with T at about 3 sec. On each
batch I first take the groups that already have enough messages and send
them. The rest of the messages I put into shared state. In updateStateByKey
I have all messages that have not been sent yet; I again try to group them
and send the groups that have enough messages. This way I check messages
with a latency of 0.5 sec instead of 3 sec.
Update function:
def saveState(values: Seq[Iterable[Msg]],
              value: Option[(Iterable[Msg], Iterable[Msg])])
    : Option[(Iterable[Msg], Iterable[Msg])] = {
  // when is values.size greater than 1? I haven't gotten into that yet.
  if (values.size > 1) {
    throw new NullPointerException
  }
  // notSent - those that have not been sent yet
  val (notSent, _) = value.getOrElse((List(), List()))
  // discard sent
  // here goes more complex logic verifying whether a message should be
  // sent based on its arrival time; for now it is simplified
  val all = notSent ++ values.headOption.getOrElse(Nil) // values may be empty
  val result = all.groupBy(_.key)
    .partition(ifNotSend _)
  Some((result._1.values.flatten, result._2.values.flatten))
}
The whole code:
val batchSize = 5
// will persist() speed anything up here?
val grouped = inputStream.map(msg => (msg.key, msg)).groupByKey().persist()
def ifSend(x: (Int, Iterable[_])) = x._2.size >= batchSize
def ifNotSend(x: (Int, Iterable[_])) = !ifSend(x)
val readyToSend = grouped.filter(ifSend _)
readyToSend.foreachRDD(rdd => {
  // send to kafka
})
// these should not be sent immediately but combined with those kept in state
val incomplete = grouped.filter(ifNotSend _)
/**
 * returns (Iterable[Msg], Iterable[Msg]) wrapped in Option
 * _1 - messages that should not be sent yet and are preserved for the next
 * batch execution
 * _2 - messages that should be sent
 */
def saveState(values: Seq[Iterable[Msg]],
              value: Option[(Iterable[Msg], Iterable[Msg])])
    : Option[(Iterable[Msg], Iterable[Msg])] = {
  if (values.size > 1) {
    throw new NullPointerException
  }
  // notSent - those that have not been sent yet
  val (notSent, _) = value.getOrElse((List(), List()))
  // discard sent
  // here goes more complex logic verifying whether a message should be
  // sent based on its arrival time; for now it is simplified
  val all = notSent ++ values.headOption.getOrElse(Nil) // values may be empty
  val result = all.groupBy(_.key)
    .partition(ifNotSend _)
  Some((result._1.values.flatten, result._2.values.flatten))
}
val state = incomplete.updateStateByKey(saveState _)
state.foreachRDD(rdd => {
  val messagesToSend = rdd.filter(x => x._2._2.nonEmpty)
    .map(x => x._2._2)
  println(messagesToSend.collect().flatten.mkString(","))
  println()
})
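For what it's worth, the group/partition step can be exercised on plain collections without Spark; this toy version (with a stub Msg class standing in for my real one) shows what I expect the core of saveState to do with carried-over and incoming messages:

```scala
// Stub Msg and the same send threshold as in the code above.
case class Msg(key: Int, payload: String)

val batchSize = 5
def ifSend(x: (Int, Iterable[_])): Boolean = x._2.size >= batchSize
def ifNotSend(x: (Int, Iterable[_])): Boolean = !ifSend(x)

// Merge carried-over state with the new batch, group by key, and split
// into (keep for next batch, ready to send) - the core of saveState.
def splitGroups(notSent: Iterable[Msg],
                incoming: Iterable[Msg]): (Iterable[Msg], Iterable[Msg]) = {
  val (keep, send) = (notSent ++ incoming).groupBy(_.key).partition(ifNotSend _)
  (keep.values.flatten, send.values.flatten)
}
```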
Maybe you could suggest a better/more efficient solution?
Thanks in advance.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Storing-a-lot-of-state-with-updateStateByKey-tp22890.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.