Hi,
I am running a simple test with mapWithState and getting some unexpected events.
Is this correct?

The test is very simple: keep a running sum of the values for each key.

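import org.apache.spark.streaming.{State, StateSpec}

// Running sum per key: add the incoming value (if any) to the stored state
val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
  state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
  (key, state.get())
}
val spec = StateSpec.function(mappingFunction)
// dstream is a DStream[(Int, Int)]
dstream.mapWithState(spec)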

I create two RDDs and push them into the dstream:
RDD((1,1), (1,2), (2,1))
RDD((1,3))

I get this result:
RDD(*(1,1)*, *(1,3)*, (2,1))
RDD((1,6))
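To make sure I am reading the output correctly, I modeled the two batches in plain Scala, without Spark. This is only my assumption of what happens: `perRecord` applies the summing logic once per input record, in order, and reproduces the output above, while `perKey` (a hypothetical helper, not a Spark API) emits one record per key with its final sum, which is what I expected.

```scala
// Hypothetical pure-Scala model of the test above; no Spark involved.
object MapWithStateModel {
  // Assumption: the mapping function runs once per input record,
  // updating the running sum and emitting (key, currentSum) each time.
  def perRecord(batch: Seq[(Int, Int)], init: Map[Int, Int]): (Seq[(Int, Int)], Map[Int, Int]) = {
    var state = init
    val out = batch.map { case (k, v) =>
      val sum = state.getOrElse(k, 0) + v
      state = state.updated(k, sum)
      (k, sum)
    }
    (out, state)
  }

  // What I expected: one output per key per batch, holding the final sum.
  def perKey(batch: Seq[(Int, Int)], init: Map[Int, Int]): Seq[(Int, Int)] = {
    val (_, state) = perRecord(batch, init)
    batch.map(_._1).distinct.map(k => (k, state(k)))
  }

  def main(args: Array[String]): Unit = {
    val (b1, s1) = perRecord(Seq((1, 1), (1, 2), (2, 1)), Map.empty)
    val (b2, _) = perRecord(Seq((1, 3)), s1)
    println(b1) // List((1,1), (1,3), (2,1))
    println(b2) // List((1,6))
    println(perKey(Seq((1, 1), (1, 2), (2, 1)), Map.empty)) // List((1,3), (2,1))
  }
}
```

The per-record model matches what I see from mapWithState; the per-key model is what I was hoping for.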

As you can see, the first batch produces two records with the same key
"1": (1,1) and (1,3). Is this expected behavior? I would expect only (1,3).

Regards
- Terry
