Repository: spark Updated Branches: refs/heads/master c00e72f3d -> 44683e0f7
[SPARK-26023][SQL] Dumping truncated plans and generated code to a file ## What changes were proposed in this pull request? In this PR, I propose a new method for debugging queries by dumping information about their execution to a file. It saves the logical, optimized and physical plans, similar to the `explain()` method, plus the generated code. One advantage of this method over `explain` is that it does not materialize the full output as one string in memory, which can cause OOMs. ## How was this patch tested? Added a few tests to `QueryExecutionSuite` to check positive and negative scenarios. Closes #23018 from MaxGekk/truncated-plan-to-file. Authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: Herman van Hovell <hvanhov...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44683e0f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44683e0f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44683e0f Branch: refs/heads/master Commit: 44683e0f7b549d8e9e8fadd54d606f19d9cea37e Parents: c00e72f Author: Maxim Gekk <maxim.g...@databricks.com> Authored: Tue Nov 13 15:23:35 2018 +0100 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Tue Nov 13 15:23:35 2018 +0100 ---------------------------------------------------------------------- .../spark/sql/catalyst/trees/TreeNode.scala | 43 ++++++++----- .../spark/sql/execution/QueryExecution.scala | 68 +++++++++++++++----- .../sql/execution/WholeStageCodegenExec.scala | 13 ++-- .../spark/sql/execution/debug/package.scala | 26 ++++++-- .../sql/execution/QueryExecutionSuite.scala | 59 +++++++++++++++++ 5 files changed, 165 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala ---------------------------------------------------------------------- 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index becfa8d..1027216 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.catalyst.trees +import java.io.Writer import java.util.UUID import scala.collection.Map import scala.reflect.ClassTag +import org.apache.commons.io.output.StringBuilderWriter import org.apache.commons.lang3.ClassUtils import org.json4s.JsonAST._ import org.json4s.JsonDSL._ @@ -469,7 +471,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def treeString: String = treeString(verbose = true) def treeString(verbose: Boolean, addSuffix: Boolean = false): String = { - generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString + val writer = new StringBuilderWriter() + try { + treeString(writer, verbose, addSuffix) + writer.toString + } finally { + writer.close() + } + } + + def treeString( + writer: Writer, + verbose: Boolean, + addSuffix: Boolean): Unit = { + generateTreeString(0, Nil, writer, verbose, "", addSuffix) } /** @@ -521,7 +536,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { protected def innerChildren: Seq[TreeNode[_]] = Seq.empty /** - * Appends the string representation of this node and its children to the given StringBuilder. + * Appends the string representation of this node and its children to the given Writer. * * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at * depth `i + 1` is the last child of its own parent node. 
The depth of the root node is 0, and @@ -532,16 +547,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - builder: StringBuilder, + writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): StringBuilder = { + addSuffix: Boolean = false): Unit = { if (depth > 0) { lastChildren.init.foreach { isLast => - builder.append(if (isLast) " " else ": ") + writer.write(if (isLast) " " else ": ") } - builder.append(if (lastChildren.last) "+- " else ":- ") + writer.write(if (lastChildren.last) "+- " else ":- ") } val str = if (verbose) { @@ -549,27 +564,25 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } else { simpleString } - builder.append(prefix) - builder.append(str) - builder.append("\n") + writer.write(prefix) + writer.write(str) + writer.write("\n") if (innerChildren.nonEmpty) { innerChildren.init.foreach(_.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose, + depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose, addSuffix = addSuffix)) innerChildren.last.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose, + depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose, addSuffix = addSuffix) } if (children.nonEmpty) { children.init.foreach(_.generateTreeString( - depth + 1, lastChildren :+ false, builder, verbose, prefix, addSuffix)) + depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix)) children.last.generateTreeString( - depth + 1, lastChildren :+ true, builder, verbose, prefix, addSuffix) + depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix) } - - builder } /** http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 64f49e2..905d035 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.execution +import java.io.{BufferedWriter, OutputStreamWriter, Writer} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import org.apache.commons.io.output.StringBuilderWriter +import org.apache.hadoop.fs.Path + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow @@ -189,23 +193,38 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { """.stripMargin.trim } + private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = { + try f(writer) + catch { + case e: AnalysisException => writer.write(e.toString) + } + } + + private def writePlans(writer: Writer): Unit = { + val (verbose, addSuffix) = (true, false) + + writer.write("== Parsed Logical Plan ==\n") + writeOrError(writer)(logical.treeString(_, verbose, addSuffix)) + writer.write("\n== Analyzed Logical Plan ==\n") + val analyzedOutput = stringOrError(Utils.truncatedString( + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")) + writer.write(analyzedOutput) + writer.write("\n") + writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix)) + writer.write("\n== Optimized Logical Plan ==\n") + writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix)) + writer.write("\n== Physical Plan ==\n") + writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix)) + } + override def toString: String = withRedaction { - def output = Utils.truncatedString( - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ") - val 
analyzedPlan = Seq( - stringOrError(output), - stringOrError(analyzed.treeString(verbose = true)) - ).filter(_.nonEmpty).mkString("\n") - - s"""== Parsed Logical Plan == - |${stringOrError(logical.treeString(verbose = true))} - |== Analyzed Logical Plan == - |$analyzedPlan - |== Optimized Logical Plan == - |${stringOrError(optimizedPlan.treeString(verbose = true))} - |== Physical Plan == - |${stringOrError(executedPlan.treeString(verbose = true))} - """.stripMargin.trim + val writer = new StringBuilderWriter() + try { + writePlans(writer) + writer.toString + } finally { + writer.close() + } } def stringWithStats: String = withRedaction { @@ -250,5 +269,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { def codegenToSeq(): Seq[(String, String)] = { org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan) } + + /** + * Dumps debug information about query execution into the specified file. + */ + def toFile(path: String): Unit = { + val filePath = new Path(path) + val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath))) + + try { + writePlans(writer) + writer.write("\n== Whole Stage Codegen ==\n") + org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan) + } finally { + writer.close() + } + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 5f81b6f..29bcbca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -17,6 +17,7 @@ 
package org.apache.spark.sql.execution +import java.io.Writer import java.util.Locale import java.util.function.Supplier @@ -450,11 +451,11 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - builder: StringBuilder, + writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): StringBuilder = { - child.generateTreeString(depth, lastChildren, builder, verbose, "") + addSuffix: Boolean = false): Unit = { + child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false) } override def needCopyResult: Boolean = false @@ -726,11 +727,11 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - builder: StringBuilder, + writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): StringBuilder = { - child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ") + addSuffix: Boolean = false): Unit = { + child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false) } override def needStopCheck: Boolean = true http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 366e1fe..3511cef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution +import java.io.Writer import java.util.Collections import scala.collection.JavaConverters._ +import 
org.apache.commons.io.output.StringBuilderWriter + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -30,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} -import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{AccumulatorV2, LongAccumulator} @@ -70,15 +72,25 @@ package object debug { * @return single String containing all WholeStageCodegen subtrees and corresponding codegen */ def codegenString(plan: SparkPlan): String = { + val writer = new StringBuilderWriter() + + try { + writeCodegen(writer, plan) + writer.toString + } finally { + writer.close() + } + } + + def writeCodegen(writer: Writer, plan: SparkPlan): Unit = { val codegenSeq = codegenStringSeq(plan) - var output = s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n" + writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n") for (((subtree, code), i) <- codegenSeq.zipWithIndex) { - output += s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n" - output += subtree - output += "\nGenerated code:\n" - output += s"${code}\n" + writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n") + writer.write(subtree) + writer.write("\nGenerated code:\n") + writer.write(s"${code}\n") } - output } /** http://git-wip-us.apache.org/repos/asf/spark/blob/44683e0f/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala 
index 9644403..a5922d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -16,11 +16,70 @@ */ package org.apache.spark.sql.execution +import scala.io.Source + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.test.SharedSQLContext class QueryExecutionSuite extends SharedSQLContext { + def checkDumpedPlans(path: String, expected: Int): Unit = { + assert(Source.fromFile(path).getLines.toList + .takeWhile(_ != "== Whole Stage Codegen ==") == List( + "== Parsed Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Analyzed Logical Plan ==", + "id: bigint", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Optimized Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Physical Plan ==", + s"*(1) Range (0, $expected, step=1, splits=2)", + "")) + } + test("dumping query execution info to a file") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/plans.txt" + val df = spark.range(0, 10) + df.queryExecution.debug.toFile(path) + + checkDumpedPlans(path, expected = 10) + } + } + + test("dumping query execution info to an existing file") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/plans.txt" + val df = spark.range(0, 10) + df.queryExecution.debug.toFile(path) + + val df2 = spark.range(0, 1) + df2.queryExecution.debug.toFile(path) + checkDumpedPlans(path, expected = 1) + } + } + + test("dumping query execution info to non-existing folder") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/newfolder/plans.txt" + val df = spark.range(0, 100) + df.queryExecution.debug.toFile(path) + checkDumpedPlans(path, expected = 100) + } + } + + test("dumping query execution info by invalid path") { + val path = "1234567890://plans.txt" + val 
exception = intercept[IllegalArgumentException] { + spark.range(0, 100).queryExecution.debug.toFile(path) + } + + assert(exception.getMessage.contains("Illegal character in scheme name")) + } + test("toString() exception/error handling") { spark.experimental.extraStrategies = Seq( new SparkStrategy { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org