I am building a workflow. I have an existing call to updateStateByKey that works fine, but when I added a second use in which the key is a Tuple2, it fails with the dreaded "overloaded method value updateStateByKey with alternatives ... cannot be applied to ..." error. Comparing the two uses, I don't see anything that looks broken, though I do note that all the messages below describe what the code writes as Time as org.apache.spark.streaming.Time.
a) Could the Time vs. org.apache.spark.streaming.Time difference be causing this? (I use Time the same way in the first use, which appears to work properly.)

b) Any suggestions as to what else could be causing the error?

------code--------

val ssc = new StreamingContext(conf, Seconds(timeSliceArg))
ssc.checkpoint(".")

var lines = ssc.textFileStream(dirArg)
var linesArray = lines.map(line => (line.split("\t")))
var DnsSvr = linesArray.map(lineArray =>
  ((lineArray(4), lineArray(5)),
   (1, Time((lineArray(0).toDouble * 1000).toLong))))

val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
  val currentCount = if (values.isEmpty) 0 else values.map(x => x._1).sum
  val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map(x => x._2).min
  val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
  (currentCount + previousCount, Seq(minTime, newMinTime).min)
}

var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount)   // <=== error here

------compilation output----------

[error] /Users/spr/Documents/.../cyberSCinet.scala:513: overloaded method value updateStateByKey with alternatives:
[error]   (updateFunc: Iterator[((String, String), Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)])] => Iterator[((String, String), (Int, org.apache.spark.streaming.Time))],partitioner: org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit evidence$5: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],partitioner: org.apache.spark.Partitioner)(implicit evidence$4: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],numPartitions: Int)(implicit evidence$3: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)])(implicit evidence$2: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))]
[error]  cannot be applied to ((Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => (Int, org.apache.spark.streaming.Time))
[error]     var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/in-function-prototypes-tp18642.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
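For what it's worth, every alternative listed in the compiler output takes an update function returning Option[(Int, org.apache.spark.streaming.Time)], while the "cannot be applied to" line shows the supplied function returning a bare (Int, org.apache.spark.streaming.Time). A minimal sketch of the adjustment (untested; same body as above, with only the return value wrapped in Some):

```scala
// Sketch only: updateStateByKey's update function must return Option[S].
// Returning Some(...) keeps the state for the key; returning None drops it.
val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
  val currentCount = if (values.isEmpty) 0 else values.map(x => x._1).sum
  val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map(x => x._2).min
  val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
  Some((currentCount + previousCount, Seq(minTime, newMinTime).min))  // Option, not a bare tuple
}
```

If that is the issue, the Time vs. org.apache.spark.streaming.Time difference in the messages would just be the compiler printing the fully qualified name, not a second Time type.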