zhouyejoe commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1030959574


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -407,6 +407,8 @@ private[spark] object JsonProtocol {
     g.writeFieldName("Accumulables")
     accumulablesToJson(stageInfo.accumulables.values, g)
     g.writeNumberField("Resource Profile Id", stageInfo.resourceProfileId)
+    g.writeBooleanField("Push based shuffle enabled", stageInfo.isPushBasedShuffleEnabled)
+    g.writeNumberField("Shuffle push mergers count", stageInfo.shuffleMergerCount)

Review Comment:
   Nit: Capitalize the first character of each word in these field names, following the same style as the other fields written above.
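   A sketch of what that would look like (the Title Case names here are my suggestion, not the final API):
   ```scala
   g.writeBooleanField("Push Based Shuffle Enabled", stageInfo.isPushBasedShuffleEnabled)
   g.writeNumberField("Shuffle Push Mergers Count", stageInfo.shuffleMergerCount)
   ```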



##########
project/MimaExcludes.scala:
##########
@@ -87,6 +87,11 @@ object MimaExcludes {
     // [SPARK-36511][MINOR][SQL] Remove ColumnIOUtil
     ProblemFilters.exclude[MissingClassProblem]("org.apache.parquet.io.ColumnIOUtil"),
 
+    // [SPARK-36620] [SHUFFLE] Expose push based shuffle metrics
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
+    ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.StageData.this"),
+
     // [SPARK-40324][SQL] Provide query context in AnalysisException

Review Comment:
   Just curious: which specific changes led to these exclusions? Can we avoid them?
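   For reference, a minimal sketch of the kind of change that typically triggers these `*.this` exclusions: adding a constructor parameter removes the old constructor from the bytecode, which MiMa reports as a missing-method problem (the added field below is hypothetical, for illustration only):
   ```scala
   // v1 shipped with: class StageData(val stageId: Int)
   // v2 adds a field, so the old one-arg constructor no longer exists in the
   // binary API and MiMa flags "org.apache.spark.status.api.v1.StageData.this"
   // unless an exclusion is added.
   class StageData(val stageId: Int, val isPushBasedShuffleEnabled: Boolean)
   ```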



##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -722,6 +718,63 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  // Number of map blocks in a ShuffleBlockChunk
+  private def getShuffleChunkCardinality(blockId: ShuffleBlockChunkId): Int = {
+    val chunkTracker = pushBasedFetchHelper.getRoaringBitMap(blockId)
+    chunkTracker match {
+      case Some(bitmap) => bitmap.getCardinality
+      case None => 0
+    }
+  }
+
+  // Check if the merged block is local to the host or not
+  private def isLocalMergedBlockAddress(address: BlockManagerId): Boolean = {
+    address.executorId.isEmpty && address.host == blockManager.blockManagerId.host
+  }
+
+  private def shuffleMetricsUpdate(
+    blockId: BlockId,
+    buf: ManagedBuffer,
+    isLocal: Boolean): Unit = {
+    if (isLocal) {
+      shuffleLocalMetricsUpdate(blockId, buf)
+    } else {
+      shuffleRemoteMetricsUpdate(blockId, buf)
+    }
+  }
+
+  private def shuffleLocalMetricsUpdate(blockId: BlockId, buf: ManagedBuffer): Unit = {
+    // Check if the block is within the host-local blocks set, or if it is a merged local
+    // block. In either case, it is local read and we need to increase the local
+    // shuffle read metrics.
+    blockId match {
+      case chunkId: ShuffleBlockChunkId =>
+        shuffleMetrics.incLocalMergedChunksFetched(1)
+        shuffleMetrics.incLocalMergedBlocksFetched(getShuffleChunkCardinality(chunkId))
+        shuffleMetrics.incLocalMergedBlocksBytesRead(buf.size)
+        shuffleMetrics.incLocalBlocksFetched(getShuffleChunkCardinality(chunkId))

Review Comment:
   getShuffleChunkCardinality will only get called once for each blockId, in either shuffleLocalMetricsUpdate or shuffleRemoteMetricsUpdate; I think the result won't get reused across calls.
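   Within a single call it is still computed twice for the same chunkId, though; if that ever matters, a small sketch of hoisting it into a local value (names taken from the diff above):
   ```scala
   case chunkId: ShuffleBlockChunkId =>
     // Compute the chunk cardinality once and reuse it for both counters.
     val cardinality = getShuffleChunkCardinality(chunkId)
     shuffleMetrics.incLocalMergedChunksFetched(1)
     shuffleMetrics.incLocalMergedBlocksFetched(cardinality)
     shuffleMetrics.incLocalMergedBlocksBytesRead(buf.size)
     shuffleMetrics.incLocalBlocksFetched(cardinality)
   ```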



##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -1105,11 +1132,17 @@ private[spark] object JsonProtocol {
         case None => Seq.empty[AccumulableInfo]
       }
     }
+    val isPushBasedShuffleEnabled =
+      jsonOption(json.get("Push based shuffle 
enabled")).map(_.extractBoolean).getOrElse(false)
+    val shufflePushMergersCount =

Review Comment:
   Nit: Same capitalization issue here.
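   A sketch of the read side with the capitalized key (assuming the write side adopts the Title Case suggested above):
   ```scala
   val isPushBasedShuffleEnabled =
     jsonOption(json.get("Push Based Shuffle Enabled")).map(_.extractBoolean).getOrElse(false)
   ```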



##########
core/src/test/resources/HistoryServerExpectations/one_stage_json_with_details_expectation.json:
##########
@@ -32,7 +32,19 @@
   "shuffleRemoteBytesReadToDisk" : 0,
   "shuffleLocalBytesRead" : 0,
   "shuffleReadBytes" : 0,
+  "shuffleCorruptMergedBlockChunks" : 0,
+  "shuffleFallbackCount" : 0,
+  "shuffleMergedRemoteBlocksFetched" : 0,
+  "shuffleMergedLocalBlocksFetched" : 0,
+  "shuffleMergedRemoteChunksFetched" : 0,
+  "shuffleMergedLocalChunksFetched" : 0,
+  "shuffleMergedRemoteBytesRead" : 0,
+  "shuffleMergedLocalBytesRead" : 0,
+  "shuffleRemoteReqsDuration" : 0,
+  "shuffleMergedRemoteReqsDuration" : 0,
   "shuffleReadRecords" : 0,
+  "isPushBasedShuffleEnabled" : false,

Review Comment:
   Nit: Line 45 should be moved to line 35.



##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -1226,6 +1259,29 @@ private[spark] object JsonProtocol {
       readMetrics.incRecordsRead(
         jsonOption(readJson.get("Total Records 
Read")).map(_.extractLong).getOrElse(0L))
       metrics.mergeShuffleReadMetrics()
+      readMetrics.incRemoteReqsDuration(jsonOption(readJson.get("Remote Requests Duration"))
+        .map(_.extractLong).getOrElse(0L))
+      jsonOption(readJson.get("Push Based")).foreach { pushBasedShuffleReadJson =>
+        readMetrics.incCorruptMergedBlockChunks(jsonOption(
+          pushBasedShuffleReadJson.get("Corrupt Merged Block Chunks"))
+          .map(_.extractLong).getOrElse(0L))

Review Comment:
   Nit: Looks like the indentation is off here.
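   One layout that keeps the continuation indent consistent (a sketch; other styles may be acceptable too):
   ```scala
   readMetrics.incCorruptMergedBlockChunks(
     jsonOption(pushBasedShuffleReadJson.get("Corrupt Merged Block Chunks"))
       .map(_.extractLong).getOrElse(0L))
   ```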



##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -726,6 +736,63 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  // Number of map blocks in a ShuffleBlockChunk
+  private def getShuffleChunkCardinality(blockId: ShuffleBlockChunkId): Int = {
+    val chunkTracker = pushBasedFetchHelper.getRoaringBitMap(blockId)
+    chunkTracker match {
+      case Some(bitmap) => bitmap.getCardinality
+      case None => 0
+    }
+  }
+
+  // Check if the merged block is local to the host or not
+  private def isLocalMergedBlockAddress(address: BlockManagerId): Boolean = {
+    address.executorId.isEmpty && address.host == blockManager.blockManagerId.host
+  }
+
+  private def shuffleMetricsUpdate(
+    blockId: BlockId,
+    buf: ManagedBuffer,
+    local: Boolean): Unit = {

Review Comment:
   Nit: The indentation is off here.
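   Spark's Scala style indents multi-line parameter lists four spaces; a sketch of the expected shape (body taken from the diff above):
   ```scala
   private def shuffleMetricsUpdate(
       blockId: BlockId,
       buf: ManagedBuffer,
       local: Boolean): Unit = {
     if (local) {
       shuffleLocalMetricsUpdate(blockId, buf)
     } else {
       shuffleRemoteMetricsUpdate(blockId, buf)
     }
   }
   ```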


