I am trying to understand what happens when Spark hits an exception
during processing, especially in Spark Streaming.

If I have a small code snippet like this:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  throw new Exception("User exception...")
}

If I run this I will get output like this:

[info] processed => [List(Item1)]
[error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job streaming job 1453999278000 ms.0
[error] java.lang.Exception: User exception...
...
[info] processed => [List(Item2)]
[error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job streaming job 1453999279000 ms.0
[error] java.lang.Exception: User exception...

First, "Item1" is processed and the job fails (as expected). In the next
batch, "Item2" is processed, so the record "Item1" has been lost.

If I change my code so that the exception occurs inside a task:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  rdd.map{case x => throw new Exception("User exception...") }.collect()
}

Then the map closure is retried, but once the task has failed
spark.task.maxFailures times (4 by default) the job for that batch fails,
its records are discarded, and processing continues with the next batch.
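
To make the behaviour above concrete, here is a plain-Scala model of what I
think is happening. It is only a sketch and does not use Spark at all:
maxFailures mirrors the default of spark.task.maxFailures, while RetryModel,
runWithRetries and processBatches are hypothetical names I made up for
illustration.

```scala
import scala.util.{Failure, Success, Try}

object RetryModel {
  // Mirrors the default of spark.task.maxFailures; everything else is a made-up model.
  val maxFailures = 4

  // Run `task` up to maxFailures times; keep the last failure if every attempt fails.
  def runWithRetries[A](task: () => A): Try[A] = {
    var result: Try[A] = Try(task())
    var attempts = 1
    while (result.isFailure && attempts < maxFailures) {
      result = Try(task())
      attempts += 1
    }
    result
  }

  // Process batches in order. A batch whose task exhausts its retries is
  // dropped, and processing simply moves on to the next batch.
  def processBatches(
      batches: Seq[String],
      task: String => String): (Seq[String], Seq[String]) = {
    val results = batches.map(b => b -> runWithRetries(() => task(b)))
    val processed = results.collect { case (_, Success(v)) => v }
    val lost = results.collect { case (b, Failure(_)) => b }
    (processed, lost)
  }
}
```

If the task always throws for "Item1" but succeeds for "Item2", this model
reports "Item1" as lost and "Item2" as processed, which matches what I see.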

Is it possible to ensure that records are not discarded, even if this means
stopping the application? I have the WAL enabled.
