HyukjinKwon commented on a change in pull request #29270:
URL: https://github.com/apache/spark/pull/29270#discussion_r476176111



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.tags.ExtendedSQLTest
+
+// scalastyle:off line.size.limit
+/**
+ * Check that TPC-DS SparkPlans don't change.
+ * If there are plan differences, the error message looks like this:
+ *   Plans did not match:
+ *   last approved simplified plan: 
/path/to/tpcds-plan-stability/approved-plans-xxx/q1/simplified.txt
+ *   last approved explain plan: 
/path/to/tpcds-plan-stability/approved-plans-xxx/q1/explain.txt
+ *   [last approved simplified plan]
+ *
+ *   actual simplified plan: /path/to/tmp/q1.actual.simplified.txt
+ *   actual explain plan: /path/to/tmp/q1.actual.explain.txt
+ *   [actual simplified plan]
+ *
+ * The explain files are saved to help debug later, they are not checked. Only 
the simplified
+ * plans are checked (by string comparison).
+ *
+ *
+ * To run the entire test suite:
+ * {{{
+ *   build/sbt "sql/test-only *PlanStability[WithStats]Suite"
+ * }}}
+ *
+ * To run a single test file upon change:
+ * {{{
+ *   build/sbt "sql/test-only *PlanStability[WithStats]Suite -- -z 
(tpcds-v1.4/q49)"
+ * }}}
+ *
+ * To re-generate golden files for entire suite, run:
+ * {{{
+ *   SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only 
*PlanStability[WithStats]Suite"
+ * }}}
+ *
+ * To re-generate golden file for a single test, run:
+ * {{{
+ *   SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only 
*PlanStability[WithStats]Suite -- -z (tpcds-v1.4/q49)"
+ * }}}
+ */
+// scalastyle:on line.size.limit
+trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
+
+  private val originalMaxToStringFields = conf.maxToStringFields
+
+  override def beforeAll(): Unit = {
+    conf.setConf(SQLConf.MAX_TO_STRING_FIELDS, Int.MaxValue)
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    conf.setConf(SQLConf.MAX_TO_STRING_FIELDS, originalMaxToStringFields)
+  }
+
+  private val regenerateGoldenFiles: Boolean = 
System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
+
+  protected val baseResourcePath = {
+    // use the same way as `SQLQueryTestSuite` to get the resource path
+    java.nio.file.Paths.get("src", "test", "resources", 
"tpcds-plan-stability").toFile
+  }
+
+  private val referenceRegex = "#\\d+".r
+  private val normalizeRegex = "#\\d+L?".r
+
+  def goldenFilePath: String
+
+  private def getDirForTest(name: String): File = {
+    new File(goldenFilePath, name)
+  }
+
+  private def isApproved(dir: File, actualSimplifiedPlan: String): Boolean = {
+    val file = new File(dir, "simplified.txt")
+    val approved = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
+    approved == actualSimplifiedPlan
+  }
+
+  /**
+   * Serialize and save this SparkPlan.
+   * The resulting file is used by [[checkWithApproved]] to check stability.
+   *
+   * @param plan    the SparkPlan
+   * @param name    the name of the query
+   * @param explain the full explain output; this is saved to help debug later 
as the simplified
+   *                plan is not too useful for debugging
+   */
+  private def generateApprovedPlanFile(plan: SparkPlan, name: String, explain: 
String): Unit = {
+    val dir = getDirForTest(name)
+    val simplified = getSimplifiedPlan(plan)
+    val foundMatch = dir.exists() && isApproved(dir, simplified)
+
+    if (!foundMatch) {
+      FileUtils.deleteDirectory(dir)
+      assert(dir.mkdirs())
+
+      val file = new File(dir, "simplified.txt")
+      FileUtils.writeStringToFile(file, simplified, StandardCharsets.UTF_8)
+      val fileOriginalPlan = new File(dir, "explain.txt")
+      FileUtils.writeStringToFile(fileOriginalPlan, explain, 
StandardCharsets.UTF_8)
+      logDebug(s"APPROVED: $file $fileOriginalPlan")
+    }
+  }
+
+  private def checkWithApproved(plan: SparkPlan, name: String, explain: 
String): Unit = {
+    val dir = getDirForTest(name)
+    val tempDir = FileUtils.getTempDirectory
+    val actualSimplified = getSimplifiedPlan(plan)
+    val foundMatch = isApproved(dir, actualSimplified)
+
+    if (!foundMatch) {
+      // show diff with last approved
+      val approvedSimplifiedFile = new File(dir, "simplified.txt")
+      val approvedExplainFile = new File(dir, "explain.txt")
+
+      val actualSimplifiedFile = new File(tempDir, 
s"$name.actual.simplified.txt")
+      val actualExplainFile = new File(tempDir, s"$name.actual.explain.txt")
+
+      val approvedSimplified = FileUtils.readFileToString(
+        approvedSimplifiedFile, StandardCharsets.UTF_8)
+      // write out for debugging
+      FileUtils.writeStringToFile(actualSimplifiedFile, actualSimplified, 
StandardCharsets.UTF_8)
+      FileUtils.writeStringToFile(actualExplainFile, explain, 
StandardCharsets.UTF_8)
+
+      fail(
+        s"""
+          |Plans did not match:
+          |last approved simplified plan: 
${approvedSimplifiedFile.getAbsolutePath}
+          |last approved explain plan: ${approvedExplainFile.getAbsolutePath}
+          |
+          |$approvedSimplified
+          |
+          |actual simplified plan: ${actualSimplifiedFile.getAbsolutePath}
+          |actual explain plan: ${actualExplainFile.getAbsolutePath}
+          |
+          |$actualSimplified
+        """.stripMargin)
+    }
+  }
+
+  /**
+   * Get the simplified plan for a specific SparkPlan. In the simplified plan, 
the node only has
+   * its name and all the sorted reference and produced attributes 
names(without ExprId) and its
+   * simplified children as well. And we'll only identify the performance 
sensitive nodes, e.g.,
+   * Exchange, Subquery, in the simplified plan. Given such a identical but 
simplified plan, we'd
+   * expect to avoid frequent plan changing and catch the possible meaningful 
regression.
+   */
+  private def getSimplifiedPlan(plan: SparkPlan): String = {
+    val exchangeIdMap = new mutable.HashMap[Int, Int]()
+    val subqueriesMap = new mutable.HashMap[Int, Int]()
+
+    def getId(plan: SparkPlan): Int = plan match {
+      case exchange: Exchange => exchangeIdMap.getOrElseUpdate(exchange.id, 
exchangeIdMap.size + 1)
+      case ReusedExchangeExec(_, exchange) =>
+        exchangeIdMap.getOrElseUpdate(exchange.id, exchangeIdMap.size + 1)
+      case subquery: SubqueryExec =>
+        subqueriesMap.getOrElseUpdate(subquery.id, subqueriesMap.size + 1)
+      case subquery: SubqueryBroadcastExec =>
+        subqueriesMap.getOrElseUpdate(subquery.id, subqueriesMap.size + 1)
+      case ReusedSubqueryExec(subquery) =>
+        subqueriesMap.getOrElseUpdate(subquery.id, subqueriesMap.size + 1)
+      case _ => -1
+    }
+
+    /**
+     * Some expression names have ExprId in them due to using things such as
+     * "sum(sr_return_amt#14)", so we remove all of these using regex
+     */
+    def cleanUpReferences(references: AttributeSet): String = {
+      
referenceRegex.replaceAllIn(references.toSeq.map(_.name).sorted.mkString(","), 
"")
+    }
+
+    /**
+     * Generate a simplified plan as a string
+     * Example output:
+     * TakeOrderedAndProject [c_customer_id]
+     *   WholeStageCodegen
+     *     Project [c_customer_id]
+     */
+    def getSimplifiedPlan(node: SparkPlan, depth: Int): String = {

Review comment:
       Same name of the function (inner and outer) looks a bit odd ...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to