MaxGekk commented on code in PR #41578:
URL: https://github.com/apache/spark/pull/41578#discussion_r1247460708
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike,
Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan,
LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike,
Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan,
LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow,
WindowExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLId
+import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLStmt
Review Comment:
Could you please pass a sequence of (windowFunc, columnName, windowSpec)
tuples, and do the quoting inside `QueryExecutionErrors`?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -508,8 +511,27 @@ object UnsupportedOperationChecker extends Logging {
case Sample(_, _, _, _, child) if child.isStreaming =>
throwError("Sampling is not supported on streaming
DataFrames/Datasets")
- case Window(_, _, _, child) if child.isStreaming =>
- throwError("Non-time-based windows are not supported on streaming
DataFrames/Datasets")
+ case Window(windowExpression, _, _, child) if child.isStreaming =>
+ val windowFunc = windowExpression.flatMap { e =>
+ e.collect {
+ case we: WindowExpression =>
toSQLStmt(we.windowFunction.toString)
+ }
+ }.mkString(", ")
+ val columnName = windowExpression.flatMap { e =>
+ e.collect {
+ case we: WindowExpression => toSQLId(e.toAttribute.sql)
+ }
+ }.mkString(", ")
+ val windowSpec = windowExpression.flatMap { e =>
+ e.collect {
+ case we: WindowExpression => toSQLStmt(we.windowSpec.sql)
+ }
+ }.mkString(", ")
Review Comment:
Let's merge these three traversals into a single `collect` that returns
tuples of (we.windowFunction, e.toAttribute, we.windowSpec).
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -724,6 +724,20 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
messageParameters = Map("className" -> className, "operator" ->
operator))
}
+ def nonTimeWindowNotSupportedInStreamingError(
+ windowFunc: String,
+ columnName: String,
+ windowSpec: String,
+ origin: Origin): AnalysisException = {
+ new AnalysisException(
+ errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+ messageParameters = Map(
+ "windowFunc" -> windowFunc,
+ "columnName" -> columnName,
+ "windowSpec" -> windowSpec),
Review Comment:
It would be better to perform the quoting (`toSQLStmt` / `toSQLId`) here,
rather than in the caller.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]