xkrogen commented on a change in pull request #33446:
URL: https://github.com/apache/spark/pull/33446#discussion_r673474836
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -87,10 +94,13 @@ final class ShuffleBlockFetcherIterator(
maxBlocksInFlightPerAddress: Int,
val maxReqSizeShuffleToMem: Long,
maxAttemptsOnNettyOOM: Int,
+ slowFetchThresholdMs: Long,
+ slowFetchThresholdBps: Long,
detectCorrupt: Boolean,
detectCorruptUseExtraMemory: Boolean,
shuffleMetrics: ShuffleReadMetricsReporter,
- doBatchFetch: Boolean)
+ doBatchFetch: Boolean,
+ clock: Clock = new SystemClock())
Review comment:
If we're adding a clock, we should use it for all timing information
within the class. How big would the diff be to move all uses of `System`-based
timing to the `clock`?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -260,6 +270,7 @@ final class ShuffleBlockFetcherIterator(
val deferredBlocks = new ArrayBuffer[String]()
val blockIds = req.blocks.map(_.blockId.toString)
val address = req.address
+ val requestStartTime = clock.getTimeMillis()
Review comment:
Can we use nano time throughout? This is more reliable for timing
information.
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1693,4 +1700,42 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
}
+ test("logging of slow block fetches") {
+ val thresholdBps = 1024
+ val thresholdMs = 1000
+ val fetcher = createShuffleBlockIteratorWithDefaults(
+ Map.empty,
+ slowFetchThresholdMs = thresholdMs,
+ slowFetchThresholdBps = thresholdBps)
+ val totalBytes = 10000
+ val blockCount = 3
+ val bmId = BlockManagerId("execID", "hostname", 1234)
+
+ // Match the logger name used by the Logging trait
+ val logCapture = new
Log4jCapture(ShuffleBlockFetcherIterator.getClass.getName.stripSuffix("$"))
+ try {
+ // Fetch with a read that is slightly slower than the threshold
+ val elapsedMs = 1000 * totalBytes / thresholdBps + 1
+ fetcher.logFetchIfSlow(elapsedMs, totalBytes, blockCount, bmId)
+ val expectedMessage = s"Slow shuffle block fetch detected: Fetching
blocks took longer " +
+ s"than expected ( blockCount = $blockCount , totalBlockSize =
$totalBytes , " +
+ s"blockManagerId = BlockManagerId(execID, hostname, 1234, None) , " +
+ s"durationMs = $elapsedMs , " +
+ s"transferBps = ${(totalBytes.toDouble / elapsedMs * 1000).toLong} )\n"
+ assert(expectedMessage === logCapture.getOutput)
+
+ // Fetch with a read that is slightly faster than the threshold
+ logCapture.clearOutput()
+ fetcher.logFetchIfSlow(1000 * totalBytes / thresholdBps - 1, totalBytes,
blockCount, bmId)
+ assert("" === logCapture.getOutput)
+
+ // When the number of bytes is too small, even though the transfer rate
is slow, the
+ // thresholdMillis will not be satisfied
+ val smallBytes = 10
+ fetcher.logFetchIfSlow(1000 * smallBytes / thresholdBps + 1, smallBytes,
blockCount, bmId)
+ assert("" === logCapture.getOutput)
+ } finally if (logCapture != null) {
Review comment:
`logCapture` will never be null here, right? Is this check necessary?
##########
File path: core/src/main/scala/org/apache/spark/TestUtils.scala
##########
@@ -448,6 +451,33 @@ private[spark] object TestUtils {
EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
file.getPath
}
+
+ /**
+ * A log4j-specific log capturing mechanism. Provided with a class name, it
will add a new
+ * appender to the log for that class to capture the output. log4j must be
properly configured
+ * on the classpath (e.g., with slf4j-log4j12) for this to work. This should
be closed when it is
+ * done to remove the temporary appender.
+ */
+ class Log4jCapture(val loggerName: String) extends AutoCloseable {
+ private val logger = LogManager.getLogger(loggerName)
+ private val writer = new StringWriter()
+ private val appender = new WriterAppender(new PatternLayout(), writer)
+ logger.addAppender(appender)
+
+ def this(classToCapture: Class[_]) = {
+ this(classToCapture.getName)
+ }
+
Review comment:
It seems this constructor isn't used? Maybe we can also move the logic that
strips the `$` suffix into it.
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1693,4 +1700,42 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
}
+ test("logging of slow block fetches") {
Review comment:
Can you prefix with the ticket number like: "SPARK-36215 logging of slow
block fetches"
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1140,6 +1156,23 @@ final class ShuffleBlockFetcherIterator(
}
removedChunkIds
}
+
+ private[storage] def logFetchIfSlow(
+ durationMs: Long,
+ totalBlockSize: Long,
+ blockCount: Int,
+ blockManagerId: BlockManagerId): Unit = {
+ val transferBps = if (durationMs > 0) {
Review comment:
minor nit: indent the parameter list by 4 spaces
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]