linhongliu-db commented on code in PR #39268:
URL: https://github.com/apache/spark/pull/39268#discussion_r1061140998
##########
core/src/main/scala/org/apache/spark/internal/config/UI.scala:
##########
@@ -229,4 +229,11 @@ private[spark] object UI {
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.createWithDefault("LOCAL")
+
+ val UI_SQL_GROUP_SUB_EXECUTION_ENABLED =
ConfigBuilder("spark.ui.sql.group.sub.execution.enabled")
+ .doc("Whether to group sub executions together in SQL UI when they belong to the same " +
+ "root execution")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(false)
Review Comment:
done
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala:
##########
@@ -26,40 +26,65 @@ import scala.xml.{Node, NodeSeq}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with
Logging {
private val sqlStore = parent.sqlStore
+ private val groupSubExecutionEnabled =
parent.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED)
override def render(request: HttpServletRequest): Seq[Node] = {
val currentTime = System.currentTimeMillis()
val running = new mutable.ArrayBuffer[SQLExecutionUIData]()
val completed = new mutable.ArrayBuffer[SQLExecutionUIData]()
val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+ val executionIdToSubExecutions =
+ new mutable.HashMap[Long, mutable.ArrayBuffer[SQLExecutionUIData]]()
sqlStore.executionsList().foreach { e =>
- if (e.errorMessage.isDefined) {
- if (e.errorMessage.get.isEmpty) {
- completed += e
+ def processExecution(e: SQLExecutionUIData): Unit = {
+ if (e.errorMessage.isDefined) {
+ if (e.errorMessage.get.isEmpty) {
+ completed += e
+ } else {
+ failed += e
+ }
+ } else if (e.completionTime.isEmpty) {
+ running += e
} else {
- failed += e
+ // When `completionTime` is present, it means the query execution is completed and
+ // `errorMessage` should be present as well. However, events generated by old versions of
+ // Spark do not have the `errorMessage` field. We have to check the status of this query
+ // execution's jobs.
+ val isFailed = e.jobs.exists { case (_, status) => status ==
JobExecutionStatus.FAILED }
+ if (isFailed) {
+ failed += e
+ } else {
+ completed += e
+ }
+ }
+ }
+ // group the sub execution only if the root execution will be displayed (i.e. not missing)
+ if (groupSubExecutionEnabled &&
+ e.executionId != e.rootExecutionId &&
+ executionIdToSubExecutions.contains(e.rootExecutionId)) {
+ executionIdToSubExecutions.get(e.rootExecutionId).foreach { executions
=>
Review Comment:
done
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala:
##########
@@ -140,6 +166,7 @@ object SQLExecution {
} finally {
executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
+ unsetRootExecutionId(sc, oldExecutionId)
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]