otterc commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r860171613
##########
core/src/main/scala/org/apache/spark/InternalAccumulator.scala:
##########
@@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+ val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX +
"remoteReqsDuration"
+ val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX +
"corruptMergedBlockChunks"
+ val ORIGINAL_BLOCK_FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX +
"fallbackCount"
Review Comment:
Nit: `ORIGINAL_BLOCK_FALLBACK_COUNT` -> `FALLBACK_COUNT`. This keeps track
of how many times fallback is triggered. The prefix `ORIGINAL_BLOCK` seems odd
##########
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
##########
@@ -84,13 +94,76 @@ class ShuffleReadMetrics private[spark] () extends
Serializable {
*/
def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
+ /**
+ * Number of corrupt merged shuffle block chunks encountered by this task
(remote or local).
+ */
+ def corruptMergedBlockChunks: Long = _corruptMergedBlockChunks.sum
+
+ /**
+ * Number of times the task had to fallback to fetch original shuffle blocks
for a merged
+ * shuffle block chunk (remote or local).
+ */
+ def fallbackCount: Long = _fallbackCount.sum
+
+ /**
+ * Number of remote merged blocks fetched.
+ */
+ def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched.sum
+
+ /**
+ * Number of local merged blocks fetched.
+ */
+ def localMergedBlocksFetched: Long = _localMergedBlocksFetched.sum
+
+ /**
+ * Number of remote merged chunks fetched.
+ */
+ def remoteMergedChunksFetched: Long = _remoteMergedChunksFetched.sum
+
+ /**
+ * Number of local merged chunks fetched.
+ */
+ def localMergedChunksFetched: Long = _localMergedChunksFetched.sum
+
+ /**
+ * Total number of remote merged bytes read.
+ */
+ def remoteMergedBlocksBytesRead: Long = _remoteMergedBlocksBytesRead.sum
+
+ /**
+ * Total number of local merged bytes read.
+ */
+ def localMergedBlocksBytesRead: Long = _localMergedBlocksBytesRead.sum
+
+ /**
+ * Total time taken for remote requests.
+ */
+ def remoteReqsDuration: Long = _remoteReqsDuration.sum +
_remoteMergedReqsDuration.sum
+
Review Comment:
In the PR for the JIRA SPARK-32922, it was decided not to add push-merge
related metrics to the original/unmerged metrics. For example,
totalRemoteShuffleBlocksFetched in master doesn't include
remoteMergedShuffleBlocks. I prefer consistency, so for this metric as well my
recommendation is to not add `remoteMergedReqsDuration` to `remoteReqsDuration`
##########
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##########
@@ -627,9 +709,22 @@ private[spark] class AppStatusStore(
shuffleLocalBytesRead = stage.shuffleLocalBytesRead,
shuffleReadBytes = stage.shuffleReadBytes,
shuffleReadRecords = stage.shuffleReadRecords,
+ pushBasedShuffleCorruptMergedBlockChunks =
stage.pushBasedShuffleCorruptMergedBlockChunks,
+ pushBasedShuffleFallbackCount = stage.pushBasedShuffleFallbackCount,
+ pushBasedShuffleMergedRemoteBlocksFetched =
stage.pushBasedShuffleMergedRemoteBlocksFetched,
+ pushBasedShuffleMergedLocalBlocksFetched =
stage.pushBasedShuffleMergedLocalBlocksFetched,
+ pushBasedShuffleMergedRemoteChunksFetched =
stage.pushBasedShuffleMergedRemoteChunksFetched,
+ pushBasedShuffleMergedLocalChunksFetched =
stage.pushBasedShuffleMergedLocalChunksFetched,
+ pushBasedShuffleMergedRemoteBytesRead =
stage.pushBasedShuffleMergedRemoteBytesRead,
+ pushBasedShuffleMergedLocalBytesRead =
stage.pushBasedShuffleMergedLocalBytesRead,
+ pushBasedShuffleRemoteReqsDuration =
stage.pushBasedShuffleRemoteReqsDuration,
Review Comment:
This looks off. This shouldn't be prefixed by `pushBasedShuffle`
##########
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala:
##########
@@ -40,8 +40,8 @@ class StageInfo(
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] =
Seq.empty,
private[spark] val shuffleDepId: Option[Int] = None,
val resourceProfileId: Int,
- private[spark] var isPushBasedShuffleEnabled: Boolean = false,
- private[spark] var shuffleMergerCount: Int = 0) {
+ var isPushBasedShuffleEnabled: Boolean = false,
+ var shuffleMergerCount: Int = 0) {
Review Comment:
Is this change needed for a test?
##########
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##########
@@ -448,10 +504,36 @@ private[spark] class AppStatusStore(
shuffleRemoteBytesReadToDisk =
computedQuantiles.shuffleReadMetrics.remoteBytesReadToDisk(idx),
shuffleTotalBlocksFetched =
computedQuantiles.shuffleReadMetrics.totalBlocksFetched(idx),
+ pushBasedShuffleCorruptMergedBlockChunks =
+
computedQuantiles.shuffleReadMetrics.pushBased.corruptMergedBlockChunks(idx),
+ pushBasedShuffleFallbackCount =
+ computedQuantiles.shuffleReadMetrics.pushBased.fallbackCount(idx),
+ pushBasedShuffleMergedRemoteBlocksFetched =
+
computedQuantiles.shuffleReadMetrics.pushBased.remoteMergedBlocksFetched(idx),
+ pushBasedShuffleMergedLocalBlocksFetched =
+
computedQuantiles.shuffleReadMetrics.pushBased.localMergedBlocksFetched(idx),
+ pushBasedShuffleMergedRemoteChunksFetched =
+
computedQuantiles.shuffleReadMetrics.pushBased.remoteMergedChunksFetched(idx),
+ pushBasedShuffleMergedLocalChunksFetched =
+
computedQuantiles.shuffleReadMetrics.pushBased.localMergedChunksFetched(idx),
+ pushBasedShuffleMergedRemoteBlocksBytesRead =
+
computedQuantiles.shuffleReadMetrics.pushBased.remoteMergedBlocksBytesRead(idx),
+ pushBasedShuffleMergedLocalBlocksBytesRead =
Review Comment:
Nit: Please check the indentation
##########
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:
##########
@@ -227,9 +227,23 @@ class TaskMetrics private[spark] () extends Serializable {
shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
+ shuffleRead.CORRUPT_MERGED_BLOCK_CHUNKS ->
+ shuffleReadMetrics._corruptMergedBlockChunks,
+ shuffleRead.ORIGINAL_BLOCK_FALLBACK_COUNT ->
shuffleReadMetrics._fallbackCount,
+ shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED ->
shuffleReadMetrics._remoteMergedBlocksFetched,
+ shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED ->
shuffleReadMetrics._localMergedBlocksFetched,
+ shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED ->
shuffleReadMetrics._remoteMergedChunksFetched,
+ shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED ->
shuffleReadMetrics._localMergedChunksFetched,
+ shuffleRead.REMOTE_MERGED_BLOCKS_READ ->
shuffleReadMetrics._remoteMergedBlocksBytesRead,
+ shuffleRead.LOCAL_MERGED_BLOCKS_READ ->
shuffleReadMetrics._localMergedBlocksBytesRead,
+ shuffleRead.REMOTE_REQS_DURATION -> shuffleReadMetrics._remoteReqsDuration,
Review Comment:
Nit: indentation
##########
core/src/main/scala/org/apache/spark/status/LiveEntity.scala:
##########
@@ -502,9 +527,30 @@ private class LiveStage(var info: StageInfo) extends
LiveEntity {
shuffleReadBytes =
metrics.shuffleReadMetrics.localBytesRead +
metrics.shuffleReadMetrics.remoteBytesRead,
shuffleReadRecords = metrics.shuffleReadMetrics.recordsRead,
+ pushBasedShuffleCorruptMergedBlockChunks =
+ metrics.shuffleReadMetrics.pushBased.corruptMergedBlockChunks,
+ pushBasedShuffleFallbackCount =
metrics.shuffleReadMetrics.pushBased.fallbackCount,
+ pushBasedShuffleMergedRemoteBlocksFetched =
+ metrics.shuffleReadMetrics.pushBased.remoteMergedBlocksFetched,
+ pushBasedShuffleMergedLocalBlocksFetched =
+ metrics.shuffleReadMetrics.pushBased.localMergedBlocksFetched,
+ pushBasedShuffleMergedRemoteChunksFetched =
+ metrics.shuffleReadMetrics.pushBased.remoteMergedChunksFetched,
+ pushBasedShuffleMergedLocalChunksFetched =
+ metrics.shuffleReadMetrics.pushBased.localMergedChunksFetched,
+ pushBasedShuffleMergedRemoteBytesRead =
+ metrics.shuffleReadMetrics.pushBased.remoteMergedBlocksBytesRead,
+ pushBasedShuffleMergedLocalBytesRead =
+ metrics.shuffleReadMetrics.pushBased.localMergedBlocksBytesRead,
+ pushBasedShuffleRemoteReqsDuration =
metrics.shuffleReadMetrics.remoteReqsDuration,
Review Comment:
Same here. This is not specific to pushBased shuffle
##########
core/src/main/scala/org/apache/spark/status/LiveEntity.scala:
##########
@@ -843,9 +936,35 @@ private[spark] object LiveEntityHelpers {
m2.shuffleReadMetrics.remoteBytesReadToDisk * mult,
m1.shuffleReadMetrics.localBytesRead +
m2.shuffleReadMetrics.localBytesRead * mult,
m1.shuffleReadMetrics.recordsRead + m2.shuffleReadMetrics.recordsRead *
mult,
+ m1.shuffleReadMetrics.pushBased.corruptMergedBlockChunks +
+ m2.shuffleReadMetrics.pushBased.corruptMergedBlockChunks * mult,
+ m1.shuffleReadMetrics.pushBased.fallbackCount +
+ m2.shuffleReadMetrics.pushBased.fallbackCount * mult,
+ m1.shuffleReadMetrics.pushBased.remoteMergedBlocksFetched +
+ m2.shuffleReadMetrics.pushBased.remoteMergedBlocksFetched * mult,
+ m1.shuffleReadMetrics.pushBased.localMergedBlocksFetched +
+ m2.shuffleReadMetrics.pushBased.localMergedBlocksFetched * mult,
+ m1.shuffleReadMetrics.pushBased.remoteMergedChunksFetched +
+ m2.shuffleReadMetrics.pushBased.remoteMergedChunksFetched * mult,
+ m1.shuffleReadMetrics.pushBased.localMergedChunksFetched +
+ m2.shuffleReadMetrics.pushBased.localMergedChunksFetched * mult,
+ m1.shuffleReadMetrics.pushBased.remoteMergedBlocksBytesRead +
+ m2.shuffleReadMetrics.pushBased.remoteMergedBlocksBytesRead * mult,
+ m1.shuffleReadMetrics.pushBased.localMergedBlocksBytesRead +
+ m2.shuffleReadMetrics.pushBased.localMergedBlocksBytesRead * mult,
+ m1.shuffleReadMetrics.remoteReqsDuration +
m2.shuffleReadMetrics.remoteReqsDuration * mult,
+ m1.shuffleReadMetrics.pushBased.remoteMergedReqsDuration +
+ m2.shuffleReadMetrics.pushBased.remoteMergedChunksFetched * mult,
Review Comment:
This looks off: m1's `remoteMergedReqsDuration` is being combined with m2's
`remoteMergedChunksFetched` — looks like a copy-paste mistake. It should be
`m2.shuffleReadMetrics.pushBased.remoteMergedReqsDuration * mult`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]