Hi Vinti

I don’t program in Scala, but I think you’ve changed the meaning of the current
variable. Look again at which argument is the state and which is the new data.
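
A Spark-free sketch of that suspicion, for illustration only. FakeState is a
made-up stand-in for Spark's State class, and the object and method names are
hypothetical. Appending the state object itself to the new value with `:+`
widens the array's element type from Long to Any, so a later
`case x: Array[Long]` pattern no longer matches and the match throws
scala.MatchError on an Object array, which looks like the reported error:

```scala
import scala.util.Try

object MatchErrorSketch {
  // Hypothetical stand-in for org.apache.spark.streaming.State; not the real class.
  final case class FakeState(values: Array[Long])

  def run(): Try[Option[Long]] = {
    val value: Option[Array[Long]] = Some(Array(1L, 2L))
    val current = FakeState(Array(10L, 20L))

    // `:+` appends the state object as one extra element, so the result is
    // an array of Any (an Object[] at runtime), not an Array[Long].
    val combined: Option[Any] = value.map(_ :+ current).orElse(Some(current))

    Try {
      combined.flatMap {
        // An Object[] is not a long[], so this case does not match
        // and the partial function throws scala.MatchError.
        case x: Array[Long] => Some(x.sum)
      }
    }
  }

  def main(args: Array[String]): Unit =
    println(run().isFailure)
}
```

Note also that flatMap hands the function the unwrapped value, so a
`case None` branch inside it would not be reached for an empty Option.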

Assuming it works like the Java API, to maintain state with this function you
must call State.update explicitly. The return value is separate: you can return
anything, not just the state.
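
A rough sketch of a tracking function along those lines, not tested on a
cluster. It assumes the intent is an element-wise running sum of equal-length
arrays per key; sumArrays, merge and TrackStateSketch are hypothetical names,
and plain zip is swapped in for the Breeze vectors used in the original code:

```scala
import org.apache.spark.streaming.{State, StateSpec}

object TrackStateSketch {
  // Hypothetical helper: element-wise sum, assuming both arrays have the same length.
  def sumArrays(a: Array[Long], b: Array[Long]): Array[Long] =
    a.zip(b).map { case (x, y) => x + y }

  // Pure merge of the previous state with the new value, kept separate
  // from Spark so it can be checked in isolation.
  def merge(prev: Option[Array[Long]], value: Option[Array[Long]]): Option[Array[Long]] =
    (prev, value) match {
      case (Some(p), Some(v)) => Some(sumArrays(p, v))
      case (None, v)          => v
      case (p, None)          => p
    }

  def trackStateFunc(key: String,
                     value: Option[Array[Long]],
                     state: State[Array[Long]]): Option[Array[Long]] = {
    val updated = merge(state.getOption(), value)
    // The stored state only changes through an explicit update call.
    // update is not allowed while the state is timing out, so guard for that.
    if (!state.isTimingOut()) updated.foreach(state.update)
    // The return value is the mapped output and is independent of the state.
    updated
  }

  val spec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] =
    StateSpec.function(trackStateFunc _)
}
```

The spec would then be passed to parsedStream.mapWithState(spec), as in the
original post.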

Cheers
Iain

From: Vinti Maheshwari [mailto:vinti.u...@gmail.com]
Sent: 12 March 2016 22:10
To: user
Subject: [MARKETING] Spark Streaming stateful transformation mapWithState 
function getting error scala.MatchError: [Ljava.lang.Object]


Hi All,

I wanted to replace my updateStateByKey function with the mapWithState function
(Spark 1.6) to improve the performance of my program.

I was following these two documents: 
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html

but I am getting the error scala.MatchError: [Ljava.lang.Object]

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
    at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
    at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
    at scala.Option.flatMap(Option.scala:170)
    at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)

Reference code:

def trackStateFunc(key:String, value:Option[Array[Long]], current:State[Array[Long]]) = {

        //either we can use this
        // current.update(value)

        value.map(_ :+ current).orElse(Some(current)).flatMap{
          case x:Array[Long] => Try(x.map(BDV(_)).reduce(_ + _).toArray).toOption
          case None => ???
        }
      }

      val statespec:StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] = StateSpec.function(trackStateFunc _)

      val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] = parsedStream.mapWithState(statespec)

My previous working code which was using updateStateByKey function:

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
        (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
         prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })

Does anyone have an idea what the issue could be?

Thanks & Regards,
Vinti
