Let me know if you do need a pull request for this; I can make that happen (provided someone gives the PR a thorough review to make sure I'm understanding this problem correctly).
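To make the intent concrete, the change I have in mind is roughly the one below — a minimal, untested sketch against the block quoted from MapWithStateRDDRecord.updateRecordWithData further down, assuming the wrapped state exposes the same exists() check as the public State API: only write the state back when it actually holds a value, so a timeout alone no longer forces a put() on a state that was never set.

{code}
dataIterator.foreach { case (key, value) =>
  wrappedState.wrap(newStateMap.get(key))
  val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
  if (wrappedState.isRemoved) {
    newStateMap.remove(key)
  } else if (wrappedState.isUpdated
      || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
    // Persist the state only when it was explicitly updated, or when it
    // already exists and a timeout is configured; an empty, never-set state
    // is no longer put() just because a timeout is defined.
    newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
  }
  mappedData ++= returned
}
{code}

The alternative (or a complement) would be to keep the current condition but throw a descriptive exception instead of letting StateImpl.get fail with a bare NoSuchElementException.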
On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Thanks for reporting it. I will take a look.
>
> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com> wrote:
>
>> Hi,
>> I've been playing with the experimental PairDStreamFunctions.mapWithState
>> feature and I seem to have stumbled across a bug, and was wondering if
>> anyone else has been seeing this behavior.
>>
>> I've opened up an issue in the Spark JIRA; I simply want to pass this along
>> in case anyone else is experiencing such a failure, or perhaps someone has
>> insightful information on whether this is actually a bug: SPARK-13195
>> <https://issues.apache.org/jira/browse/SPARK-13195>
>>
>> Using the new Spark mapWithState API, I've encountered a bug when setting
>> a timeout for mapWithState but no explicit state handling.
>>
>> h1. Steps to reproduce:
>>
>> 1. Create a method which conforms to the StateSpec signature; make sure
>> not to update any state inside it using *state.update*. Simply create a
>> "pass through" method; it may even be empty.
>> 2. Create a StateSpec object with the method from step 1, which explicitly
>> sets a timeout using the *StateSpec.timeout* method.
>> 3. Create a DStream pipeline that uses mapWithState with the given
>> StateSpec.
>> 4. Run the code using spark-submit. You'll see that the method ends up
>> throwing the following exception:
>>
>> {code}
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>>     at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>> {code}
>>
>> h1. Sample code to reproduce the issue:
>>
>> {code:Title=MainObject}
>> import org.apache.spark.streaming._
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> /**
>>  * Created by yuvali on 04/02/2016.
>>  */
>> object Program {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>     val sparkContext = new SparkContext(sc)
>>
>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>
>>     // Create a stream that generates 1000 lines per second
>>     val stream = ssc.receiverStream(new DummySource(10))
>>
>>     // Split the lines into words, and create a paired (key-value) dstream
>>     val wordStream = stream.flatMap {
>>       _.split(" ")
>>     }.map(word => (word, 1))
>>
>>     // This represents the emitted stream from the trackStateFunc. Since we
>>     // emit every input record with the updated value, this stream will
>>     // contain the same # of records as the input dstream.
>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>     wordCountStateStream.print()
>>
>>     // To make sure data is not deleted by the time we query it interactively
>>     ssc.remember(Minutes(1))
>>
>>     // Don't forget to set checkpoint directory
>>     ssc.checkpoint("")
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>>
>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>                      state: State[Long]): Option[(String, Long)] = {
>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>     val output = (key, sum)
>>     Some(output)
>>   }
>> }
>> {code}
>>
>> {code:Title=DummySource}
>> /**
>>  * Created by yuvali on 04/02/2016.
>>  */
>>
>> import org.apache.spark.storage.StorageLevel
>> import scala.util.Random
>> import org.apache.spark.streaming.receiver._
>>
>> class DummySource(ratePerSec: Int) extends
>>     Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>
>>   def onStart() {
>>     // Start the thread that receives data over a connection
>>     new Thread("Dummy Source") {
>>       override def run() { receive() }
>>     }.start()
>>   }
>>
>>   def onStop() {
>>     // There is nothing much to do, as the thread calling receive()
>>     // is designed to stop by itself once isStopped() returns true
>>   }
>>
>>   /** Generate data until the receiver is stopped */
>>   private def receive() {
>>     while (!isStopped()) {
>>       store("I am a dummy source " + Random.nextInt(10))
>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>     }
>>   }
>> }
>> {code}
>>
>> The given issue resides in *MapWithStateRDDRecord.updateRecordWithData*,
>> starting at line 55, in the following code block:
>>
>> {code}
>> dataIterator.foreach { case (key, value) =>
>>   wrappedState.wrap(newStateMap.get(key))
>>   val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>   if (wrappedState.isRemoved) {
>>     newStateMap.remove(key)
>>   } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) /* <--- problem is here */ {
>>     newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>   }
>>   mappedData ++= returned
>> }
>> {code}
>>
>> In case the stream has a timeout set but the state was never set at all,
>> the "else if" will still follow through, because the timeout is defined
>> even though "wrappedState" is empty and was never set.
>>
>> If it is mandatory to update state for each entry of *mapWithState*, then
>> this code should throw a better exception than "NoSuchElementException",
>> which doesn't really say anything to the developer.
>>
>> I haven't provided a fix myself because I'm not familiar with the Spark
>> implementation, but it seems there either needs to be an extra check for
>> whether the state is set, or, as previously stated, a better exception
>> message.
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/PairDStreamFunctions-mapWithState-fails-in-case-timeout-is-set-without-updating-State-S-tp26147.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

--
Best Regards,
Yuval Itzchakov.
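P.S. Until a fix lands, a user-side workaround (sketched below, assuming you are on a 1.6.x build without the fix) is to make the mapping function always set its state, so the put() path goes through the isUpdated branch instead of relying on the timeout check. This is based on the trackStateFunc from the repro above; the isTimingOut() guard matters because a state that is timing out cannot be updated.

{code}
def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
                   state: State[Long]): Option[(String, Long)] = {
  if (state.isTimingOut()) {
    // The key is being timed out: the state can no longer be updated,
    // so just emit the last known sum.
    Some((key, state.getOption().getOrElse(0L)))
  } else {
    val sum = value.getOrElse(0).toLong + state.getOption().getOrElse(0L)
    state.update(sum) // always set the state so it is never left unset
    Some((key, sum))
  }
}
{code}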