This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d28068434c9 [SPARK-41732][SQL][SS] Apply tree-pattern based pruning 
for the rule SessionWindowing
d28068434c9 is described below

commit d28068434c96348815afb6fe4883744113af5cde
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Dec 28 12:58:06 2022 +0900

    [SPARK-41732][SQL][SS] Apply tree-pattern based pruning for the rule 
SessionWindowing
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to apply tree-pattern based pruning for the rule 
SessionWindowing, to minimize the evaluation of rule with SessionWindow node.
    
    ### Why are the changes needed?
    
    The rule SessionWindowing is unnecessarily evaluated multiple times without 
proper pruning.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #39245 from HeartSaVioR/SPARK-41732.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala  | 5 +++--
 .../org/apache/spark/sql/catalyst/expressions/SessionWindow.scala    | 2 ++
 .../scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala | 1 +
 .../scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala     | 1 +
 4 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index df6b1c400bb..be837d72c5a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, 
IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, 
TimeWindow, WindowTime}
 import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.TIME_WINDOW
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, 
TIME_WINDOW}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, 
Metadata, MetadataBuilder, StructType}
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -187,7 +187,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
    * This also adds a marker to the session column so that downstream can 
easily find the column
    * on session window.
    */
-  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+  def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(SESSION_WINDOW), ruleId) {
     case p: LogicalPlan if p.children.size == 1 =>
       val child = p.children.head
       val sessionExpressions =
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
index 02273b0c461..021f119e0a1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, 
TreePattern}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -91,6 +92,7 @@ case class SessionWindow(timeColumn: Expression, gapDuration: 
Expression) extend
   override def dataType: DataType = new StructType()
     .add(StructField("start", children.head.dataType))
     .add(StructField("end", children.head.dataType))
+  final override val nodePatterns: Seq[TreePattern] = Seq(SESSION_WINDOW)
 
   // This expression is replaced in the analyzer.
   override lazy val resolved = false
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 41aa68f0ec6..e824a0b533d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -94,6 +94,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
+      "org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
       "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
       "org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
       
"org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule"
 ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index fbe885bda06..e2e7fca27e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -82,6 +82,7 @@ object TreePattern extends Enumeration  {
   val SCALAR_SUBQUERY: Value = Value
   val SCALAR_SUBQUERY_REFERENCE: Value = Value
   val SCALA_UDF: Value = Value
+  val SESSION_WINDOW: Value = Value
   val SORT: Value = Value
   val SUBQUERY_ALIAS: Value = Value
   val SUM: Value = Value


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to