This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bc37fdc  [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId
bc37fdc is described below

commit bc37fdc77130ce4f60806db0bb2b1b8914452040
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Fri Mar 27 13:35:28 2020 +0800

    [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId
    
    ### What changes were proposed in this pull request?
    
    In ExecutionPage, the metrics for stageId, attemptId and taskId are currently displayed in the format `(stageId (attemptId): taskId)`.
    I changed this format to `(stageId.attemptId: taskId)`.
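    
    As an illustration only (not part of the patch), here is a minimal Scala sketch of how the new suffix is rendered, mirroring `SQLMetrics.stringValue` in the diff below; the sample values are made up:
    
    ```scala
    // maxMetrics layout assumed by stringValue: index 1 = stageId, 2 = attemptId, 3 = taskId.
    val maxMetrics = Array(10L, 3L, 0L, 217L)
    val stringMetric =
      if (maxMetrics.isEmpty) "(driver)" // metric was collected on the driver
      else s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
    // Old format: (stage 3 (attempt 0): task 217)
    // New format: (stage 3.0: task 217)
    ```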
    
    ### Why are the changes needed?
    
    As cloud-fan suggested [here](https://github.com/apache/spark/pull/27927#discussion_r398591519), `stageId.attemptId` is more standard in Spark.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. Before this change is applied, the UI looks as follows.
    
    ![with-checked](https://user-images.githubusercontent.com/4736016/77682421-42a6c200-6fda-11ea-92e4-e9f4554adb71.png)
    
    And after this change is applied, it looks as follows.
    
    ![fix-merics-format-with-checked](https://user-images.githubusercontent.com/4736016/77682493-61a55400-6fda-11ea-801f-91a67da698fd.png)
    
    ### How was this patch tested?
    
    Modified `SQLMetricsSuite` and tested manually.
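    
    For illustration (again not part of the patch), a sketch of how the updated `maxMetrics` regex from `SQLMetricsTestUtils` matches the new value format; the sample strings are made up:
    
    ```scala
    // The same pattern this diff wires into sizeMetricPattern,
    // timingMetricPattern and aggregateMetricsPattern.
    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
    assert("(stage 3.0: task 217)".matches(maxMetrics))            // new format matches
    assert(!"(stage 3 (attempt 0): task 217)".matches(maxMetrics)) // old format does not
    ```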
    
    Closes #28039 from sarutak/improve-metrics-format.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/ui/static/spark-sql-viz.js   |  2 +-
 .../spark/sql/execution/metric/SQLMetrics.scala      | 12 ++++++------
 .../spark/sql/execution/ui/ExecutionPage.scala       |  2 +-
 .../spark/sql/execution/metric/SQLMetricsSuite.scala | 20 ++++++++++----------
 .../sql/execution/metric/SQLMetricsTestUtils.scala   | 18 +++++++++---------
 5 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
index 0fb7dab..bb393d9 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
@@ -73,7 +73,7 @@ function setupTooltipForSparkPlanNode(nodeId) {
 
 // labelSeparator should be a non-graphical character in order not to affect the width of boxes.
 var labelSeparator = "\x01";
-var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$";
+var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$";
 
 /*
  * Helper function to pre-process the graph layout.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 65aabe0..1394e0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -116,7 +116,7 @@ object SQLMetrics {
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -126,7 +126,7 @@ object SQLMetrics {
     // duration(min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -134,7 +134,7 @@ object SQLMetrics {
   def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // Same with createTimingMetric, just normalize the unit of time to millisecond.
     val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -150,7 +150,7 @@ object SQLMetrics {
     // probe avg (min, med, max):
     // (1.2, 2.2, 6.3)
     val acc = new SQLMetric(AVERAGE_METRIC)
-    acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -169,11 +169,11 @@ object SQLMetrics {
    * and represent it in string for a SQL physical operator.
     */
   def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
-    // stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task $taskId))
+    // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
     val stringMetric = if (maxMetrics.isEmpty) {
       "(driver)"
     } else {
-      s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task 
${maxMetrics(3)})"
+      s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
     }
     if (metricsType == SUM_METRIC) {
       val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index d304369..76bc7fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -73,7 +73,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
         </div>
         <div>
           <input type="checkbox" id="stageId-and-taskId-checkbox"></input>
-          <span>Show the Stage (Stage Attempt): Task ID that corresponds to 
the max metric</span>
+          <span>Show the Stage ID and Task ID that corresponds to the max 
metric</span>
         </div>
 
       val metrics = sqlStore.executionMetrics(executionId)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 7d09577..11f93c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
       0L -> (("WholeStageCodegen (1)", Map(
-        "duration total (min, med, max (stageId (attemptId): taskId))" -> {
+        "duration total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(timingMetricPattern)
         })))), true)
   }
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern),
       Map("number of output rows" -> 1L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df2 = testData2.groupBy('a).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern),
       Map("number of output rows" -> 3L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern))
 
     val shuffleExpected2 = Map(
@@ -181,8 +181,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId" +
-          " (attemptId): taskId))")
+        val probes =
+          metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
         // Extract min, med, max from the string and strip off everthing else.
         val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
         probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
@@ -231,13 +231,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = Seq(1, 3, 2).toDF("id").sort('id)
     testSparkPlanMetricsWithPredicates(df, 2, Map(
       0L -> (("Sort", Map(
-        "sort time total (min, med, max (stageId (attemptId): taskId))" -> {
+        "sort time total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(timingMetricPattern)
         },
-        "peak memory total (min, med, max (stageId (attemptId): taskId))" -> {
+        "peak memory total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(sizeMetricPattern)
         },
-        "spill size total (min, med, max (stageId (attemptId): taskId))" -> {
+        "spill size total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(sizeMetricPattern)
         })))
     ))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0c1148f..766e7a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -41,27 +41,27 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
 
   protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore
 
-  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0
-  // (attempt 0): task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0.0:
+  // task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
   protected val sizeMetricPattern = {
     val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
-    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
     s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
   }
 
-  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3 (attempt
-  // 0): task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0:
+  // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
   protected val timingMetricPattern = {
     val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
-    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
     s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
   }
 
   // Pattern of size SQLMetric value for Aggregate tests.
-  // e.g "\n(1, 1, 0.9 (stage 1 (attempt 0): task 8)) OR "\n(1, 1, 0.9 )"
+  // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )"
   protected val aggregateMetricsPattern = {
     val iters = "([0-9]+(\\.[0-9]+)?)"
-    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
     s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
   }
 
@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
     }
 
     val totalNumBytesMetric = executedNode.metrics.find(
-      _.name == "written output total (min, med, max (stageId (attemptId): taskId))").get
+      _.name == "written output total (min, med, max (stageId: taskId))").get
     val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
       .split(" ").head.trim.toDouble
     assert(totalNumBytes > 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
