This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c35ca7cab62 [SPARK-45539][SS] Add assert and log to indicate watermark definition is required for streaming aggregation queries in append mode

c35ca7cab62 is described below

commit c35ca7cab6267c3ea0b74631afac6203059207ae
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Oct 16 14:34:12 2023 +0900

[SPARK-45539][SS] Add assert and log to indicate watermark definition is required for streaming aggregation queries in append mode

### What changes were proposed in this pull request?
Add an assert and a log message to indicate that a watermark definition is required for streaming aggregation queries in append mode.

### Why are the changes needed?
We have a check, based on the UnsupportedOperationChecker, that ensures watermark attributes are specified in append mode. However, we have received reports of users hitting this stack trace:

```
org.apache.spark.SparkException: Exception thrown in awaitResult: Job aborted due to stage failure: Task 3 in stage 32.0 failed 4 times, most recent failure: Lost task 3.3 in stage 32.0 (TID 606) (10.5.71.29 executor 0): java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:472)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:708)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:145)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:145)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:414)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:470)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:63)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
```

In this case, the reason for the failure is not immediately clear. Hence, this change adds an assert and a log message to indicate why the query failed on the executor.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43370 from anishshri-db/task/SPARK-45539.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/execution/streaming/statefulOperators.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 77645378f22..cb01fa9ff6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -520,6 +520,12 @@ case class StateStoreSaveExec(
           // Update and output only rows being evicted from the StateStore
           // Assumption: watermark predicates must be non-empty if append mode is allowed
           case Some(Append) =>
+            assert(watermarkPredicateForDataForLateEvents.isDefined,
+              "Watermark needs to be defined for streaming aggregation query in append mode")
+
+            assert(watermarkPredicateForKeysForEviction.isDefined,
+              "Watermark needs to be defined for streaming aggregation query in append mode")
+
             allUpdatesTimeMs += timeTakenMs {
               val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
                 watermarkPredicateForDataForLateEvents.get)
@@ -777,6 +783,9 @@ case class SessionWindowStateStoreSaveExec(
           // Update and output only rows being evicted from the StateStore
           // Assumption: watermark predicates must be non-empty if append mode is allowed
           case Some(Append) =>
+            assert(watermarkPredicateForDataForEviction.isDefined,
+              "Watermark needs to be defined for session window query in append mode")
+
             allUpdatesTimeMs += timeTakenMs {
               putToStore(iter, store)
             }
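The failure mode and the fix pattern in the diff can be reproduced in plain Scala, outside Spark. This is a minimal standalone sketch under stated assumptions: `WatermarkAssertSketch`, `Predicate`, `filterWithoutCheck`, `filterWithCheck`, and `describe` are hypothetical names that only mimic the shape of the change, using a plain `Option[Long => Boolean]` in place of Spark's watermark predicate fields. It shows that calling `.get` on an empty `Option` throws `NoSuchElementException("None.get")` with no context, whereas asserting `isDefined` first throws an `AssertionError` whose message starts with `assertion failed: ` followed by the supplied text.

```scala
import scala.util.{Failure, Success, Try}

object WatermarkAssertSketch {
  // Hypothetical stand-in for a watermark predicate: true means "keep this event time".
  type Predicate = Long => Boolean

  // Mimics the pre-change path: unconditionally calls .get on the Option,
  // so an undefined watermark surfaces only as "None.get".
  def filterWithoutCheck(rows: Seq[Long], watermarkPredicate: Option[Predicate]): Seq[Long] =
    rows.filter(watermarkPredicate.get)

  // Mimics the post-change path: asserts first, so the failure carries a descriptive message.
  def filterWithCheck(rows: Seq[Long], watermarkPredicate: Option[Predicate]): Seq[Long] = {
    assert(watermarkPredicate.isDefined,
      "Watermark needs to be defined for streaming aggregation query in append mode")
    rows.filter(watermarkPredicate.get)
  }

  // Renders either the filtered rows or the exception's simple class name and message.
  def describe(result: Try[Seq[Long]]): String = result match {
    case Success(rows) => s"ok: ${rows.mkString(",")}"
    case Failure(e)    => s"${e.getClass.getSimpleName}: ${e.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(5L, 10L, 15L)
    val keepAtOrAfter10: Option[Predicate] = Some((ts: Long) => ts >= 10L)
    println(describe(Try(filterWithCheck(rows, keepAtOrAfter10))))
    println(describe(Try(filterWithoutCheck(rows, None))))
    println(describe(Try(filterWithCheck(rows, None))))
  }
}
```

Running `main` should print `ok: 10,15`, then `NoSuchElementException: None.get`, then `AssertionError: assertion failed: Watermark needs to be defined for streaming aggregation query in append mode`. `Try` can capture the `AssertionError` here because it is not among the fatal throwables that `scala.util.control.NonFatal` excludes. Checking before the first `.get` means the task fails with the explanatory message instead of the bare `None.get` from the stack trace above.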