This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 87a235c2143 [SPARK-41733][SQL][SS] Apply tree-pattern based pruning 
for the rule ResolveWindowTime
87a235c2143 is described below

commit 87a235c2143449bd8da0acee4ec3cd99993155bb
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Dec 28 16:01:25 2022 +0900

    [SPARK-41733][SQL][SS] Apply tree-pattern based pruning for the rule 
ResolveWindowTime
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to apply tree-pattern based pruning for the rule 
ResolveWindowTime, to minimize the evaluation of rule with WindowTime node.
    
    ### Why are the changes needed?
    
    The rule ResolveWindowTime is unnecessarily evaluated multiple times 
without proper pruning.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests. New test case is added to cover SQL usage for `window_time`.
    
    Closes #39247 from HeartSaVioR/SPARK-41733.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/catalyst/analysis/ResolveTimeWindows.scala |  5 ++--
 .../sql/catalyst/expressions/WindowTime.scala      |  3 +++
 .../sql/catalyst/rules/RuleIdCollection.scala      |  1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala    |  1 +
 .../spark/sql/DataFrameTimeWindowingSuite.scala    | 27 ++++++++++++++++++++++
 5 files changed, 35 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index be837d72c5a..6378f4eedd3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, 
IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, 
TimeWindow, WindowTime}
 import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, 
TIME_WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, 
TIME_WINDOW, WINDOW_TIME}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, 
Metadata, MetadataBuilder, StructType}
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -287,7 +287,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
  * The correct representative event time of a window is ``window.end - 1``.
  * */
 object ResolveWindowTime extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp 
{
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(WINDOW_TIME), ruleId) {
     case p: LogicalPlan if p.children.size == 1 =>
       val child = p.children.head
       val windowTimeExpressions =
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
index 1bb934cb202..59b5ca8f2bd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, 
WINDOW_TIME}
 import org.apache.spark.sql.types._
 
 // scalastyle:off line.size.limit line.contains.tab
@@ -52,6 +53,8 @@ case class WindowTime(windowColumn: Expression)
 
   override def dataType: DataType = 
child.dataType.asInstanceOf[StructType].head.dataType
 
+  final override val nodePatterns: Seq[TreePattern] = Seq(WINDOW_TIME)
+
   override def prettyName: String = "window_time"
 
   // This expression is replaced in the analyzer.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index e824a0b533d..4cf774b0362 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -94,6 +94,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
+      "org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" ::
       "org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
       "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
       "org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index e2e7fca27e0..9eb8ce21ef2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -90,6 +90,7 @@ object TreePattern extends Enumeration  {
   val TIME_ZONE_AWARE_EXPRESSION: Value = Value
   val TRUE_OR_FALSE_LITERAL: Value = Value
   val WINDOW_EXPRESSION: Value = Value
+  val WINDOW_TIME: Value = Value
   val UNARY_POSITIVE: Value = Value
   val UNPIVOT: Value = Value
   val UPDATE_FIELDS: Value = Value
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index a878e0ffa51..0bbb9460feb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -651,4 +651,31 @@ class DataFrameTimeWindowingSuite extends QueryTest with 
SharedSparkSession {
       )
     )
   }
+
+  test("window_time in SQL") {
+    withTempView("tmpView") {
+      val df = Seq(
+        ("2016-03-27 19:38:19", 1), ("2016-03-27 19:39:25", 2)
+      ).toDF("time", "value")
+      df.createOrReplaceTempView("tmpView")
+      checkAnswer(
+        spark.sql(
+          s"""
+             |select
+             |  CAST(window.start AS string), CAST(window.end AS string),
+             |  CAST(window_time(window) AS string), counts
+             |from
+             |(
+             |  select window, count(*) AS counts from tmpView
+             |  group by window(time, "10 seconds")
+             |  order by window.start
+             |)
+             |""".stripMargin),
+        Seq(
+          Row("2016-03-27 19:38:10", "2016-03-27 19:38:20", "2016-03-27 
19:38:19.999999", 1),
+          Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", "2016-03-27 
19:39:29.999999", 1)
+        )
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to