cloud-fan commented on a change in pull request #33451:
URL: https://github.com/apache/spark/pull/33451#discussion_r676354691



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -971,7 +1000,50 @@ final class ShuffleBlockFetcherIterator(
         currentResult.mapIndex,
         currentResult.address,
         detectCorrupt && streamCompressedOrEncrypted,
-        currentResult.isNetworkReqDone))
+        currentResult.isNetworkReqDone,
+        Option(checkedIn)))
+  }
+
+  /**
+   * Get the suspect corruption cause for the corrupted block. It should be only invoked
+   * when checksum is enabled.
+   *
+   * This will firstly consume the rest of stream of the corrupted block to calculate the
+   * checksum of the block. Then, it will raise a synchronized RPC call along with the
+   * checksum to ask the server(where the corrupted block is fetched from) to diagnose the
+   * cause of corruption and return it.
+   *
+   * Any exception raised during the process will result in the [[Cause.UNKNOWN_ISSUE]] of the
+   * corruption cause since corruption diagnosis is only a best effort.
+   *
+   * @param checkedIn the [[CheckedInputStream]] which is used to calculate the checksum.
+   * @param address the address where the corrupted block is fetched from.
+   * @param blockId the blockId of the corrupted block.
+   * @return the cause of corruption, which should be one of the [[Cause]].
+   */
+  private[storage] def diagnoseCorruption(
+      checkedIn: CheckedInputStream,
+      address: BlockManagerId,
+      blockId: BlockId): Cause = {
+    logInfo("Start corruption diagnosis.")
+    val startTimeNs = System.nanoTime()
+    assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
+    val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
+    val buffer = new Array[Byte](ShuffleCorruptionDiagnosisHelper.CHECKSUM_CALCULATION_BUFFER)
+    // consume the remaining data to calculate the checksum
+    try {
+      while (checkedIn.read(buffer, 0, 8192) != -1) {}
+    } catch {
+      case e: IOException =>
+        logWarning("IOException throws while consuming the rest stream of the corrupted block", e)
+        return Cause.UNKNOWN_ISSUE
+    }
+    val checksum = checkedIn.getChecksum.getValue
+    val cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId,

Review comment:
       Can we calculate and verify the checksum only after all the retries have failed?
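A minimal Scala sketch of the two ideas in play here, under stated assumptions: draining the rest of a `java.util.zip.CheckedInputStream` so its checksum covers the whole block (what the scaladoc above describes), and running that diagnosis only once every fetch attempt has failed (what the review comment asks). This is not Spark's code. `CorruptionDiagnosisSketch`, `drainAndChecksum` and `fetchWithDeferredDiagnosis` are hypothetical names; the checksum algorithm is whatever the caller wraps the stream with; and the `Cause` type and the `shuffleClient.diagnoseCorruption` RPC are deliberately not modeled.

```scala
import java.io.IOException
import java.util.zip.CheckedInputStream
import scala.annotation.tailrec

// Hypothetical helper object for illustration only; not part of Spark.
object CorruptionDiagnosisSketch {

  /** Reads whatever is left in `in` so that `in.getChecksum` covers the whole block.
    * Returns None on IOException, mirroring the best-effort fallback to an
    * "unknown" cause described in the scaladoc. */
  def drainAndChecksum(in: CheckedInputStream, bufferSize: Int = 8192): Option[Long] = {
    // A zero-length read returns 0 forever and would never reach EOF.
    require(bufferSize > 0, "bufferSize must be positive")
    val buffer = new Array[Byte](bufferSize)
    try {
      // Tie the read length to the buffer so the two cannot drift apart.
      while (in.read(buffer, 0, buffer.length) != -1) {}
      Some(in.getChecksum.getValue)
    } catch {
      case _: IOException => None
    }
  }

  /** Calls `attempt(0)`, `attempt(1)`, ... up to `maxAttempts` times. Only if every
    * attempt fails is `diagnose` invoked, exactly once, with the last error. */
  def fetchWithDeferredDiagnosis[A, D](maxAttempts: Int)(
      attempt: Int => Either[IOException, A])(
      diagnose: IOException => D): Either[D, A] = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")

    @tailrec
    def loop(n: Int): Either[D, A] = attempt(n) match {
      case Right(value) => Right(value)
      case Left(_) if n + 1 < maxAttempts => loop(n + 1) // retry without diagnosing
      case Left(lastError) => Left(diagnose(lastError))  // retries exhausted
    }

    loop(0)
  }
}
```

For example, wrapping a block's bytes as `new CheckedInputStream(stream, new java.util.zip.Adler32())`, reading part of it, and then calling `drainAndChecksum` yields the same value as an `Adler32` computed over the full byte array, because the checksum is updated incrementally on every read. With the deferred shape, the extra read of a corrupted block happens at most once and only on the path where retries did not help; a real implementation would still need to keep the last failed attempt's stream around so there is something to checksum.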




-- 
This is an automated message from the Apache Git Service.