zhouyejoe commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1030959574
##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -407,6 +407,8 @@ private[spark] object JsonProtocol {
g.writeFieldName("Accumulables")
accumulablesToJson(stageInfo.accumulables.values, g)
g.writeNumberField("Resource Profile Id", stageInfo.resourceProfileId)
+ g.writeBooleanField("Push based shuffle enabled",
stageInfo.isPushBasedShuffleEnabled)
+ g.writeNumberField("Shuffle push mergers count",
stageInfo.shuffleMergerCount)
Review Comment:
Nit: Capitalize the first character of each word, following the same style
as the other field names shown above.
##########
project/MimaExcludes.scala:
##########
@@ -87,6 +87,11 @@ object MimaExcludes {
// [SPARK-36511][MINOR][SQL] Remove ColumnIOUtil
ProblemFilters.exclude[MissingClassProblem]("org.apache.parquet.io.ColumnIOUtil"),
+ // [SPARK-36620] [SHUFFLE] Expose push based shuffle metrics
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.StageData.this"),
+
// [SPARK-40324][SQL] Provide query context in AnalysisException
Review Comment:
Just curious: which specific changes led to this exclusion? Can we avoid
this exclusion?
##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -722,6 +718,63 @@ final class ShuffleBlockFetcherIterator(
}
}
+ // Number of map blocks in a ShuffleBlockChunk
+ private def getShuffleChunkCardinality(blockId: ShuffleBlockChunkId): Int = {
+ val chunkTracker = pushBasedFetchHelper.getRoaringBitMap(blockId)
+ chunkTracker match {
+ case Some(bitmap) => bitmap.getCardinality
+ case None => 0
+ }
+ }
+
+ // Check if the merged block is local to the host or not
+ private def isLocalMergedBlockAddress(address: BlockManagerId): Boolean = {
+ address.executorId.isEmpty && address.host ==
blockManager.blockManagerId.host
+ }
+
+ private def shuffleMetricsUpdate(
+ blockId: BlockId,
+ buf: ManagedBuffer,
+ isLocal: Boolean): Unit = {
+ if (isLocal) {
+ shuffleLocalMetricsUpdate(blockId, buf)
+ } else {
+ shuffleRemoteMetricsUpdate(blockId, buf)
+ }
+ }
+
+ private def shuffleLocalMetricsUpdate(blockId: BlockId, buf: ManagedBuffer):
Unit = {
+ // Check if the block is within the host-local blocks set, or if it is a
merged local
+ // block. In either case, it is local read and we need to increase the
local
+ // shuffle read metrics.
+ blockId match {
+ case chunkId: ShuffleBlockChunkId =>
+ shuffleMetrics.incLocalMergedChunksFetched(1)
+
shuffleMetrics.incLocalMergedBlocksFetched(getShuffleChunkCardinality(chunkId))
+ shuffleMetrics.incLocalMergedBlocksBytesRead(buf.size)
+
shuffleMetrics.incLocalBlocksFetched(getShuffleChunkCardinality(chunkId))
Review Comment:
getShuffleChunkCardinality will only get called once for each blockId, in
either shuffleLocalMetricsUpdate or shuffleRemoteMetricsUpdate; I think its
result won't get reused.
##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -1105,11 +1132,17 @@ private[spark] object JsonProtocol {
case None => Seq.empty[AccumulableInfo]
}
}
+ val isPushBasedShuffleEnabled =
+ jsonOption(json.get("Push based shuffle
enabled")).map(_.extractBoolean).getOrElse(false)
+ val shufflePushMergersCount =
Review Comment:
Nit: Same here. Capitalization
##########
core/src/test/resources/HistoryServerExpectations/one_stage_json_with_details_expectation.json:
##########
@@ -32,7 +32,19 @@
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleReadBytes" : 0,
+ "shuffleCorruptMergedBlockChunks" : 0,
+ "shuffleFallbackCount" : 0,
+ "shuffleMergedRemoteBlocksFetched" : 0,
+ "shuffleMergedLocalBlocksFetched" : 0,
+ "shuffleMergedRemoteChunksFetched" : 0,
+ "shuffleMergedLocalChunksFetched" : 0,
+ "shuffleMergedRemoteBytesRead" : 0,
+ "shuffleMergedLocalBytesRead" : 0,
+ "shuffleRemoteReqsDuration" : 0,
+ "shuffleMergedRemoteReqsDuration" : 0,
"shuffleReadRecords" : 0,
+ "isPushBasedShuffleEnabled" : false,
Review Comment:
Nit: Line 45 should be moved to line 35.
##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -1226,6 +1259,29 @@ private[spark] object JsonProtocol {
readMetrics.incRecordsRead(
jsonOption(readJson.get("Total Records
Read")).map(_.extractLong).getOrElse(0L))
metrics.mergeShuffleReadMetrics()
+ readMetrics.incRemoteReqsDuration(jsonOption(readJson.get("Remote
Requests Duration"))
+ .map(_.extractLong).getOrElse(0L))
+ jsonOption(readJson.get("Push Based")).foreach {
pushBasedShuffleReadJson =>
+ readMetrics.incCorruptMergedBlockChunks(jsonOption(
+ pushBasedShuffleReadJson.get("Corrupt Merged Block Chunks"))
+ .map(_.extractLong).getOrElse(0L))
Review Comment:
Nit: Looks like indent is off
##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -726,6 +736,63 @@ final class ShuffleBlockFetcherIterator(
}
}
+ // Number of map blocks in a ShuffleBlockChunk
+ private def getShuffleChunkCardinality(blockId: ShuffleBlockChunkId): Int = {
+ val chunkTracker = pushBasedFetchHelper.getRoaringBitMap(blockId)
+ chunkTracker match {
+ case Some(bitmap) => bitmap.getCardinality
+ case None => 0
+ }
+ }
+
+ // Check if the merged block is local to the host or not
+ private def isLocalMergedBlockAddress(address: BlockManagerId): Boolean = {
+ address.executorId.isEmpty && address.host ==
blockManager.blockManagerId.host
+ }
+
+ private def shuffleMetricsUpdate(
+ blockId: BlockId,
+ buf: ManagedBuffer,
+ local: Boolean): Unit = {
Review Comment:
Nit: Indent is off
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]