shardulm94 commented on a change in pull request #33446:
URL: https://github.com/apache/spark/pull/33446#discussion_r674309165
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1693,4 +1700,42 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
}
+ test("logging of slow block fetches") {
+ val thresholdBps = 1024
+ val thresholdMs = 1000
+ val fetcher = createShuffleBlockIteratorWithDefaults(
+ Map.empty,
+ slowFetchThresholdMs = thresholdMs,
+ slowFetchThresholdBps = thresholdBps)
+ val totalBytes = 10000
+ val blockCount = 3
+ val bmId = BlockManagerId("execID", "hostname", 1234)
+
+ // Match the logger name used by the Logging trait
+ val logCapture = new
Log4jCapture(ShuffleBlockFetcherIterator.getClass.getName.stripSuffix("$"))
+ try {
+ // Fetch with a read that is slightly slower than the threshold
+ val elapsedMs = 1000 * totalBytes / thresholdBps + 1
+ fetcher.logFetchIfSlow(elapsedMs, totalBytes, blockCount, bmId)
+ val expectedMessage = s"Slow shuffle block fetch detected: Fetching
blocks took longer " +
+ s"than expected ( blockCount = $blockCount , totalBlockSize =
$totalBytes , " +
+ s"blockManagerId = BlockManagerId(execID, hostname, 1234, None) , " +
+ s"durationMs = $elapsedMs , " +
+ s"transferBps = ${(totalBytes.toDouble / elapsedMs * 1000).toLong} )\n"
+ assert(expectedMessage === logCapture.getOutput)
+
+ // Fetch with a read that is slightly faster than the threshold
+ logCapture.clearOutput()
+ fetcher.logFetchIfSlow(1000 * totalBytes / thresholdBps - 1, totalBytes,
blockCount, bmId)
+ assert("" === logCapture.getOutput)
+
+ // When the number of bytes is too small, even though the transfer rate is slow, the
+ // thresholdMillis will not be satisfied
+ val smallBytes = 10
+ fetcher.logFetchIfSlow(1000 * smallBytes / thresholdBps + 1, smallBytes,
blockCount, bmId)
+ assert("" === logCapture.getOutput)
+ } finally if (logCapture != null) {
Review comment:
👍 Removed
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1693,4 +1700,42 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
}
+ test("logging of slow block fetches") {
Review comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]