This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4d213ff3dea [SPARK-45092][SQL][UI] Avoid analyzing twice for failed queries
4d213ff3dea is described below

commit 4d213ff3dea4d66e5dec7be3b35c5441d9187c30
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Sep 12 16:35:39 2023 +0800

    [SPARK-45092][SQL][UI] Avoid analyzing twice for failed queries

    ### What changes were proposed in this pull request?

    Following the discussion that started at https://github.com/apache/spark/pull/42481#discussion_r1316776270: for failed queries, we need to avoid calling SparkPlanInfo.fromSparkPlan, which triggers another round of analysis. This patch uses `Either[Throwable, () => T]` to pass the throwable conditionally and to bypass the plan explain functions on error.

    ### Why are the changes needed?

    Improvements on https://github.com/apache/spark/pull/42481.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42838 from yaooqinn/SPARK-45092.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
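For context on the double analysis being avoided: `QueryExecution` builds its plan stages (`analyzed`, `optimizedPlan`, `executedPlan`) as chained lazy vals, and a Scala `lazy val` whose initializer throws is left uninitialized, so every later access re-runs the initializer. A minimal plain-Scala sketch of that retry behavior (illustrative only, not Spark code):

```scala
object LazyRetryDemo extends App {
  var attempts = 0

  // A lazy val whose initializer throws is not memoized:
  // each subsequent access re-runs the body from scratch.
  lazy val analyzed: String = {
    attempts += 1
    throw new RuntimeException(s"analysis failed (attempt $attempts)")
  }

  for (_ <- 1 to 2) {
    try analyzed
    catch { case e: RuntimeException => println(e.getMessage) }
  }
  // prints:
  //   analysis failed (attempt 1)
  //   analysis failed (attempt 2)
}
```

So touching `queryExecution.executedPlan` (as `SparkPlanInfo.fromSparkPlan` does) after analysis has already failed forces `analyzed` again, which is the second round of analysis this patch eliminates.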
---
 .../spark/sql/execution/QueryExecution.scala       |  2 +-
 .../apache/spark/sql/execution/SQLExecution.scala  | 72 ++++++++++++++--------
 .../spark/sql/execution/ui/UISeleniumSuite.scala   |  2 +-
 3 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8ddfde8acf8..b3c97a83970 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -71,7 +71,7 @@ class QueryExecution(
       // Because we do eager analysis for Dataframe, there will be no execution created after
       // AnalysisException occurs. So we need to explicitly create a new execution to post
       // start/end events to notify the listener and UI components.
-      SQLExecution.withNewExecutionId(this, Some("analyze"))(throw e)
+      SQLExecution.withNewExecutionIdOnError(this, Some("analyze"))(e)
     }
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 2a44a016d2d..b96b9c25dda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -66,9 +66,10 @@ object SQLExecution extends Logging {
    * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
    * we can connect them with an execution.
    */
-  def withNewExecutionId[T](
+  private def withNewExecutionId0[T](
       queryExecution: QueryExecution,
-      name: Option[String] = None)(body: => T): T = queryExecution.sparkSession.withActive {
+      name: Option[String] = None)(
+      body: Either[Throwable, () => T]): T = queryExecution.sparkSession.withActive {
     val sparkSession = queryExecution.sparkSession
     val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
@@ -103,9 +104,6 @@ object SQLExecution extends Logging {
         redactedStr.substring(0, Math.min(truncateLength, redactedStr.length))
       }.getOrElse(callSite.shortForm)
 
-    val planDescriptionMode =
-      ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
-
     val globalConfigs = sparkSession.sharedState.conf.getAll.toMap
     val modifiedConfigs = sparkSession.sessionState.conf.getAllConfs
       .filterNot { case (key, value) =>
@@ -118,28 +116,39 @@ object SQLExecution extends Logging {
     withSQLConfPropagated(sparkSession) {
       var ex: Option[Throwable] = None
       val startTime = System.nanoTime()
+      val startEvent = SparkListenerSQLExecutionStart(
+        executionId = executionId,
+        rootExecutionId = Some(rootExecutionId),
+        description = desc,
+        details = callSite.longForm,
+        physicalPlanDescription = "",
+        sparkPlanInfo = SparkPlanInfo.EMPTY,
+        time = System.currentTimeMillis(),
+        modifiedConfigs = redactedConfigs,
+        jobTags = sc.getJobTags()
+      )
       try {
-        val planInfo = try {
-          SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
-        } catch {
-          case NonFatal(e) =>
-            logDebug("Failed to generate SparkPlanInfo", e)
-            // If the queryExecution already failed before this, we are not able to generate the
-            // the plan info, so we use and empty graphviz node to make the UI happy
-            SparkPlanInfo.EMPTY
+        body match {
+          case Left(e) =>
+            sc.listenerBus.post(startEvent)
+            throw e
+          case Right(f) =>
+            val planDescriptionMode =
+              ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
+            val planDesc = queryExecution.explainString(planDescriptionMode)
+            val planInfo = try {
+              SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
+            } catch {
+              case NonFatal(e) =>
+                logDebug("Failed to generate SparkPlanInfo", e)
+                // If the queryExecution already failed before this, we are not able to generate
+                // the plan info, so we use an empty graphviz node to make the UI happy
+                SparkPlanInfo.EMPTY
+            }
+            sc.listenerBus.post(
+              startEvent.copy(physicalPlanDescription = planDesc, sparkPlanInfo = planInfo))
+            f()
         }
-        sc.listenerBus.post(SparkListenerSQLExecutionStart(
-          executionId = executionId,
-          rootExecutionId = Some(rootExecutionId),
-          description = desc,
-          details = callSite.longForm,
-          physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
-          sparkPlanInfo = planInfo,
-          time = System.currentTimeMillis(),
-          modifiedConfigs = redactedConfigs,
-          jobTags = sc.getJobTags()
-        ))
-        body
       } catch {
         case e: Throwable =>
           ex = Some(e)
@@ -181,6 +190,19 @@ object SQLExecution extends Logging {
     }
   }
 
+  def withNewExecutionId[T](
+      queryExecution: QueryExecution,
+      name: Option[String] = None)(body: => T): T = {
+    withNewExecutionId0(queryExecution, name)(Right(() => body))
+  }
+
+  def withNewExecutionIdOnError(
+      queryExecution: QueryExecution,
+      name: Option[String] = None)(t: Throwable): Unit = {
+    withNewExecutionId0(queryExecution, name)(Left(t))
+  }
+
+
   /**
    * Wrap an action with a known executionId. When running a different action in a different
    * thread from the original one, this method can be used to connect the Spark jobs in this action
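The refactored control flow in withNewExecutionId0 reduces to the following self-contained sketch (stand-in types and a trimmed-down signature, not Spark's actual classes): on `Left`, post the placeholder start event and rethrow without ever touching the plan; on `Right`, compute the plan description, post an enriched `copy` of the event, and run the body:

```scala
import scala.util.control.NonFatal

object EitherDispatchDemo extends App {
  // Toy stand-in for SparkListenerSQLExecutionStart.
  final case class StartEvent(physicalPlanDescription: String)

  def post(event: StartEvent): Unit = println(s"posted: $event")

  // Stand-in for queryExecution.explainString; safe to call only when analysis succeeded.
  def explainString(): String = "== Physical Plan == ..."

  def withNewExecutionId0[T](body: Either[Throwable, () => T]): T =
    body match {
      case Left(e) =>
        // Failed query: post the placeholder event and rethrow,
        // never touching the (re-analyzing) plan.
        post(StartEvent(physicalPlanDescription = ""))
        throw e
      case Right(f) =>
        // Healthy query: computing the plan description is safe here.
        post(StartEvent(physicalPlanDescription = explainString()))
        f()
    }

  println(withNewExecutionId0(Right(() => 42)))                     // normal path
  try withNewExecutionId0[Int](Left(new RuntimeException("boom")))  // error path
  catch { case NonFatal(e) => println(s"rethrown: ${e.getMessage}") }
}
```

Keeping a single private entry point keeps the execution-ID and event bookkeeping in one place; the two public wrappers merely pick a side of the `Either`.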
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala
index f25c150259f..30124a5988e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala
@@ -110,7 +110,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {
       val planDot = findAll(cssSelector(""".dot-file""")).map(_.text).toList
       assert(planDot.head.startsWith("digraph G {"))
       val planDetails = findAll(cssSelector("""#physical-plan-details""")).map(_.text).toList
-      assert(planDetails.head.contains("TABLE_OR_VIEW_NOT_FOUND"))
+      assert(planDetails.head.isEmpty)
     }
   }
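The updated assertion above follows directly from the placeholder-then-copy pattern: for a failed query, only the placeholder start event, with an empty `physicalPlanDescription`, is ever posted, so the UI's physical plan details are now empty rather than carrying the error text. A toy model of that pattern (again not Spark's event class):

```scala
// Toy model of the start event; the real class is SparkListenerSQLExecutionStart.
final case class StartEvent(executionId: Long, physicalPlanDescription: String)

object CopyPatternDemo extends App {
  def post(event: StartEvent): Unit = println(s"posted: $event")

  // Built once, up front, with an empty plan description.
  val startEvent = StartEvent(executionId = 1L, physicalPlanDescription = "")

  val analysisFailed = true  // pretend the query failed before planning

  if (analysisFailed) {
    post(startEvent)  // the UI sees an event with empty plan details
  } else {
    post(startEvent.copy(physicalPlanDescription = "== Physical Plan == ..."))
  }
}
```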