squito commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r459072387



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/ReuseExchangeAndSubquerySuite.scala
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ReuseExchangeAndSubquerySuite extends SparkPlanTest with SharedSparkSession {
+
+  val tableFormat: String = "parquet"
+
+  test("SPARK-32041: No reuse interference inside ReuseExchange") {
+    withTable("df1", "df2") {
+      spark.range(100)
+        .select(col("id"), col("id").as("k"))
+        .write
+        .partitionBy("k")
+        .format(tableFormat)
+        .mode("overwrite")
+        .saveAsTable("df1")
+
+      spark.range(10)
+        .select(col("id"), col("id").as("k"))
+        .write
+        .format(tableFormat)
+        .mode("overwrite")
+        .saveAsTable("df2")
+
+      val df = sql(
+        """
+          |WITH t AS (
+          |  SELECT df1.id, df2.k
+          |  FROM df1 JOIN df2 ON df1.k = df2.k
+          |  WHERE df2.id < 2
+          |)
+          |SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
+          |""".stripMargin)
+
+      val plan = df.queryExecution.executedPlan
+
+      val exchangeIds = plan.collectWithSubqueries { case e: Exchange => e.id }
+      val reusedExchangeIds = plan.collectWithSubqueries {
+        case re: ReusedExchangeExec => re.child.id
+      }
+
+      assert(reusedExchangeIds.forall(exchangeIds.contains(_)),
+        "ReusedExchangeExec should reuse an existing exchange")

Review comment:
       So what did Spark do before this change when you tried to run this 
query? Would it fail at runtime because the `ReusedExchangeExec` referenced a 
non-existent exchange?
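
   A minimal sketch of the invariant that the assertion above checks, using a toy plan tree rather than Spark's classes. The names `Node`, `Scan`, `Exchange`, `Reused`, `Join`, `collect`, and `danglingReuses` are all hypothetical stand-ins introduced for illustration. The sketch only shows how a dangling reuse reference can be detected; it makes no claim about what Spark did at runtime before this change.

```scala
object DanglingReuseSketch {
  // Hypothetical toy plan nodes; these are NOT Spark's physical operators.
  sealed trait Node { def children: Seq[Node] }
  final case class Scan(name: String) extends Node {
    val children: Seq[Node] = Nil
  }
  final case class Exchange(id: Int, child: Node) extends Node {
    val children: Seq[Node] = Seq(child)
  }
  // Stands in for a reuse node: it only remembers the id of the exchange it points to.
  final case class Reused(targetId: Int) extends Node {
    val children: Seq[Node] = Nil
  }
  final case class Join(left: Node, right: Node) extends Node {
    val children: Seq[Node] = Seq(left, right)
  }

  // Pre-order traversal that keeps every node the partial function matches.
  def collect[A](plan: Node)(pf: PartialFunction[Node, A]): Seq[A] =
    pf.lift(plan).toList ++ plan.children.flatMap(collect(_)(pf))

  // Ids referenced by reuse nodes that no exchange in the plan actually has.
  def danglingReuses(plan: Node): Seq[Int] = {
    val exchangeIds = collect(plan) { case Exchange(id, _) => id }.toSet
    collect(plan) { case Reused(target) => target }.filterNot(exchangeIds.contains)
  }
}
```

   In this toy model, a plan passes the check when `danglingReuses` returns an empty sequence, which mirrors `reusedExchangeIds.forall(exchangeIds.contains(_))` in the test.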

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/util/ReuseMap.scala
##########
@@ -0,0 +1,73 @@
+
+package org.apache.spark.sql.util
+
+import scala.collection.mutable.Map
+import scala.language.existentials
+
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Map of canonicalized plans that can be used to find reuse possibilities.
+ *
+ * To avoid costly canonicalization of a plan:
+ * - we use its schema first to check if it can be replaced to a reused one at all
+ * - we insert it into the map of canonicalized plans only when at least 2 have the same schema
+ */
+class ReuseMap[T <: QueryPlan[_]] {
+  // scalastyle:off structural.type
+  private val map = Map[StructType, (T, Map[T2 forSome { type T2 >: T }, T])]()

Review comment:
       I consider the second version much easier to reason about. It's slightly 
more types you have to put into the code, but IMO those types help with 
readability.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
##########
@@ -1646,4 +1646,25 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkAnswer(df, df2)
     checkAnswer(df, Nil)
   }
+
+  test("Subquery reuse across the whole plan") {
+    val df = sql(

Review comment:
       I mentioned this to Peter offline -- I don't really agree with adding 
the plan in a comment. Other changes will keep happening in the rest of the 
planner, the plan will change further, and the comment will get out of date. 
If you actually want the plan to remain static, then you should add asserts 
against it (though I doubt that is what you want). If somebody wants to see 
the plan, they can run the test themselves, right?
   
   (I'm also unfamiliar with the norms around this part of the codebase, so it's 
fine if it's not unusual to put the entire plan in a comment.)

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/util/ReuseMap.scala
##########
@@ -0,0 +1,73 @@
+
+package org.apache.spark.sql.util
+
+import scala.collection.mutable.Map
+import scala.language.existentials
+
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Map of canonicalized plans that can be used to find reuse possibilities.
+ *
+ * To avoid costly canonicalization of a plan:
+ * - we use its schema first to check if it can be replaced to a reused one at all
+ * - we insert it into the map of canonicalized plans only when at least 2 have the same schema
+ */
+class ReuseMap[T <: QueryPlan[_]] {
+  // scalastyle:off structural.type
+  private val map = Map[StructType, (T, Map[T2 forSome { type T2 >: T }, T])]()
+  // scalastyle:on structural.type
+
+  /**
+   * Find a matching plan with the same canonicalized form in the map or add the new plan to the
+   * map otherwise.
+   *
+   * @param plan the input plan
+   * @return the matching plan or the input plan
+   */
+  def lookupOrElseAdd(plan: T): T = {
+    val (firstSameSchemaPlan, sameResultPlans) = map.getOrElseUpdate(plan.schema, plan -> Map())

Review comment:
       To me, this extra step of delayed canonicalization is orthogonal to 
the main point of this change. It seems like a perfectly reasonable idea, but 
I'd first have `ReuseMap` still use a 
`Map[StructType, ArrayBuffer[SparkPlan]]`, as the original rules you're 
replacing had.
   
   The change you're suggesting here may be a perfectly good change as well; 
it just seems like it should go in separately. E.g., it has totally different 
concerns, like (a) do all plans have a consistent `equals()` and `hashCode()`? 
(b) does this extra complexity actually result in sufficient performance 
improvement during planning?
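
   A minimal sketch of the simpler, schema-bucketed shape described above, written against toy types so that it runs without Spark. `ToyPlan` and `SimpleReuseMap` are hypothetical names: `schema` stands in for `StructType`, and `canonical` stands in for whatever `sameResult` would compare. It is an illustration of the idea, not the code of the rules being replaced.

```scala
import scala.collection.mutable

object SchemaBucketReuseSketch {
  // Hypothetical stand-in for a physical plan.
  final case class ToyPlan(id: Int, schema: Seq[String], canonical: String) {
    // Assumption: two plans produce the same result iff their canonical forms match.
    def sameResult(other: ToyPlan): Boolean = canonical == other.canonical
  }

  final class SimpleReuseMap {
    // One bucket per schema; each bucket holds plans with distinct results.
    private val bySchema =
      mutable.Map.empty[Seq[String], mutable.ArrayBuffer[ToyPlan]]

    // Returns an earlier plan with the same result, or registers and returns `plan`.
    def lookupOrElseAdd(plan: ToyPlan): ToyPlan = {
      val sameSchema =
        bySchema.getOrElseUpdate(plan.schema, mutable.ArrayBuffer.empty[ToyPlan])
      sameSchema.find(_.sameResult(plan)).getOrElse {
        sameSchema += plan
        plan
      }
    }
  }
}
```

   The lookup is a linear scan within a schema bucket, so it relies only on `sameResult` and never on `equals()` or `hashCode()` of the plans themselves. That is the trade-off against keying a second map by canonicalized plan.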

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
##########
@@ -156,4 +158,46 @@ class ExchangeSuite extends SparkPlanTest with SharedSparkSession {
     val projection2 = cached.select("_1", "_3").queryExecution.executedPlan
     assert(!projection1.sameResult(projection2))
   }
+
+  test("Exchange reuse across the whole plan") {
+    val df = sql(
+      """
+        |SELECT
+        |  (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
+        |  a.key
+        |FROM testData AS a
+        |JOIN testData AS b ON b.key = a.key
+      """.stripMargin)
+
+    val plan = df.queryExecution.executedPlan
+
+    val exchangeIds = plan.collectWithSubqueries { case e: Exchange => e.id }
+    val reusedExchangeIds = plan.collectWithSubqueries {
+      case re: ReusedExchangeExec => re.child.id
+    }
+
+    assert(exchangeIds.size == 2, "Whole plan exchange reusing not working correctly")
+    assert(reusedExchangeIds.size == 3, "Whole plan exchange reusing not working correctly")
+    assert(reusedExchangeIds.forall(exchangeIds.contains(_)),
+      "ReusedExchangeExec should reuse an existing exchange")
+
+    val df2 = sql(

Review comment:
       I think this is where a comment would be helpful, explaining that before 
SPARK-32041 this wouldn't work because the exchange didn't get reused between 
the two different subqueries.
   
   Though to be honest, I don't understand why it actually needs to change to 
be in one rule. It still seems to me like you could do it in `ReuseExchange` 
by adding some more logic to properly recurse into the child plan of a subquery 
(though unless there is a really succinct explanation, I probably just need to 
play with this myself more than I have so far to see why).



