Ngone51 commented on a change in pull request #33451:
URL: https://github.com/apache/spark/pull/33451#discussion_r676732889
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -971,7 +1000,50 @@ final class ShuffleBlockFetcherIterator(
currentResult.mapIndex,
currentResult.address,
detectCorrupt && streamCompressedOrEncrypted,
- currentResult.isNetworkReqDone))
+ currentResult.isNetworkReqDone,
+ Option(checkedIn)))
+ }
+
+ /**
+ * Get the suspect corruption cause for the corrupted block. It should be
only invoked
+ * when checksum is enabled.
+ *
+ * This will first consume the rest of the stream of the corrupted block to
calculate the
+ * checksum of the block. Then, it will make a synchronous RPC call along
with the
+ * checksum to ask the server (where the corrupted block is fetched from) to
diagnose the
+ * cause of corruption and return it.
+ *
+ * Any exception raised during the process will result in
[[Cause.UNKNOWN_ISSUE]] being
+ * returned as the corruption cause, since corruption diagnosis is only best effort.
+ *
+ * @param checkedIn the [[CheckedInputStream]] which is used to calculate
the checksum.
+ * @param address the address where the corrupted block is fetched from.
+ * @param blockId the blockId of the corrupted block.
+ * @return the cause of corruption, which should be one of the [[Cause]].
+ */
+ private[storage] def diagnoseCorruption(
+ checkedIn: CheckedInputStream,
+ address: BlockManagerId,
+ blockId: BlockId): Cause = {
+ logInfo("Start corruption diagnosis.")
+ val startTimeNs = System.nanoTime()
+ assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId,
but got $blockId")
+ val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
+ val buffer = new
Array[Byte](ShuffleCorruptionDiagnosisHelper.CHECKSUM_CALCULATION_BUFFER)
+ // consume the remaining data to calculate the checksum
+ try {
+ while (checkedIn.read(buffer, 0, 8192) != -1) {}
+ } catch {
+ case e: IOException =>
+ logWarning("IOException throws while consuming the rest stream of the
corrupted block", e)
+ return Cause.UNKNOWN_ISSUE
+ }
+ val checksum = checkedIn.getChecksum.getValue
+ val cause = shuffleClient.diagnoseCorruption(address.host, address.port,
address.executorId,
Review comment:
sgtm. Addressed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]