This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new a64f600c62d [SPARK-41732][SQL][SS][3.3] Apply tree-pattern based pruning for the rule SessionWindowing a64f600c62d is described below commit a64f600c62dc61bd13a4541c4fea41bf2ede16a9 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Wed Dec 28 20:51:09 2022 +0900 [SPARK-41732][SQL][SS][3.3] Apply tree-pattern based pruning for the rule SessionWindowing This PR ports back #39245 to branch-3.3. This PR proposes to apply tree-pattern based pruning for the rule SessionWindowing, to minimize the evaluation of rule with SessionWindow node. The rule SessionWindowing is unnecessarily evaluated multiple times without proper pruning. Does this PR introduce any user-facing change? No. How was this patch tested? Existing tests. Closes #39253 from HeartSaVioR/SPARK-41732-3.3. Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 02a7fda304b39779bff7fe88f146ae106bd61f1a) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- .../org/apache/spark/sql/catalyst/expressions/SessionWindow.scala | 2 ++ .../scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala | 1 + .../main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c5b2229db31..4d1ea95d0e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3992,7 +3992,8 @@ object SessionWindowing extends Rule[LogicalPlan] { * This also adds a marker to the session column so that downstream can easily find the column * on session window. 
*/ - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( + _.containsPattern(SESSION_WINDOW), ruleId) { case p: LogicalPlan if p.children.size == 1 => val child = p.children.head val sessionExpressions = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala index 796ea27efc4..31f3c3cd802 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TreePattern} import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -73,6 +74,7 @@ case class SessionWindow(timeColumn: Expression, gapDuration: Expression) extend override def dataType: DataType = new StructType() .add(StructField("start", TimestampType)) .add(StructField("end", TimestampType)) + final override val nodePatterns: Seq[TreePattern] = Seq(SESSION_WINDOW) // This expression is replaced in the analyzer. 
override lazy val resolved = false diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 267d84209c3..284103fc48f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -87,6 +87,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables" :: "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" :: "org.apache.spark.sql.catalyst.analysis.ResolveUnion" :: + "org.apache.spark.sql.catalyst.analysis.SessionWindowing" :: "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" :: "org.apache.spark.sql.catalyst.analysis.TimeWindowing" :: "org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 7322121e84b..cd864dfc3fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -76,6 +76,7 @@ object TreePattern extends Enumeration { val RUNTIME_REPLACEABLE: Value = Value val SCALAR_SUBQUERY: Value = Value val SCALA_UDF: Value = Value + val SESSION_WINDOW: Value = Value val SORT: Value = Value val SUBQUERY_ALIAS: Value = Value val SUM: Value = Value --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org