Repository: spark
Updated Branches:
  refs/heads/master 4d114fc9a -> 77e52448e


[SPARK-25472][SS] Don't have legitimate stops of streams cause stream exceptions

## What changes were proposed in this pull request?

Legitimate stops of streams may cause an exception to be captured by stream 
execution, because stopping a query cancels its running Spark jobs, and the 
cancelled jobs throw a SparkException reporting the cancellation. This PR makes 
the stop more graceful by swallowing this cancellation error.
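
For context, the failure mode looks roughly like the sketch below: cancelling a 
job group makes any job running under it fail with a SparkException whose 
message names the cancellation and the group id, which is the signal the new 
check keys on. This is a minimal standalone illustration, not code from this 
patch; the master, app name, group id, and exact message wording are 
assumptions and can vary across Spark versions.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object CancellationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("cancellation-demo"))
    // Streaming queries run their jobs under a job group; stop() cancels that group.
    sc.setJobGroup("demo-group", "simulate a query stop", interruptOnCancel = true)

    // Cancel the group from another thread while a job is still running.
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(2000)
        sc.cancelJobGroup("demo-group")
      }
    }).start()

    try {
      // A long-running job that will be cancelled mid-flight.
      sc.parallelize(1 to 4, 4).map { i => Thread.sleep(30000); i }.collect()
    } catch {
      case se: SparkException =>
        // Typically something like:
        //   "Job 0 cancelled part of cancelled job group demo-group"
        // i.e. the message contains "cancelled" and the group id, with no cause.
        println(se.getMessage)
    } finally {
      sc.stop()
    }
  }
}
```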

## How was this patch tested?

This is pretty hard to test. The existing tests should make sure that we're not 
swallowing other specific SparkExceptions. I've also run the 
`KafkaSourceStressForDontFailOnDataLossSuite` 100 times, and it didn't fail, 
whereas it used to be flaky.
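
As an additional local sanity check of the classification itself, the sketch 
below exercises the new `SparkException` arm of 
`StreamExecution.isInterruptionException` directly. It is a hypothetical 
illustration, assuming the patched `StreamExecution` object is accessible on 
the classpath; the group id and exception messages are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.sql.execution.streaming.StreamExecution

object InterruptionCheckDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("interruption-check-demo"))
    // isInterruptionException reads the job group via
    // sc.getLocalProperty("spark.jobGroup.id"), a thread-local set here.
    sc.setJobGroup("stream-group-1", "demo")

    // A cause-less SparkException whose message contains "cancelled" and the
    // active job group id is now classified as an interruption...
    val cancelled = new SparkException(
      "Job 7 cancelled part of cancelled job group stream-group-1")
    assert(StreamExecution.isInterruptionException(cancelled, sc))

    // ...while an unrelated SparkException is still surfaced as a real failure.
    val unrelated = new SparkException("Task failed while writing rows")
    assert(!StreamExecution.isInterruptionException(unrelated, sc))

    sc.stop()
  }
}
```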

Closes #22478 from brkyvz/SPARK-25472.

Authored-by: Burak Yavuz <brk...@gmail.com>
Signed-off-by: Burak Yavuz <brk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77e52448
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77e52448
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77e52448

Branch: refs/heads/master
Commit: 77e52448e7f94aadfa852cc67084415de6ecfa7c
Parents: 4d114fc
Author: Burak Yavuz <brk...@gmail.com>
Authored: Thu Sep 20 15:46:33 2018 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Thu Sep 20 15:46:33 2018 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 22 +++++++++++++++-----
 .../continuous/ContinuousExecution.scala        |  4 ++--
 .../WriteToContinuousDataSourceExec.scala       |  2 +-
 3 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/77e52448/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f6c60c1..631a6eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -282,7 +283,7 @@ abstract class StreamExecution(
         // `stop()` is already called. Let `finally` finish the cleanup.
       }
     } catch {
-      case e if isInterruptedByStop(e) =>
+      case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
         // interrupted by stop()
         updateStatusMessage("Stopped")
       case e: IOException if e.getMessage != null
@@ -354,9 +355,9 @@ abstract class StreamExecution(
     }
   }
 
-  private def isInterruptedByStop(e: Throwable): Boolean = {
+  private def isInterruptedByStop(e: Throwable, sc: SparkContext): Boolean = {
     if (state.get == TERMINATED) {
-      StreamExecution.isInterruptionException(e)
+      StreamExecution.isInterruptionException(e, sc)
     } else {
       false
     }
@@ -531,7 +532,7 @@ object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
   val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
 
-  def isInterruptionException(e: Throwable): Boolean = e match {
+  def isInterruptionException(e: Throwable, sc: SparkContext): Boolean = e match {
     // InterruptedIOException - thrown when an I/O operation is interrupted
     // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
     case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
@@ -546,7 +547,18 @@ object StreamExecution {
     //                               ExecutionException, such as BiFunction.apply
     case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
         if e2.getCause != null =>
-      isInterruptionException(e2.getCause)
+      isInterruptionException(e2.getCause, sc)
+    case se: SparkException =>
+      val jobGroup = sc.getLocalProperty("spark.jobGroup.id")
+      if (jobGroup == null) return false
+      val errorMsg = se.getMessage
+      if (errorMsg.contains("cancelled") && errorMsg.contains(jobGroup) && se.getCause == null) {
+        true
+      } else if (se.getCause != null) {
+        isInterruptionException(se.getCause, sc)
+      } else {
+        false
+      }
     case _ =>
       false
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/77e52448/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index ccca726..f009c52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -265,8 +265,8 @@ class ContinuousExecution(
           sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
       }
     } catch {
-      case t: Throwable
-          if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+      case t: Throwable if StreamExecution.isInterruptionException(t, sparkSession.sparkContext) &&
+          state.get() == RECONFIGURING =>
         logInfo(s"Query $id ignoring exception from reconfiguring: $t")
         // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/77e52448/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index c216b61..a797ac1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -57,7 +57,7 @@ case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport,
       case cause: Throwable =>
         cause match {
           // Do not wrap interruption exceptions that will be handled by streaming specially.
-          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          case _ if StreamExecution.isInterruptionException(cause, sparkContext) => throw cause
           // Only wrap non fatal exceptions.
           case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
           case _ => throw cause

