jaceklaskowski commented on a change in pull request #32388:
URL: https://github.com/apache/spark/pull/32388#discussion_r622875712



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -98,10 +99,10 @@ public ExternalBlockHandler(
       OneForOneStreamManager streamManager,
       ExternalShuffleBlockResolver blockManager,
       MergedShuffleFileManager mergeManager) {
-    this.metrics = new ShuffleMetrics();
     this.streamManager = streamManager;
     this.blockManager = blockManager;
     this.mergeManager = mergeManager;
+    this.metrics = new ShuffleMetrics();

Review comment:
       Why has this been moved down? Curious.
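       My guess, for what it's worth: once `registeredExecutorsSize` is registered
       through the bound method reference `blockManager::getRegisteredExecutorsSize`
       (see the second hunk below), `blockManager` has to be non-null by the time
       `ShuffleMetrics` is constructed, because a bound method reference dereferences
       its receiver eagerly (JLS 15.13.3), while the old lambda only did so when the
       gauge was read. A minimal, self-contained sketch of that difference (the
       `Resolver` type here is a hypothetical stand-in, not the Spark one):

           import java.util.function.Supplier;

           public class MethodRefOrder {
             interface Resolver { int size(); }

             private Resolver resolver;

             MethodRefOrder(Resolver resolver, boolean assignFirst) {
               if (assignFirst) {
                 this.resolver = resolver;
               }
               // Bound method reference: dereferences this.resolver right here,
               // so it throws NullPointerException if the field is still null.
               Supplier<Integer> eager = this.resolver::size;
               // Lambda: only dereferences this.resolver when get() is called,
               // so a null field at this point is harmless.
               Supplier<Integer> lazy = () -> this.resolver.size();
               if (!assignFirst) {
                 this.resolver = resolver;
               }
               System.out.println(eager.get() + " " + lazy.get());
             }

             public static void main(String[] args) {
               new MethodRefOrder(() -> 42, true);   // prints "42 42"
               new MethodRefOrder(() -> 42, false);  // NPE at the method reference
             }
           }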

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -291,9 +294,18 @@ public ShuffleMetrics() {
       allMetrics.put("openBlockRequestLatencyMillis", 
openBlockRequestLatencyMillis);
       allMetrics.put("registerExecutorRequestLatencyMillis", 
registerExecutorRequestLatencyMillis);
       allMetrics.put("finalizeShuffleMergeLatencyMillis", 
finalizeShuffleMergeLatencyMillis);
+      allMetrics.put("blockTransferRate", blockTransferRate);
       allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
+      allMetrics.put("blockTransferAvgSize_1min", new RatioGauge() {
+        @Override
+        protected Ratio getRatio() {
+          return Ratio.of(
+              blockTransferRateBytes.getOneMinuteRate(),
+              blockTransferRate.getOneMinuteRate());
+        }
+      });
       allMetrics.put("registeredExecutorsSize",
-                     (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
+          (Gauge<Integer>) blockManager::getRegisteredExecutorsSize);

Review comment:
       It's been a while since I worked with Java, and I'm curious whether this change 
is going to work with Java 8 (which I assume Spark 3.x still supports)?
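       For context, here is a self-contained sketch that compiles and runs on Java 8
       (`Gauge` is a hypothetical stand-in for the Dropwizard interface, which as far
       as I remember is also a single-method interface, so it can be the target of a
       method reference):

           public class MethodRefCastSketch {
             // Stand-in for com.codahale.metrics.Gauge: one abstract method.
             interface Gauge<T> { T getValue(); }

             static class Resolver {
               int getRegisteredExecutorsSize() { return 3; }
             }

             public static void main(String[] args) {
               Resolver blockManager = new Resolver();
               // Same shape as the line in the diff: the cast supplies the target
               // type for the bound method reference (valid since Java 8).
               Gauge<?> gauge = (Gauge<Integer>) blockManager::getRegisteredExecutorsSize;
               System.out.println(gauge.getValue());  // prints 3
             }
           }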

##########
File path: resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
##########
@@ -403,7 +403,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     val metrics = metricSetRef.get(metricsSource).asInstanceOf[MetricSet].getMetrics
 
     assert(metrics.keySet().asScala == Set(
+      "blockTransferRate",

Review comment:
       Why not use the same trick ("// Use sorted Seq instead of Set for 
easier comparison when there is a mismatch") here?

##########
File path: docs/monitoring.md
##########
@@ -1361,7 +1361,9 @@ Note: applies when running in Spark standalone as worker
 ### Component instance = shuffleService
 Note: applies to the shuffle service
 
+- blockTransferRate (meter)
 - blockTransferRateBytes (meter)
+- blockTransferAvgTime_1min (gauge - 1 minute moving average)

Review comment:
       nit: 1-minute (with a hyphen)

##########
File path: resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
##########
@@ -37,28 +38,57 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
   val metrics = new ExternalBlockHandler(streamManager, blockResolver).getAllMetrics
 
   test("metrics named as expected") {
-    val allMetrics = Set(
+    val allMetrics = Seq(

Review comment:
       Why `Seq` here?



