This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 87a235c2143 [SPARK-41733][SQL][SS] Apply tree-pattern based pruning for the rule ResolveWindowTime 87a235c2143 is described below commit 87a235c2143449bd8da0acee4ec3cd99993155bb Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Wed Dec 28 16:01:25 2022 +0900 [SPARK-41733][SQL][SS] Apply tree-pattern based pruning for the rule ResolveWindowTime ### What changes were proposed in this pull request? This PR proposes to apply tree-pattern based pruning for the rule ResolveWindowTime, to limit evaluation of the rule to plans that contain a WindowTime node. ### Why are the changes needed? The rule ResolveWindowTime is unnecessarily evaluated multiple times without proper pruning. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. A new test case is added to cover SQL usage for `window_time`. Closes #39247 from HeartSaVioR/SPARK-41733. Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/catalyst/analysis/ResolveTimeWindows.scala | 5 ++-- .../sql/catalyst/expressions/WindowTime.scala | 3 +++ .../sql/catalyst/rules/RuleIdCollection.scala | 1 + .../spark/sql/catalyst/trees/TreePatterns.scala | 1 + .../spark/sql/DataFrameTimeWindowingSuite.scala | 27 ++++++++++++++++++++++ 5 files changed, 35 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index be837d72c5a..6378f4eedd3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime} import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW} +import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, Metadata, MetadataBuilder, StructType} import org.apache.spark.unsafe.types.CalendarInterval @@ -287,7 +287,8 @@ object SessionWindowing extends Rule[LogicalPlan] { * The correct representative event time of a window is ``window.end - 1``. * */ object ResolveWindowTime extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( + _.containsPattern(WINDOW_TIME), ruleId) { case p: LogicalPlan if p.children.size == 1 => val child = p.children.head val windowTimeExpressions = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala index 1bb934cb202..59b5ca8f2bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, WINDOW_TIME} import org.apache.spark.sql.types._ // scalastyle:off line.size.limit line.contains.tab @@ -52,6 +53,8 @@ case class WindowTime(windowColumn: Expression) override def dataType: DataType = 
child.dataType.asInstanceOf[StructType].head.dataType + final override val nodePatterns: Seq[TreePattern] = Seq(WINDOW_TIME) + override def prettyName: String = "window_time" // This expression is replaced in the analyzer. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index e824a0b533d..4cf774b0362 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -94,6 +94,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" :: "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" :: "org.apache.spark.sql.catalyst.analysis.ResolveUnion" :: + "org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" :: "org.apache.spark.sql.catalyst.analysis.SessionWindowing" :: "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" :: "org.apache.spark.sql.catalyst.analysis.TimeWindowing" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index e2e7fca27e0..9eb8ce21ef2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -90,6 +90,7 @@ object TreePattern extends Enumeration { val TIME_ZONE_AWARE_EXPRESSION: Value = Value val TRUE_OR_FALSE_LITERAL: Value = Value val WINDOW_EXPRESSION: Value = Value + val WINDOW_TIME: Value = Value val UNARY_POSITIVE: Value = Value val UNPIVOT: Value = Value val UPDATE_FIELDS: Value = Value diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala 
index a878e0ffa51..0bbb9460feb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -651,4 +651,31 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession { ) ) } + + test("window_time in SQL") { + withTempView("tmpView") { + val df = Seq( + ("2016-03-27 19:38:19", 1), ("2016-03-27 19:39:25", 2) + ).toDF("time", "value") + df.createOrReplaceTempView("tmpView") + checkAnswer( + spark.sql( + s""" + |select + | CAST(window.start AS string), CAST(window.end AS string), + | CAST(window_time(window) AS string), counts + |from + |( + | select window, count(*) AS counts from tmpView + | group by window(time, "10 seconds") + | order by window.start + |) + |""".stripMargin), + Seq( + Row("2016-03-27 19:38:10", "2016-03-27 19:38:20", "2016-03-27 19:38:19.999999", 1), + Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", "2016-03-27 19:39:29.999999", 1) + ) + ) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org