HyukjinKwon commented on code in PR #44953:
URL: https://github.com/apache/spark/pull/44953#discussion_r1475465792


##########
sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala:
##########
@@ -322,4 +329,71 @@ class QueryExecutionAnsiErrorsSuite extends QueryTest
         "columnName" -> "`col`")
     )
   }
+
+  test("user-facing runtime errors") {
+    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+      var numTaskStarted = 0
+      val listener = new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+          numTaskStarted += 1
+        }
+      }
+      sparkContext.addSparkListener(listener)
+      try {
+        val df1 = spark.range(0, 10, 1, 1).map { v =>
+          if (v > 5) throw new RuntimeException("test error") else v
+        }
+        // If the error is not user-facing, it will be wrapped by `SparkException`
+        // with "Job aborted".
+        val e1 = intercept[SparkException](df1.collect())
+        assert(e1.getMessage.contains("Job aborted"))
+        sparkContext.listenerBus.waitUntilEmpty()
+        // In this test suite, a failed task is attempted 2 times in total.
+        assert(numTaskStarted == 2)
+        numTaskStarted = 0
+
+        val df2 = spark.range(0, 10, 1, 2).map { v =>
+          if (v > 5) throw new RuntimeException("test error") else v
+        }
+        val e2 = intercept[SparkException](df2.collect())
+        assert(e2.getMessage.contains("Job aborted"))
+        sparkContext.listenerBus.waitUntilEmpty()
+        // In this test suite, a failed task is attempted 2 times in total. The input
+        // data has 2 partitions, but only the second task fails (its partition
+        // contains values greater than 5), so in total 3 tasks are started.
+        assert(numTaskStarted == 3)
+        numTaskStarted = 0
+
+        val df3 = spark.range(0, 10, 1, 1).select(lit(1) / $"id")
+        checkError(
+          // If the error is user-facing, it will be thrown directly.
+          exception = intercept[SparkArithmeticException](df3.collect()),
+          errorClass = "DIVIDE_BY_ZERO",
+          parameters = Map("config" -> ansiConf),
+          context = ExpectedContext(
+            fragment = "div",
+            callSitePattern = getCurrentClassCallSitePattern
+          )
+        )
+        sparkContext.listenerBus.waitUntilEmpty()
+        // TODO: Spark should not re-try tasks for this error.
+        assert(numTaskStarted == 2)
+        numTaskStarted = 0
+
+        val df4 = spark.range(0, 10, 1, 2).select(lit(1) / $"id")
+        checkError(
+          exception = intercept[SparkArithmeticException](df4.collect()),
+          errorClass = "DIVIDE_BY_ZERO",
+          parameters = Map("config" -> ansiConf),
+          context = ExpectedContext(
+            fragment = "div",
+            callSitePattern = getCurrentClassCallSitePattern
+          )
+        )
+        sparkContext.listenerBus.waitUntilEmpty()
+        // TODO: Spark should not re-try tasks for this error.

Review Comment:
   Nit, but we should ideally file a JIRA for this and reference it in the comment, e.g., `TODO(SPARK-XXXXX): ...`
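   
   For example, once a ticket is filed, the two TODO comments could look like the sketch below (`SPARK-XXXXX` is a placeholder, not a real JIRA number):
   
   ```scala
   // TODO(SPARK-XXXXX): Spark should not re-try tasks that fail with
   // user-facing errors such as DIVIDE_BY_ZERO.
   assert(numTaskStarted == 2)
   ```
   
   That way the follow-up work stays discoverable from the ticket, and the same `TODO(SPARK-XXXXX): ...` format applies to both TODOs in this test.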



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

