wypoon commented on a change in pull request #31451:
URL: https://github.com/apache/spark/pull/31451#discussion_r604318143



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
##########
@@ -87,9 +91,21 @@ object SQLMetrics {
   private val TIMING_METRIC = "timing"
   private val NS_TIMING_METRIC = "nsTiming"
   private val AVERAGE_METRIC = "average"
+  private val V2_CUSTOM = "v2Custom"

Review comment:
       How about just `CUSTOM = "custom"`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
##########
@@ -33,7 +34,10 @@ import org.apache.spark.util.{AccumulatorContext, 
AccumulatorV2, Utils}
  * the executor side are automatically propagated and shown in the SQL UI 
through metrics. Updates
  * on the driver side must be explicitly posted using 
[[SQLMetrics.postDriverMetricUpdates()]].
  */
-class SQLMetric(val metricType: String, initValue: Long = 0L) extends 
AccumulatorV2[Long, Long] {
+class SQLMetric(
+    val metricType: String,
+    initValue: Long = 0L,
+    val aggregateMethod: (Array[Long], Array[Long]) => String) extends 
AccumulatorV2[Long, Long] {

Review comment:
       You could add a default for aggregateMethod by adding another 
constructor:
   ```
     def this(metricType: String, initValue: Long = 0L) =
       this(metricType, initValue, SQLMetrics.stringValue(metricType, _, _))
   ```
   I think you may need to remove the default for initValue in the primary 
constructor first.
   With this, you don't need to modify the existing `create*Metric` methods in 
`SQLMetrics`. You don't even need to define `defaultAggregateMethod`, since we 
can inline it.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
##########
@@ -41,31 +42,32 @@ object SqlResourceSuite {
 
   val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(1L -> Some(1L))
 
+  val defaultAggregateMethod = SQLMetrics.defaultAggregateMethod("")
   val filterNode = new SparkPlanGraphNode(1, FILTER, "",
-    metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "")))
+    metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "", 
defaultAggregateMethod)))

Review comment:
       Changes in this class won't be needed if you add a default for 
`aggregateMethod` to `SQLPlanMetric`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
##########
@@ -146,4 +146,5 @@ class SparkPlanGraphNodeWrapper(
 case class SQLPlanMetric(
     name: String,
     accumulatorId: Long,
-    metricType: String)
+    metricType: String,
+    aggregateMethod: (Array[Long], Array[Long]) => String)

Review comment:
       You could add a default for aggregateMethod by creating a companion 
object for the case class:
   ```
   object SQLPlanMetric {
     def apply(name: String, accumulatorId: Long, metricType: String): 
SQLPlanMetric =
       this(name, accumulatorId, metricType, SQLMetrics.stringValue(metricType, 
_, _))
   }
   ```
   Then you don't need to modify existing tests that do `SQLPlanMetric(name, 
accumulatorId, metricType)`.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
##########
@@ -60,7 +60,8 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
     val store = new SQLAppStatusStore(kvstore, Some(listener))
 
     val metrics = (0 until numMetrics).map { i =>
-      new SQLMetricInfo(s"metric$i", i.toLong, "average")
+      new SQLMetricInfo(s"metric$i", i.toLong, "average",
+        SQLMetrics.defaultAggregateMethod("average"))

Review comment:
       This change won't be needed if you add a default for 
`aggregateMethod` to `SQLMetricInfo`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
##########
@@ -27,4 +27,5 @@ import org.apache.spark.annotation.DeveloperApi
 class SQLMetricInfo(
     val name: String,
     val accumulatorId: Long,
-    val metricType: String)
+    val metricType: String,
+    val aggregateMethod: (Array[Long], Array[Long]) => String)

Review comment:
       You could add a default for aggregateMethod by adding another 
constructor:
   ```
     def this(name: String, accumulatorId: Long, metricType: String) =
       this(name, accumulatorId, metricType, SQLMetrics.stringValue(metricType, 
_, _))
   ```
   Then existing tests that do `new SQLMetricInfo(name, accumulatorId, 
metricType)` do not need to be changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to