This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f406b54b2a8 [SPARK-44044][SS] Improve Error message for Window functions with streaming f406b54b2a8 is described below commit f406b54b2a899d03bae2e6f70eef7fedfed63d65 Author: Siying Dong <siying.d...@databricks.com> AuthorDate: Sat Jul 1 08:51:22 2023 +0300 [SPARK-44044][SS] Improve Error message for Window functions with streaming ### What changes were proposed in this pull request? Replace the existing error message when a non-time window function is used with streaming to include the aggregation function and column. The error message looks like the following now: org.apache.spark.sql.AnalysisException: Window function is not supported in 'row_number()' as column 'rn_col' on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the `window` function. (window specification: '(PARTITION BY col1 ORDER BY col2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)') Note that the message is a little bit unnatural as the existing unit test requires that the exception message follow a pattern that includes "not supported", "streaming", "DataFrames" and "Dataset". ### Why are the changes needed? The existing error message is vague and includes a full logical plan. A user reports that they aren't able to identify what the problem is. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test. Closes #41578 from siying/window_error. 
Lead-authored-by: Siying Dong <siying.d...@databricks.com> Co-authored-by: Siying Dong <dong...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../src/main/resources/error/error-classes.json | 5 ++++ .../analysis/UnsupportedOperationChecker.scala | 17 ++++++++++--- .../spark/sql/errors/QueryExecutionErrors.scala | 16 ++++++++++++- .../analysis/UnsupportedOperationsSuite.scala | 24 ++++++++++++++----- .../apache/spark/sql/streaming/StreamSuite.scala | 28 ++++++++++++++++++++++ 5 files changed, 80 insertions(+), 10 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index eabd5533e13..14bd3bc6bac 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1775,6 +1775,11 @@ ], "sqlState" : "42000" }, + "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : { + "message" : [ + "Window function is not supported in <windowFunc> (as column <columnName>) on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the WINDOW function. 
(window specification: <windowSpec>)" + ] + }, "NOT_ALLOWED_IN_FROM" : { "message" : [ "Not allowed in the FROM clause:" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index daa7c0d54b7..2a09d85d8f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode} @@ -508,8 +509,18 @@ object UnsupportedOperationChecker extends Logging { case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") - case Window(_, _, _, child) if child.isStreaming => - 
throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets") + case Window(windowExpression, _, _, child) if child.isStreaming => + val (windowFuncList, columnNameList, windowSpecList) = windowExpression.flatMap { e => + e.collect { + case we: WindowExpression => + (we.windowFunction.toString, e.toAttribute.sql, we.windowSpec.sql) + } + }.unzip3 + throw QueryExecutionErrors.nonTimeWindowNotSupportedInStreamingError( + windowFuncList, + columnNameList, + windowSpecList, + subPlan.origin) case ReturnAnswer(child) if child.isStreaming => throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 59b66bd4343..74c29cabbe1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval -import org.apache.spark.sql.catalyst.trees.{SQLQueryContext, TreeNode} +import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext, TreeNode} import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, DateTimeUtils, FailFastMode} import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Table, TableProvider} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -724,6 +724,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map("className" -> className, "operator" -> operator)) } + def nonTimeWindowNotSupportedInStreamingError( + windowFuncList: Seq[String], + 
columnNameList: Seq[String], + windowSpecList: Seq[String], + origin: Origin): AnalysisException = { + new AnalysisException( + errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING", + messageParameters = Map( + "windowFunc" -> windowFuncList.map(toSQLStmt(_)).mkString(","), + "columnName" -> columnNameList.map(toSQLId(_)).mkString(","), + "windowSpec" -> windowSpecList.map(toSQLStmt(_)).mkString(",")), + origin = origin) + } + def multiplePathsSpecifiedError(allPaths: Seq[String]): SparkIllegalArgumentException = { new SparkIllegalArgumentException( errorClass = "_LEGACY_ERROR_TEMP_2050", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index f9fd02b86e9..32c9a3aa17e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -738,7 +738,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { testUnaryOperatorInStreamingPlan( "sample", Sample(0.1, 1, true, 1L, _), expectedMsg = "sampling") testUnaryOperatorInStreamingPlan( - "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows") + "window", + Window(Nil, Nil, Nil, _), + errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING") // Output modes with aggregation and non-aggregation plans testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true) @@ -870,7 +872,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { operationName: String, logicalPlanGenerator: LogicalPlan => LogicalPlan, outputMode: OutputMode = Append, - expectedMsg: String = ""): Unit = { + expectedMsg: String = "", + errorClass: String = ""): Unit = { val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else 
Seq(expectedMsg) @@ -878,7 +881,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { s"$operationName with stream relation", wrapInStreaming(logicalPlanGenerator(streamRelation)), outputMode, - expectedMsgs) + expectedMsgs, + errorClass) assertSupportedInStreamingPlan( s"$operationName with batch relation", @@ -1025,10 +1029,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { name: String, plan: LogicalPlan, outputMode: OutputMode, - expectedMsgs: Seq[String]): Unit = { + expectedMsgs: Seq[String], + errorClass: String = ""): Unit = { testError( s"streaming plan - $name: not supported", - expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported") { + expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported", + errorClass) { UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode) } } @@ -1090,7 +1096,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { * Test whether the body of code will fail. If it does fail, then check if it has expected * messages. 
*/ - def testError(testName: String, expectedMsgs: Seq[String])(testBody: => Unit): Unit = { + def testError( + testName: String, + expectedMsgs: Seq[String], + errorClass: String = "")(testBody: => Unit): Unit = { test(testName) { val e = intercept[AnalysisException] { @@ -1102,6 +1111,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { s"actual exception message:\n\t'${e.getMessage}'") } } + if (!errorClass.isEmpty) { + assert(e.getErrorClass == errorClass) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 6fd63454e82..0ee44a098f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink} import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider} +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider @@ -686,6 +687,33 @@ class StreamSuite extends StreamTest { assert(query.exception.isEmpty) } + test("SPARK-44044: non-time-window") { + val inputData = MemoryStream[(Int, Int)] + val e = intercept[AnalysisException] { + val agg = inputData + .toDF() + .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2") + .withWatermark("col1", "10 seconds") + .withColumn("rn_col", row_number().over(Window + .partitionBy("col1") + .orderBy(col("col2")))) + .select("rn_col", "col1", "col2") + .writeStream + .format("console") + .start() + } + checkError( + e, + "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING", + parameters 
= Map( + "windowFunc" -> "ROW_NUMBER()", + "columnName" -> "`rn_col`", + "windowSpec" -> + ("(PARTITION BY COL1 ORDER BY COL2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING " + + "AND CURRENT ROW)"))) + } + + test("SPARK-19873: streaming aggregation with change in number of partitions") { val inputData = MemoryStream[(Int, Int)] val agg = inputData.toDS().groupBy("_1").count() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org