I'm trying to understand updateStateByKey. Here's an example I'm testing with.

Input data:

    DStream( RDD( ("a",2) ), RDD( ("a",3) ), RDD( ("a",4) ), RDD( ("a",5) ), RDD( ("a",6) ), RDD( ("a",7) ) )
Code:

    val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
      val previousState = state.getOrElse(StateClass(0, 0, Seq()))
      val currentSum = values.sum + previousState.sum
      val currentCount = values.size + previousState.count
      if (currentCount == previousState.count) {
        None // if this RDD has no change then remove the tuple
      } else {
        Some(StateClass(currentSum, currentCount, values))
      }
    }

    intStream.updateStateByKey[StateClass](updateFunc)
      .transform(rdd => rdd.map(t => (t, rdd.id)))
      .print()

Results:

    ((a,StateClass(14,5,ArrayBuffer(2.0, 3.0, 3.0, 3.0, 3.0))),12)
    ((a,StateClass(17,6,ArrayBuffer(3.0))),22)
    ((a,StateClass(20,7,ArrayBuffer(3.0))),32)

Questions:

Why does the RDD with ID=12 have these elements: (2.0, 3.0, 3.0, 3.0, 3.0)? They don't exist in the input data, so where do these numbers come from? Well, 2 and 3 exist, but not the other 3's, and 4, 5, 6 and 7 are missing as well. What is going on here?

-Adrian
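To reason about what updateFunc produces without running a cluster, its per-batch behaviour can be mimicked in plain Scala. This is a minimal sketch under stated assumptions: the post doesn't show StateClass, so the case class below is a hypothetical definition (with values typed as Seq[Int], even though the printed results show Doubles), and the simulation assumes the documented updateStateByKey contract: for each key, the function receives only the values that arrived in the current batch plus the state left by the previous batch, and returning None drops the key.

```scala
// Hypothetical stand-in for the StateClass used in the question; its real
// definition is not shown, so the field types here are an assumption.
case class StateClass(sum: Int, count: Int, values: Seq[Int])

object UpdateStateSimulation {
  // Same logic as updateFunc in the question.
  val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    val currentSum = values.sum + previousState.sum
    val currentCount = values.size + previousState.count
    if (currentCount == previousState.count) None // no new values: drop the key
    else Some(StateClass(currentSum, currentCount, values))
  }

  // Mimics updateStateByKey for a single key: each element of `batches` is the
  // list of new values for that key in one batch interval. The state returned
  // for one batch is fed into the next; None means the key was removed.
  def simulate(batches: Seq[Seq[Int]]): Seq[Option[StateClass]] =
    batches
      .scanLeft(Option.empty[StateClass])((state, batch) => updateFunc(batch, state))
      .tail // drop the initial (empty) state

  def main(args: Array[String]): Unit = {
    // One value per batch, as in the input data described above.
    val batches = Seq(Seq(2), Seq(3), Seq(4), Seq(5), Seq(6), Seq(7))
    simulate(batches).zipWithIndex.foreach { case (s, i) => println(s"batch $i: $s") }
  }
}
```

With one value per batch, as in the stated input, each state's values field holds only that batch's single value and the sums run 2, 5, 9, 14, 20, 27. For comparison, a hypothetical batching of Seq(2, 3, 3, 3, 3), Seq(3), Seq(3) yields the same sums and counts as the printed results (14/5, 17/6, 20/7), because the third field stores only the current batch's values rather than every value seen so far.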