cloud-fan commented on a change in pull request #33451:
URL: https://github.com/apache/spark/pull/33451#discussion_r678890469
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -971,7 +1005,66 @@ final class ShuffleBlockFetcherIterator(
currentResult.mapIndex,
currentResult.address,
detectCorrupt && streamCompressedOrEncrypted,
- currentResult.isNetworkReqDone))
+ currentResult.isNetworkReqDone,
+ Option(checkedIn)))
+ }
+
+ /**
+ * Get the suspect corruption cause for the corrupted block. It should be only invoked
+ * when checksum is enabled and corruption was detected at least once.
+ *
+ * This will firstly consume the rest of stream of the corrupted block to calculate the
+ * checksum of the block. Then, it will raise a synchronized RPC call along with the
+ * checksum to ask the server(where the corrupted block is fetched from) to diagnose the
+ * cause of corruption and return it.
+ *
+ * Any exception raised during the process will result in the [[Cause.UNKNOWN_ISSUE]] of the
+ * corruption cause since corruption diagnosis is only a best effort.
+ *
+ * @param checkedIn the [[CheckedInputStream]] which is used to calculate the checksum.
+ * @param address the address where the corrupted block is fetched from.
+ * @param blockId the blockId of the corrupted block.
+ * @return The corruption diagnosis response for different causes.
+ */
+ private[storage] def diagnoseCorruption(
+ checkedIn: CheckedInputStream,
+ address: BlockManagerId,
+ blockId: BlockId): String = {
+ logInfo("Start corruption diagnosis.")
+ val startTimeNs = System.nanoTime()
+ assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
+ val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
+ val buffer = new Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER)
+ // consume the remaining data to calculate the checksum
+ var cause: Cause = null
+ try {
+ while (checkedIn.read(buffer) != -1) {}
+ val checksum = checkedIn.getChecksum.getValue
+ cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId,
+ shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId, checksum,
+ checksumAlgorithm)
+ } catch {
+ case e: Exception =>
+ logWarning("Unable to diagnose the corruption cause of the corrupted block", e)
+ cause = Cause.UNKNOWN_ISSUE
+ }
+ val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
Review comment:
I'm confused. If we only need millisecond precision, why do we measure in
nanoseconds and convert to milliseconds at the end? Can't we use
`System.currentTimeMillis`?
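
One consideration that may bear on this question, offered as a sketch and not as the PR author's stated rationale: `System.nanoTime()` is intended for measuring elapsed time and is not tied to wall-clock time, whereas `System.currentTimeMillis()` reads the wall clock and can jump if the system clock is adjusted between the two readings. The snippet below shows the elapsed-time pattern in isolation. `ElapsedTimeSketch` and `timedMillis` are hypothetical names used only for illustration and do not appear in the PR.

```scala
import java.util.concurrent.TimeUnit

object ElapsedTimeSketch {
  // Hypothetical helper (not part of the PR): evaluates `body` and returns its
  // result together with the elapsed time in milliseconds.
  // System.nanoTime() is used because it is meant for elapsed-time measurement
  // and is unaffected by wall-clock adjustments.
  def timedMillis[T](body: => T): (T, Long) = {
    val startTimeNs = System.nanoTime()
    val result = body
    val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
    (result, durationMs)
  }

  def main(args: Array[String]): Unit = {
    val (sum, durationMs) = timedMillis { (1 to 1000).sum }
    println(s"sum=$sum took ${durationMs}ms")
  }
}
```

The millisecond conversion happens only at the reporting step, so the log message stays readable while the measurement itself does not depend on the wall clock.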