Hi Vinti,

I don’t program in Scala, but I think you’ve changed the meaning of the current variable: look again at which argument is the state and which is the new data.
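As a rough sketch of that distinction (not tested against Spark itself): in mapWithState, value is the new data for the key and current is the State wrapper. That means `_ :+ current` appends the State object itself to the array, which would give an Array[Object] and may well explain the MatchError. The State class below is a hypothetical stand-in for org.apache.spark.streaming.State, reduced to getOption and update so the snippet runs without Spark. addArrays is an illustrative helper that replaces the Breeze BDV sum in your code.

```scala
object MapWithStateSketch {
  // Hypothetical stand-in for org.apache.spark.streaming.State, limited to the
  // two methods used below. With Spark on the classpath, import the real class.
  final class State[S] {
    private var current: Option[S] = None
    def getOption(): Option[S] = current
    def update(newState: S): Unit = current = Some(newState)
  }

  // Element-wise sum of two counter arrays (assumes both have the same length).
  def addArrays(a: Array[Long], b: Array[Long]): Array[Long] =
    a.zip(b).map { case (x, y) => x + y }

  // value = new data for this key in the current batch
  // state = running total kept across batches
  def trackStateFunc(key: String,
                     value: Option[Array[Long]],
                     state: State[Array[Long]]): Option[Array[Long]] = {
    val merged: Option[Array[Long]] = (value, state.getOption()) match {
      case (Some(v), Some(s)) => Some(addArrays(v, s)) // new data plus old state
      case (Some(v), None)    => Some(v)               // first data for this key
      case (None, s)          => s                     // no new data, keep state
    }
    merged.foreach(state.update) // persist the new running total
    merged                       // emitted downstream; need not equal the state
  }
}
```

The key point is that the state is read through getOption and written back through update, while the function's return value is only what gets emitted to the output stream.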
Assuming it works like the Java API, to maintain state with this function you must call State.update; the return value can be anything, not just the state.

Cheers
Iain

From: Vinti Maheshwari [mailto:vinti.u...@gmail.com]
Sent: 12 March 2016 22:10
To: user
Subject: [MARKETING] Spark Streaming stateful transformation mapWithState function getting error scala.MatchError: [Ljava.lang.Object]

Hi All,

I wanted to replace my updateStateByKey function with the mapWithState function (Spark 1.6) to improve the performance of my program.

I was following these two documents:
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html
https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html

but I am getting the error scala.MatchError: [Ljava.lang.Object]

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
        at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
        at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
        at scala.Option.flatMap(Option.scala:170)
        at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)

Reference code:

    def trackStateFunc(key:String, value:Option[Array[Long]], current:State[Array[Long]]) = {
      //either we can use this
      // current.update(value)
      value.map(_ :+ current).orElse(Some(current)).flatMap{
        case x:Array[Long] => Try(x.map(BDV(_)).reduce(_ + _).toArray).toOption
        case None => ???
      }
    }

    val statespec:StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] = StateSpec.function(trackStateFunc _)

    val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] = parsedStream.mapWithState(statespec)

My previous working code, which used the updateStateByKey function:

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })

Does anyone have an idea what the issue could be?

Thanks & Regards,
Vinti