Repository: spark
Updated Branches:
  refs/heads/master c00e72f3d -> 44683e0f7


[SPARK-26023][SQL] Dumping truncated plans and generated code to a file

## What changes were proposed in this pull request?

In this PR, I propose a new method for debugging queries by dumping information 
about their execution to a file. It saves the logical, optimized, and physical 
plans, as the `explain()` method does, together with the generated code. One 
advantage of this method over `explain` is that it does not materialize the 
full output as a single string in memory, which can cause OOMs.
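
For illustration, here is a minimal usage sketch of the new API, mirroring the 
tests added in this patch (the output path is hypothetical):

    // Dump the parsed/analyzed/optimized/physical plans plus the generated
    // code for a query to a file, streaming through a Writer instead of
    // materializing one large string in memory.
    val df = spark.range(0, 10)
    df.queryExecution.debug.toFile("/tmp/plans.txt")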

## How was this patch tested?

Added a few tests to `QueryExecutionSuite` covering both positive and negative 
scenarios.

Closes #23018 from MaxGekk/truncated-plan-to-file.

Authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44683e0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44683e0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44683e0f

Branch: refs/heads/master
Commit: 44683e0f7b549d8e9e8fadd54d606f19d9cea37e
Parents: c00e72f
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Tue Nov 13 15:23:35 2018 +0100
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Tue Nov 13 15:23:35 2018 +0100

----------------------------------------------------------------------
 .../spark/sql/catalyst/trees/TreeNode.scala     | 43 ++++++++-----
 .../spark/sql/execution/QueryExecution.scala    | 68 +++++++++++++++-----
 .../sql/execution/WholeStageCodegenExec.scala   | 13 ++--
 .../spark/sql/execution/debug/package.scala     | 26 ++++++--
 .../sql/execution/QueryExecutionSuite.scala     | 59 +++++++++++++++++
 5 files changed, 165 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index becfa8d..1027216 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.catalyst.trees
 
+import java.io.Writer
 import java.util.UUID
 
 import scala.collection.Map
 import scala.reflect.ClassTag
 
+import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.commons.lang3.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
@@ -469,7 +471,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def treeString: String = treeString(verbose = true)
 
   def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
-    generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString
+    val writer = new StringBuilderWriter()
+    try {
+      treeString(writer, verbose, addSuffix)
+      writer.toString
+    } finally {
+      writer.close()
+    }
+  }
+
+  def treeString(
+      writer: Writer,
+      verbose: Boolean,
+      addSuffix: Boolean): Unit = {
+    generateTreeString(0, Nil, writer, verbose, "", addSuffix)
   }
 
   /**
@@ -521,7 +536,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
 
   /**
-   * Appends the string representation of this node and its children to the given StringBuilder.
+   * Appends the string representation of this node and its children to the given Writer.
    *
    * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
    * depth `i + 1` is the last child of its own parent node.  The depth of the root node is 0, and
@@ -532,16 +547,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      builder: StringBuilder,
+      writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
+      addSuffix: Boolean = false): Unit = {
 
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
-        builder.append(if (isLast) "   " else ":  ")
+        writer.write(if (isLast) "   " else ":  ")
       }
-      builder.append(if (lastChildren.last) "+- " else ":- ")
+      writer.write(if (lastChildren.last) "+- " else ":- ")
     }
 
     val str = if (verbose) {
@@ -549,27 +564,25 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     } else {
       simpleString
     }
-    builder.append(prefix)
-    builder.append(str)
-    builder.append("\n")
+    writer.write(prefix)
+    writer.write(str)
+    writer.write("\n")
 
     if (innerChildren.nonEmpty) {
       innerChildren.init.foreach(_.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
         addSuffix = addSuffix))
       innerChildren.last.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
         addSuffix = addSuffix)
     }
 
     if (children.nonEmpty) {
       children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, builder, verbose, prefix, addSuffix))
+        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
       children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, builder, verbose, prefix, addSuffix)
+        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
     }
-
-    builder
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 64f49e2..905d035 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.{BufferedWriter, OutputStreamWriter, Writer}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
+import org.apache.commons.io.output.StringBuilderWriter
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -189,23 +193,38 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
       """.stripMargin.trim
   }
 
+  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
+    try f(writer)
+    catch {
+      case e: AnalysisException => writer.write(e.toString)
+    }
+  }
+
+  private def writePlans(writer: Writer): Unit = {
+    val (verbose, addSuffix) = (true, false)
+
+    writer.write("== Parsed Logical Plan ==\n")
+    writeOrError(writer)(logical.treeString(_, verbose, addSuffix))
+    writer.write("\n== Analyzed Logical Plan ==\n")
+    val analyzedOutput = stringOrError(Utils.truncatedString(
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
+    writer.write(analyzedOutput)
+    writer.write("\n")
+    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix))
+    writer.write("\n== Optimized Logical Plan ==\n")
+    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix))
+    writer.write("\n== Physical Plan ==\n")
+    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix))
+  }
+
   override def toString: String = withRedaction {
-    def output = Utils.truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
-    val analyzedPlan = Seq(
-      stringOrError(output),
-      stringOrError(analyzed.treeString(verbose = true))
-    ).filter(_.nonEmpty).mkString("\n")
-
-    s"""== Parsed Logical Plan ==
-       |${stringOrError(logical.treeString(verbose = true))}
-       |== Analyzed Logical Plan ==
-       |$analyzedPlan
-       |== Optimized Logical Plan ==
-       |${stringOrError(optimizedPlan.treeString(verbose = true))}
-       |== Physical Plan ==
-       |${stringOrError(executedPlan.treeString(verbose = true))}
-    """.stripMargin.trim
+    val writer = new StringBuilderWriter()
+    try {
+      writePlans(writer)
+      writer.toString
+    } finally {
+      writer.close()
+    }
   }
 
   def stringWithStats: String = withRedaction {
@@ -250,5 +269,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     def codegenToSeq(): Seq[(String, String)] = {
       org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
     }
+
+    /**
+     * Dumps debug information about query execution into the specified file.
+     */
+    def toFile(path: String): Unit = {
+      val filePath = new Path(path)
+      val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
+
+      try {
+        writePlans(writer)
+        writer.write("\n== Whole Stage Codegen ==\n")
+        org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+      } finally {
+        writer.close()
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 5f81b6f..29bcbca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.Writer
 import java.util.Locale
 import java.util.function.Supplier
 
@@ -450,11 +451,11 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      builder: StringBuilder,
+      writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
-    child.generateTreeString(depth, lastChildren, builder, verbose, "")
+      addSuffix: Boolean = false): Unit = {
+    child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
   }
 
   override def needCopyResult: Boolean = false
@@ -726,11 +727,11 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      builder: StringBuilder,
+      writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
-    child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ")
+      addSuffix: Boolean = false): Unit = {
+    child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
   }
 
   override def needStopCheck: Boolean = true

http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 366e1fe..3511cef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.Writer
 import java.util.Collections
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.io.output.StringBuilderWriter
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -30,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
-import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
@@ -70,15 +72,25 @@ package object debug {
    * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
    */
   def codegenString(plan: SparkPlan): String = {
+    val writer = new StringBuilderWriter()
+
+    try {
+      writeCodegen(writer, plan)
+      writer.toString
+    } finally {
+      writer.close()
+    }
+  }
+
+  def writeCodegen(writer: Writer, plan: SparkPlan): Unit = {
     val codegenSeq = codegenStringSeq(plan)
-    var output = s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n"
+    writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
     for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
-      output += s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n"
-      output += subtree
-      output += "\nGenerated code:\n"
-      output += s"${code}\n"
+      writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+      writer.write(subtree)
+      writer.write("\nGenerated code:\n")
+      writer.write(s"${code}\n")
     }
-    output
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 9644403..a5922d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -16,11 +16,70 @@
  */
 package org.apache.spark.sql.execution
 
+import scala.io.Source
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class QueryExecutionSuite extends SharedSQLContext {
+  def checkDumpedPlans(path: String, expected: Int): Unit = {
+    assert(Source.fromFile(path).getLines.toList
+      .takeWhile(_ != "== Whole Stage Codegen ==") == List(
+      "== Parsed Logical Plan ==",
+      s"Range (0, $expected, step=1, splits=Some(2))",
+      "",
+      "== Analyzed Logical Plan ==",
+      "id: bigint",
+      s"Range (0, $expected, step=1, splits=Some(2))",
+      "",
+      "== Optimized Logical Plan ==",
+      s"Range (0, $expected, step=1, splits=Some(2))",
+      "",
+      "== Physical Plan ==",
+      s"*(1) Range (0, $expected, step=1, splits=2)",
+      ""))
+  }
+  test("dumping query execution info to a file") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path)
+
+      checkDumpedPlans(path, expected = 10)
+    }
+  }
+
+  test("dumping query execution info to an existing file") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path)
+
+      val df2 = spark.range(0, 1)
+      df2.queryExecution.debug.toFile(path)
+      checkDumpedPlans(path, expected = 1)
+    }
+  }
+
+  test("dumping query execution info to non-existing folder") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/newfolder/plans.txt"
+      val df = spark.range(0, 100)
+      df.queryExecution.debug.toFile(path)
+      checkDumpedPlans(path, expected = 100)
+    }
+  }
+
+  test("dumping query execution info by invalid path") {
+    val path = "1234567890://plans.txt"
+    val exception = intercept[IllegalArgumentException] {
+      spark.range(0, 100).queryExecution.debug.toFile(path)
+    }
+
+    assert(exception.getMessage.contains("Illegal character in scheme name"))
+  }
+
   test("toString() exception/error handling") {
     spark.experimental.extraStrategies = Seq(
         new SparkStrategy {

