MaxGekk commented on code in PR #41578:
URL: https://github.com/apache/spark/pull/41578#discussion_r1237166616
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala:
##########
@@ -684,6 +685,26 @@ class StreamSuite extends StreamTest {
assert(query.exception.isEmpty)
}
+ test("SPARK-44044: non-time-window") {
+ val inputData = MemoryStream[(Int, Int)]
+ val e = intercept[AnalysisException] {
+ val agg = inputData
+ .toDF()
+ .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2")
+ .withWatermark("col1", "10 seconds")
+ .withColumn("rn_col", row_number().over(Window
+ .partitionBy("col1")
+ .orderBy(col("col2"))))
+ .select("rn_col", "col1", "col2")
+ .writeStream
+ .format("console")
+ .start()
+ }
+ assert(e.getMessage.contains(
Review Comment:
Could you use `checkError` to check the error class and parameters? That
way, tech editors can change the error message without having to update
internal Spark tests.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -724,6 +724,16 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
messageParameters = Map("className" -> className, "operator" ->
operator))
}
+ def nonTimeWindowNotSupportedInStreamingError(
+ windowFuncs: String,
Review Comment:
Can the argument contain multiple names? If not, please change it to
`windowFunc`.
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -1636,6 +1636,11 @@
],
"sqlState" : "42000"
},
+ "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : {
+ "message" : [
+ "Window function is not supported in <windowFuncs> on streaming
DataFrames/Datasets. Structured Streaming only supports time-window aggregation
using the `window` function. (window specification: '<windowSpec>')"
Review Comment:
Please remove the single quotes ('') around `windowSpec`.
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -738,7 +738,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite
with SQLHelper {
testUnaryOperatorInStreamingPlan(
"sample", Sample(0.1, 1, true, 1L, _), expectedMsg = "sampling")
testUnaryOperatorInStreamingPlan(
- "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+ "window",
+ Window(Nil, Nil, Nil, _),
+ expectedMsg =
+ "Structured Streaming only supports time-window aggregation using the
`window` function")
Review Comment:
Is it possible to avoid the dependency on the error message format?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -724,6 +724,16 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
messageParameters = Map("className" -> className, "operator" ->
operator))
}
+ def nonTimeWindowNotSupportedInStreamingError(
+ windowFuncs: String,
+ windowSpec: String,
+ origin: Origin): AnalysisException = {
+ new AnalysisException(
+ errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+ messageParameters = Map("windowFuncs" -> windowFuncs, "windowSpec" ->
windowSpec),
Review Comment:
Could you wrap `windowFuncs` with `toSQLId`, please?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]