This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a64f600c62d [SPARK-41732][SQL][SS][3.3] Apply tree-pattern based 
pruning for the rule SessionWindowing
a64f600c62d is described below

commit a64f600c62dc61bd13a4541c4fea41bf2ede16a9
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Dec 28 20:51:09 2022 +0900

    [SPARK-41732][SQL][SS][3.3] Apply tree-pattern based pruning for the rule 
SessionWindowing
    
    This PR ports back #39245 to branch-3.3.
    
    This PR proposes to apply tree-pattern based pruning for the rule 
SessionWindowing, to minimize the evaluation of rule with SessionWindow node.
    
    The rule SessionWindowing is unnecessarily evaluated multiple times without 
proper pruning.
    
    No user-facing changes are introduced by this PR.
    
    Verified via existing tests.
    
    Closes #39253 from HeartSaVioR/SPARK-41732-3.3.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 02a7fda304b39779bff7fe88f146ae106bd61f1a)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala   | 3 ++-
 .../org/apache/spark/sql/catalyst/expressions/SessionWindow.scala      | 2 ++
 .../scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala   | 1 +
 .../main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala  | 1 +
 4 files changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c5b2229db31..4d1ea95d0e7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3992,7 +3992,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
    * This also adds a marker to the session column so that downstream can 
easily find the column
    * on session window.
    */
-  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+  def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(SESSION_WINDOW), ruleId) {
     case p: LogicalPlan if p.children.size == 1 =>
       val child = p.children.head
       val sessionExpressions =
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
index 796ea27efc4..31f3c3cd802 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, 
TreePattern}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -73,6 +74,7 @@ case class SessionWindow(timeColumn: Expression, gapDuration: 
Expression) extend
   override def dataType: DataType = new StructType()
     .add(StructField("start", TimestampType))
     .add(StructField("end", TimestampType))
+  final override val nodePatterns: Seq[TreePattern] = Seq(SESSION_WINDOW)
 
   // This expression is replaced in the analyzer.
   override lazy val resolved = false
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 267d84209c3..284103fc48f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -87,6 +87,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
+      "org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
       "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
       "org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
       
"org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule"
 ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 7322121e84b..cd864dfc3fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -76,6 +76,7 @@ object TreePattern extends Enumeration  {
   val RUNTIME_REPLACEABLE: Value = Value
   val SCALAR_SUBQUERY: Value = Value
   val SCALA_UDF: Value = Value
+  val SESSION_WINDOW: Value = Value
   val SORT: Value = Value
   val SUBQUERY_ALIAS: Value = Value
   val SUM: Value = Value


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to