[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1029927503

## core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
## @@ -282,6 +280,17 @@ final class ShuffleBlockFetcherIterator(
     }
   }

+  @inline def updateMergedReqsDuration(wasReqForMergedChunks: Boolean = false): Unit = {
+    if (remainingBlocks.isEmpty) {
+      val durationMs = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime)
+      if (wasReqForMergedChunks) {
+        shuffleMetrics.incRemoteMergedReqsDuration(durationMs)
+      } else {
+        shuffleMetrics.incRemoteReqsDuration(durationMs)
+      }

Review Comment:
`shuffleLocalMetricsUpdate` and `shuffleRemoteMetricsUpdate` are treating merged fetches as a subset of the fetches for remote bytes, while here we are treating them separately. Is there a particular reason for this difference? (A throughput computation will need to take this difference into account.)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
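The distinction raised above, merged request durations counted separately versus as a subset of the overall remote metrics, can be sketched with a minimal example. The class and field names below are illustrative, not the actual `ShuffleReadMetrics` API:

```scala
// Two accounting conventions for remote request durations.
// Names are illustrative only.
final class DurationCounters {
  var remoteReqsDurationMs = 0L
  var remoteMergedReqsDurationMs = 0L

  // Disjoint accounting (what the diff above does): a merged-chunk
  // request increments only the merged counter.
  def recordDisjoint(durationMs: Long, merged: Boolean): Unit =
    if (merged) remoteMergedReqsDurationMs += durationMs
    else remoteReqsDurationMs += durationMs

  // Subset accounting (how the remote-bytes counters behave): a merged
  // request also counts toward the overall total.
  def recordSubset(durationMs: Long, merged: Boolean): Unit = {
    remoteReqsDurationMs += durationMs
    if (merged) remoteMergedReqsDurationMs += durationMs
  }
}
```

Under disjoint accounting the total remote request time is the sum of both counters; under subset accounting it is the overall counter alone. A throughput computed as remote bytes divided by remote request duration is therefore only meaningful if numerator and denominator follow the same convention, which is the reviewer's point.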
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1014392620

## core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
## @@ -1780,7 +1802,19 @@ private[spark] object JsonProtocolSuite extends Assertions {
   | "Remote Bytes Read" : 0,
   | "Remote Bytes Read To Disk" : 0,
   | "Local Bytes Read" : 0,
-  | "Total Records Read" : 0
+  | "Total Records Read" : 0,
+  | "Remote Requests Duration": 0,
+  | "Push Based": {

Review Comment:
+CC @zhouyejoe
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1012357823

## core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
## @@ -146,6 +268,16 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
   override def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
   override def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
   override def incRecordsRead(v: Long): Unit = _recordsRead += v
+  override def incCorruptMergedBlockChunks(v: Long): Unit = _corruptMergedBlockChunks += v
+  override def incFallbackCount(v: Long): Unit = _fallbackCount += v
+  override def incRemoteMergedBlocksFetched(v: Long): Unit = _remoteMergedBlocksFetched += v
+  override def incLocalMergedBlocksFetched(v: Long): Unit = _localMergedBlocksFetched += v
+  override def incRemoteMergedChunksFetched(v: Long): Unit = _remoteMergedChunksFetched += v
+  override def incLocalMergedChunksFetched(v: Long): Unit = _localMergedChunksFetched += v
+  override def incRemoteMergedBlocksBytesRead(v: Long): Unit = _remoteMergedBlocksBytesRead += v
+  override def incLocalMergedBlocksBytesRead(v: Long): Unit = _localMergedBlocksBytesRead += v
+  override def incRemoteReqsDuration(v: Long): Unit = _remoteReqsDuration += v
+  override def incRemoteMergedReqsDuration(v: Long): Unit = _remoteMergedReqsDuration += v

Review Comment:
I called out the same [above](https://github.com/apache/spark/pull/36165#discussion_r978229580). I am fine with pulling those two metrics out and introducing them in a different PR.
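The pattern these overrides implement, where each shuffle dependency accumulates into a cheap per-dependency object that is later merged into the task-level metrics, can be sketched as follows. This is a simplified model with illustrative names, not the real `ShuffleReadMetricsReporter` trait:

```scala
// Simplified model of the TempShuffleReadMetrics pattern: per-dependency
// temp objects receive the increments, and the task-level metrics are
// set once by summing the temps at the end of the task.
trait ReqsDurationReporter {
  def incRemoteReqsDuration(v: Long): Unit
  def incRemoteMergedReqsDuration(v: Long): Unit
}

final class TempReadMetrics extends ReqsDurationReporter {
  var remoteReqsDuration = 0L
  var remoteMergedReqsDuration = 0L
  override def incRemoteReqsDuration(v: Long): Unit = remoteReqsDuration += v
  override def incRemoteMergedReqsDuration(v: Long): Unit = remoteMergedReqsDuration += v
}

final class TaskReadMetrics {
  var remoteReqsDuration = 0L
  var remoteMergedReqsDuration = 0L
  // Merge all per-dependency temps into the task-level values.
  def setMergeValues(temps: Seq[TempReadMetrics]): Unit = {
    remoteReqsDuration = temps.map(_.remoteReqsDuration).sum
    remoteMergedReqsDuration = temps.map(_.remoteMergedReqsDuration).sum
  }
}
```

The temps avoid touching the task's accumulators on every block fetch; only the final merge writes the accumulated values through.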
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008795619

## core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala:
## @@ -78,6 +78,16 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
   shuffleLocalBytesRead = 1L,
   shuffleReadBytes = 1L,
   shuffleReadRecords = 1L,
+  shuffleCorruptMergedBlockChunks = 2L,
+  shuffleFallbackCount = 2L,
+  shuffleMergedRemoteBlocksFetched = 1L,
+  shuffleMergedLocalBlocksFetched = 1L,
+  shuffleMergedRemoteChunksFetched = 1L,
+  shuffleMergedLocalChunksFetched = 1L,
+  shuffleMergedRemoteBytesRead = 1L,
+  shuffleMergedLocalBytesRead = 1L,
+  shuffleRemoteReqsDuration = 1L,
+  shuffleMergedRemoteReqsDuration = 1L,

Review Comment:
nit: given inputBytes > 0, not likely :-) But that is not relevant to this test!
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008794924

## core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala:
## @@ -290,6 +301,7 @@ private class PushBasedFetchHelper(
     address: BlockManagerId): Unit = {
   assert(blockId.isInstanceOf[ShuffleMergedBlockId] || blockId.isInstanceOf[ShuffleBlockChunkId])
   logWarning(s"Falling back to fetch the original blocks for push-merged block $blockId")
+  shuffleMetrics.incFallbackCount(1)

Review Comment:
I misread the change, makes sense. Thanks!
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008794591

## core/src/main/scala/org/apache/spark/status/storeTypes.scala:
## @@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
     val shuffleLocalBytesRead: Long,
     @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
     val shuffleRecordsRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+      parent = TaskIndexNames.STAGE)
+    val shuffleCorruptMergedBlockChunks: Long,
+    @KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+      parent = TaskIndexNames.STAGE)
+    val shuffleFallbackCount: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION, parent = TaskIndexNames.STAGE)
+    val shuffleRemoteReqsDuration: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION,
+      parent = TaskIndexNames.STAGE)

Review Comment:
Makes sense, thanks for clarifying!
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r978229580

## core/src/main/scala/org/apache/spark/InternalAccumulator.scala:
## @@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
   val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
   val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
   val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+  val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX + "remoteReqsDuration"
+  val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
+  val FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"
+  val REMOTE_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBlocksFetched"
+  val LOCAL_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBlocksFetched"
+  val REMOTE_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedChunksFetched"
+  val LOCAL_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedChunksFetched"
+  val REMOTE_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBytesRead"
+  val LOCAL_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBytesRead"
+  val REMOTE_MERGED_REQS_DURATION = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedReqsDuration"

Review Comment:
Can we check whether all the metrics are getting populated? A cursory look indicated that the following are not:
* `remoteReqsDuration`
* `remoteMergedReqsDuration`

Also, can we make `remoteMergedBytesRead` and `localMergedBytesRead` consistent with the names of their corresponding accumulators?
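The naming inconsistency flagged in that review comment can be seen by writing out how the keys are built. The prefix value below is a placeholder, not necessarily Spark's real prefix; only the quoted suffixes come from the diff:

```scala
// Demonstrates the constant-name vs. key-suffix mismatch called out above.
// The prefix value is a stand-in; the suffixes are quoted from the diff.
object PushReadMetricNames {
  val SHUFFLE_PUSH_READ_METRICS_PREFIX = "shuffle.push.read."

  // The constant says "BLOCKS_READ" while the key suffix says "BytesRead",
  // so the constant name and the accumulator name disagree.
  val REMOTE_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBytesRead"
  val LOCAL_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBytesRead"
}
```

Renaming either the constants or the suffixes so both say "bytes read" (or both say "blocks read") would resolve the inconsistency either way.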
## core/src/main/scala/org/apache/spark/status/storeTypes.scala:
## @@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
     val shuffleLocalBytesRead: Long,
     @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
     val shuffleRecordsRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+      parent = TaskIndexNames.STAGE)
+    val shuffleCorruptMergedBlockChunks: Long,
+    @KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+      parent = TaskIndexNames.STAGE)
+    val shuffleFallbackCount: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION, parent = TaskIndexNames.STAGE)
+    val shuffleRemoteReqsDuration: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION,
+      parent = TaskIndexNames.STAGE)

Review Comment:
Do we need to index any of these? None of these are sortable columns in the UI, right?
## core/src/main/scala/org/apache/spark/executor/Executor.scala:
## @@ -654,6 +654,27 @@ private[spark] class Executor(
   executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
   executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
   executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
+  executorSource.METRIC_PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS
+    .inc(task.metrics.shuffleReadMetrics.corruptMergedBlockChunks)
+  executorSource.METRIC_PUSH_BASED_SHUFFLE_FALLBACK_COUNT
+    .inc(task.metrics.shuffleReadMetrics.fallbackCount)
+  executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED
+    .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksFetched)
+  executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+    .inc(task.metrics.shuffleReadMetrics.localMergedBlocksFetched)
+  executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS_FETCHED
+    .inc(task.metrics.shuffleReadMetrics.remoteMergedChunksFetched)
+e