[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-22 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1029927503


##
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##
@@ -282,6 +280,17 @@ final class ShuffleBlockFetcherIterator(
   }
 }
 
+    @inline def updateMergedReqsDuration(wasReqForMergedChunks: Boolean = false): Unit = {
+      if (remainingBlocks.isEmpty) {
+        val durationMs = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime)
+        if (wasReqForMergedChunks) {
+          shuffleMetrics.incRemoteMergedReqsDuration(durationMs)
+        } else {
+          shuffleMetrics.incRemoteReqsDuration(durationMs)
+        }

Review Comment:
   `shuffleLocalMetricsUpdate` and `shuffleRemoteMetricsUpdate` are treating merged fetches as a subset of all fetches for remote bytes, while here we are treating them separately.
   Any particular reason for this difference? (A throughput computation will need to take this difference into account.)
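   A minimal, self-contained sketch of the implication (not code from this PR; `ReadMetrics` is a hypothetical stand-in for the relevant `ShuffleReadMetrics` fields): because merged bytes are folded into the remote byte totals while merged request durations land in a separate counter, a throughput computation has to sum the two duration counters before dividing.

```scala
// Hypothetical stand-in for the metrics touched in this PR.
case class ReadMetrics(
    remoteBytesRead: Long,          // includes merged bytes (subset model)
    remoteReqsDuration: Long,       // ms; excludes merged requests here
    remoteMergedReqsDuration: Long) // ms; tracked separately here

def remoteThroughputBytesPerMs(m: ReadMetrics): Double = {
  // Durations are split while bytes are not, so the two duration counters
  // must be added to line the denominator up with remoteBytesRead.
  val totalMs = m.remoteReqsDuration + m.remoteMergedReqsDuration
  if (totalMs == 0) 0.0 else m.remoteBytesRead.toDouble / totalMs
}
```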






[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-04 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1014392620


##
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##
@@ -1780,7 +1802,19 @@ private[spark] object JsonProtocolSuite extends Assertions {
   |  "Remote Bytes Read" : 0,
   |  "Remote Bytes Read To Disk" : 0,
   |  "Local Bytes Read" : 0,
-  |  "Total Records Read" : 0
+  |  "Total Records Read" : 0,
+  |  "Remote Requests Duration": 0,
+  |  "Push Based": {

Review Comment:
   +CC @zhouyejoe 






[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-02 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1012357823


##
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
##
@@ -146,6 +268,16 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
   override def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
   override def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
   override def incRecordsRead(v: Long): Unit = _recordsRead += v
+  override def incCorruptMergedBlockChunks(v: Long): Unit = _corruptMergedBlockChunks += v
+  override def incFallbackCount(v: Long): Unit = _fallbackCount += v
+  override def incRemoteMergedBlocksFetched(v: Long): Unit = _remoteMergedBlocksFetched += v
+  override def incLocalMergedBlocksFetched(v: Long): Unit = _localMergedBlocksFetched += v
+  override def incRemoteMergedChunksFetched(v: Long): Unit = _remoteMergedChunksFetched += v
+  override def incLocalMergedChunksFetched(v: Long): Unit = _localMergedChunksFetched += v
+  override def incRemoteMergedBlocksBytesRead(v: Long): Unit = _remoteMergedBlocksBytesRead += v
+  override def incLocalMergedBlocksBytesRead(v: Long): Unit = _localMergedBlocksBytesRead += v
+  override def incRemoteReqsDuration(v: Long): Unit = _remoteReqsDuration += v
+  override def incRemoteMergedReqsDuration(v: Long): Unit = _remoteMergedReqsDuration += v

Review Comment:
   I called out the same thing [above](https://github.com/apache/spark/pull/36165#discussion_r978229580).
   I am fine with pulling those two metrics out and introducing them in a different PR.
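   For context, a minimal sketch of the temp-metrics pattern these overrides plug into (an assumption based on Spark's existing shuffle-read metrics flow, not code from this diff): each reader increments its own temp object, and the task-level metric is the fold over all temps, which is why an incrementer that is never called leaves the task-level value at 0.

```scala
// Simplified stand-in; the real class is TempShuffleReadMetrics in
// org.apache.spark.executor.ShuffleReadMetrics.scala.
final class TempMetrics {
  var remoteMergedReqsDuration: Long = 0L
  def incRemoteMergedReqsDuration(v: Long): Unit = remoteMergedReqsDuration += v
}

// At task end the per-reader temps are summed into the task-level metric.
def mergedReqsDurationForTask(temps: Seq[TempMetrics]): Long =
  temps.map(_.remoteMergedReqsDuration).sum
```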






[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-10-29 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008795619


##
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala:
##
@@ -78,6 +78,16 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
     shuffleLocalBytesRead = 1L,
     shuffleReadBytes = 1L,
     shuffleReadRecords = 1L,
+    shuffleCorruptMergedBlockChunks = 2L,
+    shuffleFallbackCount = 2L,
+    shuffleMergedRemoteBlocksFetched = 1L,
+    shuffleMergedLocalBlocksFetched = 1L,
+    shuffleMergedRemoteChunksFetched = 1L,
+    shuffleMergedLocalChunksFetched = 1L,
+    shuffleMergedRemoteBytesRead = 1L,
+    shuffleMergedLocalBytesRead = 1L,
+    shuffleRemoteReqsDuration = 1L,
+    shuffleMergedRemoteReqsDuration = 1L,

Review Comment:
   nit: given inputBytes > 0, not likely :-) But that is not relevant to this test!






[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-10-29 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008794924


##
core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala:
##
@@ -290,6 +301,7 @@ private class PushBasedFetchHelper(
       address: BlockManagerId): Unit = {
     assert(blockId.isInstanceOf[ShuffleMergedBlockId] || blockId.isInstanceOf[ShuffleBlockChunkId])
     logWarning(s"Falling back to fetch the original blocks for push-merged block $blockId")
+    shuffleMetrics.incFallbackCount(1)

Review Comment:
   I misread the change, makes sense - thanks!






[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-10-29 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008794591


##
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##
@@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
     val shuffleLocalBytesRead: Long,
     @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
     val shuffleRecordsRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+      parent = TaskIndexNames.STAGE)
+    val shuffleCorruptMergedBlockChunks: Long,
+    @KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+      parent = TaskIndexNames.STAGE)
+    val shuffleFallbackCount: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION, parent = TaskIndexNames.STAGE)
+    val shuffleRemoteReqsDuration: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION,
+      parent = TaskIndexNames.STAGE)

Review Comment:
   Makes sense, thanks for clarifying!






[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-10-26 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r978229580


##
core/src/main/scala/org/apache/spark/InternalAccumulator.scala:
##
@@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
     val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
     val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
     val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+    val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX + "remoteReqsDuration"
+    val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
+    val FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"
+    val REMOTE_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBlocksFetched"
+    val LOCAL_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBlocksFetched"
+    val REMOTE_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedChunksFetched"
+    val LOCAL_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedChunksFetched"
+    val REMOTE_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBytesRead"
+    val LOCAL_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBytesRead"
+    val REMOTE_MERGED_REQS_DURATION = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedReqsDuration"

Review Comment:
   Can we check if all the metrics are getting populated?
   A cursory check indicated the following are not populated:
   
   * `remoteReqsDuration`
   * `remoteMergedReqsDuration`
   
   Also, can we make `remoteMergedBytesRead` and `localMergedBytesRead` consistent with the names of their corresponding accumulators?
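   A hedged sketch of one way to spot-check this (illustrative only; `observedAccums`, a map of accumulator name to value captured at task end, e.g. via a `SparkListener`, is an assumption, and the prefixes are passed in to keep the snippet self-contained):

```scala
// Accumulator names rebuilt from the constants added in this diff.
def unpopulated(
    observedAccums: Map[String, Long],
    shuffleReadPrefix: String,  // e.g. SHUFFLE_READ_METRICS_PREFIX
    shufflePushPrefix: String   // e.g. SHUFFLE_PUSH_READ_METRICS_PREFIX
  ): Seq[String] = {
  val expected =
    Seq("corruptMergedBlockChunks", "fallbackCount",
      "remoteMergedBlocksFetched", "localMergedBlocksFetched",
      "remoteMergedChunksFetched", "localMergedChunksFetched",
      "remoteMergedBytesRead", "localMergedBytesRead",
      "remoteMergedReqsDuration").map(shufflePushPrefix + _) :+
      (shuffleReadPrefix + "remoteReqsDuration")
  // Any expected name absent from the observed updates was never populated.
  expected.filterNot(observedAccums.contains)
}
```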



##
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##
@@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
     val shuffleLocalBytesRead: Long,
     @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
     val shuffleRecordsRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+      parent = TaskIndexNames.STAGE)
+    val shuffleCorruptMergedBlockChunks: Long,
+    @KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+      parent = TaskIndexNames.STAGE)
+    val shuffleFallbackCount: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION, parent = TaskIndexNames.STAGE)
+    val shuffleRemoteReqsDuration: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION,
+      parent = TaskIndexNames.STAGE)
Review Comment:
   Do we need to index any of these? None of these are sortable columns in the UI, right?
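   For reference, a sketch of the trade-off behind the question (an illustration of the KVStore pattern under assumed semantics, not code from the PR): an indexed field makes the store maintain a per-stage secondary index on every write so the UI can sort tasks by it, while a plain constructor parameter is still stored and rendered, just not sortable.

```scala
// Illustrative shapes only (placeholder names, not the real TaskDataWrapper).
class ExampleWrapper(
    // Sortable column: a secondary index is maintained on every write,
    // via an annotation like the ones above:
    //   @KVIndexParam(value = "...", parent = TaskIndexNames.STAGE)
    val sortableMetric: Long,
    // Display-only column: stored and rendered, but no index and no sorting.
    val displayOnlyMetric: Long)
```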



##
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##
@@ -654,6 +654,27 @@ private[spark] class Executor(
         executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
         executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
         executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS
+          .inc(task.metrics.shuffleReadMetrics.corruptMergedBlockChunks)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_FALLBACK_COUNT
+          .inc(task.metrics.shuffleReadMetrics.fallbackCount)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksFetched)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.localMergedBlocksFetched)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedChunksFetched)
+