This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c35ca7cab62 [SPARK-45539][SS] Add assert and log to indicate watermark definition is required for streaming aggregation queries in append mode
c35ca7cab62 is described below

commit c35ca7cab6267c3ea0b74631afac6203059207ae
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Oct 16 14:34:12 2023 +0900

    [SPARK-45539][SS] Add assert and log to indicate watermark definition is required for streaming aggregation queries in append mode
    
    ### What changes were proposed in this pull request?
    Add an assert and log message to indicate that a watermark definition is required for streaming aggregation queries in append mode.
    
    ### Why are the changes needed?
    The UnsupportedOperationChecker already checks that watermark attributes are specified in append mode. However, in some cases we have received reports of users hitting this stack trace:
    
    ```
    org.apache.spark.SparkException: Exception thrown in awaitResult: Job aborted due to stage failure: Task 3 in stage 32.0 failed 4 times, most recent failure: Lost task 3.3 in stage 32.0 (TID 606) (10.5.71.29 executor 0): java.util.NoSuchElementException: None.get
            at scala.None$.get(Option.scala:529)
            at scala.None$.get(Option.scala:527)
            at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:472)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
            at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:708)
            at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:145)
            at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:145)
            at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:414)
            at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:470)
            at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:63)
            at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    ```
    
    In this case, the reason for the failure is not immediately clear from the `None.get` error. This change therefore adds an assert with a log message that indicates why the query failed on the executor.
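    
    The difference in diagnostics can be sketched in plain Scala, outside Spark. This is a minimal illustration, not the Spark code: `WatermarkAssertSketch`, `Predicate`, `evictWithoutAssert`, `evictWithAssert` and `failureMessage` are hypothetical names, and `Predicate` is an assumed stand-in for the `Option`-typed watermark predicate fields touched by this change.
    
    ```scala
    import scala.util.Try
    
    object WatermarkAssertSketch {
      // Hypothetical stand-in for an Option-typed watermark predicate field.
      type Predicate = Long => Boolean
    
      // Without a guard: calling .get on None throws NoSuchElementException,
      // whose message says nothing about watermarks.
      def evictWithoutAssert(pred: Option[Predicate], ts: Long): Boolean =
        pred.get(ts)
    
      // With a guard: the assertion fails first and carries a descriptive message.
      def evictWithAssert(pred: Option[Predicate], ts: Long): Boolean = {
        assert(pred.isDefined,
          "Watermark needs to be defined for streaming aggregation query in append mode")
        pred.get(ts)
      }
    
      // Helper that returns the message of whatever the body throws.
      def failureMessage(body: => Any): String =
        Try(body).failed.map(_.getMessage).getOrElse("no failure")
    
      def main(args: Array[String]): Unit = {
        println(failureMessage(evictWithoutAssert(None, 0L)))
        println(failureMessage(evictWithAssert(None, 0L)))
      }
    }
    ```
    
    The first call reports only the bare `Option` failure, while the second reports the watermark message, which is the kind of hint this change aims to surface in executor logs.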
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43370 from anishshri-db/task/SPARK-45539.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/execution/streaming/statefulOperators.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 77645378f22..cb01fa9ff6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -520,6 +520,12 @@ case class StateStoreSaveExec(
           // Update and output only rows being evicted from the StateStore
           // Assumption: watermark predicates must be non-empty if append mode is allowed
           case Some(Append) =>
+            assert(watermarkPredicateForDataForLateEvents.isDefined,
+              "Watermark needs to be defined for streaming aggregation query in append mode")
+
+            assert(watermarkPredicateForKeysForEviction.isDefined,
+              "Watermark needs to be defined for streaming aggregation query in append mode")
+
             allUpdatesTimeMs += timeTakenMs {
               val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
                 watermarkPredicateForDataForLateEvents.get)
@@ -777,6 +783,9 @@ case class SessionWindowStateStoreSaveExec(
         // Update and output only rows being evicted from the StateStore
         // Assumption: watermark predicates must be non-empty if append mode is allowed
         case Some(Append) =>
+          assert(watermarkPredicateForDataForEviction.isDefined,
+              "Watermark needs to be defined for session window query in append mode")
+
           allUpdatesTimeMs += timeTakenMs {
             putToStore(iter, store)
           }


