peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r655142981



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.reuse
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern._
+import org.apache.spark.sql.execution.{BaseSubqueryExec, 
ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.util.ReuseMap
+
+/**
+ * Find out duplicated exchanges and subqueries in the whole spark plan 
including subqueries, then
+ * use the same exchange or subquery for all the references.
+ *
+ * Note that the Spark plan is a mutually recursive data structure:
+ * SparkPlan -> Expr -> Subquery -> SparkPlan -> Expr -> Subquery -> ...
+ * Therefore, in this rule, we recursively rewrite the exchanges and 
subqueries in a bottom-up way,
+ * in one go.
+ */
+case object ReuseExchangeAndSubquery extends Rule[SparkPlan] {

Review comment:
       `ReuseMap` has changed since the first version of this PR.
   Unfortunately, I rebased the PR already so only some discussion remained: 
https://github.com/apache/spark/pull/28885#discussion_r455692637 (about 
reverting 2nd to 3rd).
   
   The 1st version used a simple `Map[<canonicalized plan>, <plan>]` as 
`AdaptiveExecutionContext` does.
   The 2nd version was `Map[<schema>, (<first plan with this schema>, 
Map[<canonicalized plan>, <plan>])]` with lazy initialization of the inner map 
to avoid canonicalization if there are no matching schemas but still provide 
quick lookup by canonicalized plans.
   This 3rd version reverted to the original `Map[<schema>, 
ArrayBuffer[<plan>]]` idea that `ReuseExchange` and `ReuseSubquery` had used.
   
   I can open a follow-up PR to improve `ReuseMap` to 2nd version if required, 
but I'm not sure that the improvement would be visible with TPCDS or real life 
queries.
   
   If we want to consolidate reuse map logic then I think we should also take 
into account that `ReuseAdaptiveSubquery` uses a concurrent, lock-free 
`TrieMap` map implementation which is not required by this non-AQE rule.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to