mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1038913313
##########
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##########
@@ -319,7 +319,18 @@ private[spark] class AppStatusStore(
toValues(_.shuffleFetchWaitTime),
toValues(_.shuffleRemoteBytesRead),
toValues(_.shuffleRemoteBytesReadToDisk),
- toValues(_.shuffleTotalBlocksFetched)),
+ toValues(_.shuffleTotalBlocksFetched),
+ toValues(_.shuffleRemoteReqsDuration),
+ new v1.ShufflePushReadMetricDistributions(
+ toValues(_.shuffleCorruptMergedBlockChunks),
+ toValues(_.shuffleFallbackCount),
Review Comment:
`shuffleFallbackCount` and
`ShufflePushReadMetricDistributions.fallbackCount` need to be renamed as well
to be consistent.
Please do make sure the other renames we did are consistent throughout as
well - I will go over it once more later, but would be great if you can take a
pass as well @thejdeep . Thx
##########
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##########
@@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
val shuffleLocalBytesRead: Long,
@KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent =
TaskIndexNames.STAGE)
val shuffleRecordsRead: Long,
+ @KVIndexParam(
+ value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+ parent = TaskIndexNames.STAGE)
+ val shuffleCorruptMergedBlockChunks: Long,
+ @KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+ parent = TaskIndexNames.STAGE)
+ val shuffleFallbackCount: Long,
+ @KVIndexParam(
+ value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent =
TaskIndexNames.STAGE)
+ val shuffleMergedRemoteBlocksFetched: Long,
+ @KVIndexParam(
+ value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent =
TaskIndexNames.STAGE)
+ val shuffleMergedLocalBlocksFetched: Long,
+ @KVIndexParam(
+ value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent =
TaskIndexNames.STAGE)
+ val shuffleMergedRemoteChunksFetched: Long,
+ @KVIndexParam(
+ value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent =
TaskIndexNames.STAGE)
+ val shuffleMergedLocalChunksFetched: Long,
+ @KVIndexParam(
+ value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent =
TaskIndexNames.STAGE)
+ val shuffleMergedRemoteBlocksBytesRead: Long,
Review Comment:
`Blocks` is not required for `BytesRead` - change here and in all other
similar cases.
For ex: `shuffleMergedRemoteBlocksBytesRead` ->
`shuffleMergedRemoteBytesRead`, `shuffleMergedLocalBlocksBytesRead` ->
`shuffleMergedLocalBytesRead`
Same for `ShufflePushReadMetrics`, etc.
##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -791,6 +770,53 @@ private[spark] class Executor(
}
}
+ private def incrementShuffleMetrics(
+ executorSource: ExecutorSource,
+ metrics: TaskMetrics
+ ): Unit = {
+ executorSource.METRIC_SHUFFLE_FETCH_WAIT_TIME
+ .inc(metrics.shuffleReadMetrics.fetchWaitTime)
+
executorSource.METRIC_SHUFFLE_WRITE_TIME.inc(metrics.shuffleWriteMetrics.writeTime)
+ executorSource.METRIC_SHUFFLE_TOTAL_BYTES_READ
+ .inc(metrics.shuffleReadMetrics.totalBytesRead)
+ executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ
+ .inc(metrics.shuffleReadMetrics.remoteBytesRead)
+ executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ_TO_DISK
+ .inc(metrics.shuffleReadMetrics.remoteBytesReadToDisk)
+ executorSource.METRIC_SHUFFLE_LOCAL_BYTES_READ
+ .inc(metrics.shuffleReadMetrics.localBytesRead)
+ executorSource.METRIC_SHUFFLE_RECORDS_READ
+ .inc(metrics.shuffleReadMetrics.recordsRead)
+ executorSource.METRIC_SHUFFLE_REMOTE_BLOCKS_FETCHED
+ .inc(metrics.shuffleReadMetrics.remoteBlocksFetched)
+ executorSource.METRIC_SHUFFLE_LOCAL_BLOCKS_FETCHED
+ .inc(metrics.shuffleReadMetrics.localBlocksFetched)
+ executorSource.METRIC_SHUFFLE_REMOTE_REQS_DURATION
+ .inc(metrics.shuffleReadMetrics.remoteReqsDuration)
+ executorSource.METRIC_SHUFFLE_BYTES_WRITTEN
+ .inc(metrics.shuffleWriteMetrics.bytesWritten)
+ executorSource.METRIC_SHUFFLE_RECORDS_WRITTEN
+ .inc(metrics.shuffleWriteMetrics.recordsWritten)
+ executorSource.METRIC_PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS
+ .inc(metrics.shuffleReadMetrics.corruptMergedBlockChunks)
+ executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_FETCH_FALLBACK_COUNT
+ .inc(metrics.shuffleReadMetrics.fallbackCount)
Review Comment:
Rename `fallbackCount` (and the corresponding `_fallbackCount`) to match the
metric name?
`mergedFetchFallbackCount`, like in other places?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala:
##########
@@ -44,6 +44,26 @@ class SQLShuffleReadMetricsReporter(
metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME)
private[this] val _recordsRead =
metrics(SQLShuffleReadMetricsReporter.RECORDS_READ)
Review Comment:
Keep metrics here consistent with the other renames as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]