HeartSaVioR commented on code in PR #47656:
URL: https://github.com/apache/spark/pull/47656#discussion_r1711463349
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -903,8 +903,10 @@ case class SessionWindowStateStoreSaveExec(
// Update and output only rows being evicted from the StateStore
// Assumption: watermark predicates must be non-empty if append mode
is allowed
case Some(Append) =>
- assert(watermarkPredicateForDataForEviction.isDefined,
- "Watermark needs to be defined for session window query in
append mode")
+ if (watermarkPredicateForDataForEviction.isEmpty) {
+ throw
QueryExecutionErrors.unsupportedStreamingQueryWithoutWatermark(
+ "Append", "session window")
Review Comment:
`session window aggregation`, likewise above
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -602,11 +602,11 @@ case class StateStoreSaveExec(
// Update and output only rows being evicted from the StateStore
// Assumption: watermark predicates must be non-empty if append mode
is allowed
case Some(Append) =>
- assert(watermarkPredicateForDataForLateEvents.isDefined,
- "Watermark needs to be defined for streaming aggregation query
in append mode")
-
- assert(watermarkPredicateForKeysForEviction.isDefined,
- "Watermark needs to be defined for streaming aggregation query
in append mode")
+ if (watermarkPredicateForDataForLateEvents.isEmpty ||
+ watermarkPredicateForKeysForEviction.isEmpty) {
+ throw
QueryExecutionErrors.unsupportedStreamingQueryWithoutWatermark(
+ "Append", "streaming aggregations")
Review Comment:
maybe simply `aggregation`, as we have error message to make clear it's
against streaming Dataset
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -4949,6 +4949,12 @@
},
"sqlState" : "0A000"
},
+ "UNSUPPORTED_STREAMING_QUERY_WITHOUT_WATERMARK" : {
Review Comment:
`OPERATOR` instead of `QUERY` to be specific?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2837,4 +2837,15 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
"parameter" -> toSQLId("unit"),
"invalidValue" -> s"'$invalidValue'"))
}
+
+ def unsupportedStreamingQueryWithoutWatermark(
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]