otterc commented on a change in pull request #33451:
URL: https://github.com/apache/spark/pull/33451#discussion_r674280177



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -63,7 +63,7 @@ public ExternalBlockStoreClient(
       SecretKeyHolder secretKeyHolder,
       boolean authEnabled,
       long registrationTimeoutMs) {
-    this.conf = conf;
+    this.transportConf = conf;

Review comment:
       Note to self: all the changes in this file are related to the renaming of `conf`

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);
+      return in.readLong();
+    }
+  }
+
+  private static long calculateChecksumForPartition(
+      ManagedBuffer partitionData,
+      Checksum checksumAlgo) throws IOException {
+    InputStream in = partitionData.createInputStream();
+    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
+    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
+      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
+      return checksumAlgo.getValue();
+    }
+  }
+
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums.
+   *
+   * There're 3 different kinds of checksums for the same shuffle partition:
+   *   - checksum (c1) that calculated by the shuffle data reader
+   *   - checksum (c2) that calculated by the shuffle data writer and stored in the checksum file
+   *   - checksum (c3) that recalculated during diagnosis
+   *
+   * And the diagnosis mechanism works like this:
+   * If c2 != c3, we suspect the corruption is caused by the DISK_ISSUE. Otherwise, if c1 != c3,
+   * we suspect the corruption is caused by the NETWORK_ISSUE. Otherwise, the cause remains
+   * CHECKSUM_VERIFY_PASS. In case of the any other failures, the cause remains UNKNOWN_ISSUE.
+   *
+   * @param checksumFile The checksum file that written by the shuffle writer
+   * @param reduceId The reduceId of the shuffle block
+   * @param partitionData The partition data of the shuffle block
+   * @param checksumByReader The checksum value that calculated by the shuffle data reader
+   * @return The cause of data corruption
+   */
+  public static Cause diagnoseCorruption(
+      File checksumFile,
+      int reduceId,
+      ManagedBuffer partitionData,
+      long checksumByReader) {
+    Cause cause;
+    if (checksumFile.exists()) {

Review comment:
       Here, the checksumFile would definitely exist, correct? The logic in `diagnoseShuffleBlockCorruption` finds the file by listing the files in the parent directory.
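       For reference, the decision described in the javadoc above boils down to roughly this (a paraphrase with illustrative variable names, not the exact implementation):

           long c1 = checksumByReader;                                            // computed by the reader
           long c2 = readChecksumByReduceId(checksumFile, reduceId);              // written by the writer
           long c3 = calculateChecksumForPartition(partitionData, checksumAlgo);  // recomputed during diagnosis
           if (c2 != c3) {
             cause = Cause.DISK_ISSUE;            // data on disk no longer matches the writer's checksum
           } else if (c1 != c3) {
             cause = Cause.NETWORK_ISSUE;         // data changed between the server and the reader
           } else {
             cause = Cause.CHECKSUM_VERIFY_PASS;  // all three checksums agree
           }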

##########
File path: common/network-common/src/main/java/org/apache/spark/network/corruption/Cause.java
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.corruption;
+
+public enum Cause {

Review comment:
       Nit: missing java doc and `Since` tag
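       Something like the following, where the constants are the ones referenced in the diagnosis javadoc elsewhere in this PR and the version number is only a placeholder:

           /**
            * The cause of a corrupted shuffle block, as diagnosed by verifying shuffle checksums.
            *
            * @since 3.2.0
            */
           public enum Cause {
             DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE, CHECKSUM_VERIFY_PASS
           }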

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
##########
@@ -374,6 +376,32 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
       .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
 
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums
+   */
+  public Cause diagnoseShuffleBlockCorruption(
+      String appId,
+      String execId,
+      int shuffleId,
+      long mapId,
+      int reduceId,
+      long checksumByReader) {
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+    String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.checksum";
+    File probeFile = ExecutorDiskUtils.getFile(
+      executor.localDirs,
+      executor.subDirsPerLocalDir,
+      fileName);
+    File parentFile = probeFile.getParentFile();
+
+    File[] checksumFiles = parentFile.listFiles(f -> f.getName().startsWith(fileName));
+    assert checksumFiles.length == 1;

Review comment:
       Line 397 suggests that there can be multiple checksum files, but then we assert in the next line that there would be just 1 file. It's not clear to me why this is being done. I suspect listing files in the dir (even though there is a filter) will be costly.
       Is it because it was decided to append the checksum algorithm to the file name and the exact name of the file is unknown?
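       If that is the reason, one alternative that would avoid the listing is to probe the known algorithm extensions directly. A hypothetical sketch (`KNOWN_ALGORITHMS` and `findChecksumFile` don't exist in the PR):

           // Probe the known extensions instead of listing the parent directory.
           private static final String[] KNOWN_ALGORITHMS = {"ADLER32", "CRC32"};

           private File findChecksumFile(ExecutorShuffleInfo executor, String baseName) {
             for (String algo : KNOWN_ALGORITHMS) {
               File candidate = ExecutorDiskUtils.getFile(
                 executor.localDirs, executor.subDirsPerLocalDir, baseName + "." + algo);
               if (candidate.exists()) {
                 return candidate;
               }
             }
             return null;  // no checksum file was written for this map output
           }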

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);
+      return in.readLong();
+    }
+  }
+
+  private static long calculateChecksumForPartition(
+      ManagedBuffer partitionData,
+      Checksum checksumAlgo) throws IOException {
+    InputStream in = partitionData.createInputStream();
+    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
+    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
+      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
+      return checksumAlgo.getValue();
+    }
+  }
+
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums.

Review comment:
       Nit: `verifying`

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -35,6 +35,7 @@
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientBootstrap;
+import org.apache.spark.network.corruption.Cause;

Review comment:
       All the changes in this file seem to be related to the rename of `conf` to `transportConf`. `Cause` doesn't seem to be used anywhere

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -46,6 +46,43 @@
 
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  protected TransportConf transportConf;
+
+  /**
+   * Send the diagnosis request for the corrupted shuffle block to the server.
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param execId the executor id.
+   * @param shuffleId the shuffleId of the corrupted shuffle block
+   * @param mapId the mapId of the corrupted shuffle block
+   * @param reduceId the reduceId of the corrupted shuffle block
+   * @param checksum the shuffle checksum which calculated at client side for the corrupted
+   *                 shuffle block
+   * @return The cause of the shuffle block corruption
+   */
+  public Cause diagnoseCorruption(
+     String host,
+     int port,
+     String execId,
+     int shuffleId,
+     long mapId,
+     int reduceId,

Review comment:
       Nit: this indentation seems to be 3 spaces

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);
+      return in.readLong();
+    }
+  }
+
+  private static long calculateChecksumForPartition(
+      ManagedBuffer partitionData,
+      Checksum checksumAlgo) throws IOException {
+    InputStream in = partitionData.createInputStream();
+    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
+    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
+      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
+      return checksumAlgo.getValue();
+    }
+  }
+
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums.
+   *
+   * There're 3 different kinds of checksums for the same shuffle partition:
+   *   - checksum (c1) that calculated by the shuffle data reader
+   *   - checksum (c2) that calculated by the shuffle data writer and stored in the checksum file
+   *   - checksum (c3) that recalculated during diagnosis
+   *
+   * And the diagnosis mechanism works like this:
+   * If c2 != c3, we suspect the corruption is caused by the DISK_ISSUE. Otherwise, if c1 != c3,
+   * we suspect the corruption is caused by the NETWORK_ISSUE. Otherwise, the cause remains
+   * CHECKSUM_VERIFY_PASS. In case of the any other failures, the cause remains UNKNOWN_ISSUE.
+   *
+   * @param checksumFile The checksum file that written by the shuffle writer
+   * @param reduceId The reduceId of the shuffle block
+   * @param partitionData The partition data of the shuffle block
+   * @param checksumByReader The checksum value that calculated by the shuffle data reader
+   * @return The cause of data corruption
+   */
+  public static Cause diagnoseCorruption(
+      File checksumFile,
+      int reduceId,
+      ManagedBuffer partitionData,
+      long checksumByReader) {
+    Cause cause;
+    if (checksumFile.exists()) {
+      try {
+        long diagnoseStart = System.currentTimeMillis();
+        long checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
+        Checksum checksumAlgo = getChecksumByFileExtension(checksumFile.getName());
+        long checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
+        long duration = System.currentTimeMillis() - diagnoseStart;
+        logger.info("Shuffle corruption diagnosis took " + duration + " ms");

Review comment:
       Nit: `logger.info("Shuffle corruption diagnosis took {} ms", duration);`

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);
+      return in.readLong();
+    }
+  }
+
+  private static long calculateChecksumForPartition(
+      ManagedBuffer partitionData,
+      Checksum checksumAlgo) throws IOException {
+    InputStream in = partitionData.createInputStream();
+    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
+    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
+      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
+      return checksumAlgo.getValue();
+    }
+  }
+
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums.
+   *
+   * There're 3 different kinds of checksums for the same shuffle partition:
+   *   - checksum (c1) that calculated by the shuffle data reader

Review comment:
       nit: `checksum (c1) that is`

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -971,7 +1000,50 @@ final class ShuffleBlockFetcherIterator(
         currentResult.mapIndex,
         currentResult.address,
         detectCorrupt && streamCompressedOrEncrypted,
-        currentResult.isNetworkReqDone))
+        currentResult.isNetworkReqDone,
+        Option(checkedIn)))
+  }
+
+  /**
+   * Get the suspect corruption cause for the corrupted block. It should be only invoked
+   * when checksum is enabled.
+   *
+   * This will firstly consume the rest of stream of the corrupted block to calculate the
+   * checksum of the block. Then, it will raise a synchronized RPC call along with the
+   * checksum to ask the server(where the corrupted block is fetched from) to diagnose the
+   * cause of corruption and return it.
+   *
+   * Any exception raised during the process will result in the [[Cause.UNKNOWN_ISSUE]] of the
+   * corruption cause since corruption diagnosis is only a best effort.
+   *
+   * @param checkedIn the [[CheckedInputStream]] which is used to calculate the checksum.
+   * @param address the address where the corrupted block is fetched from.
+   * @param blockId the blockId of the corrupted block.
+   * @return the cause of corruption, which should be one of the [[Cause]].
+   */
+  private[storage] def diagnoseCorruption(
+      checkedIn: CheckedInputStream,
+      address: BlockManagerId,
+      blockId: BlockId): Cause = {
+    logInfo("Start corruption diagnosis.")
+    val startTimeNs = System.nanoTime()
+    assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
+    val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
+    val buffer = new Array[Byte](ShuffleCorruptionDiagnosisHelper.CHECKSUM_CALCULATION_BUFFER)
+    // consume the remaining data to calculate the checksum
+    try {
+      while (checkedIn.read(buffer, 0, 8192) != -1) {}
+    } catch {
+      case e: IOException =>
+        logWarning("IOException throws while consuming the rest stream of the corrupted block", e)
+        return Cause.UNKNOWN_ISSUE
+    }
+    val checksum = checkedIn.getChecksum.getValue
+    val cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId,

Review comment:
       This is a blocking call made to the server to get the cause of corruption. Wouldn't this introduce quite a bit of slowness?
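       If the blocking call has to stay, it could at least be bounded by a timeout so a slow or unresponsive server cannot stall the fetch path. A hypothetical sketch (`diagnosisTimeoutMs` is made up; the argument list follows the `BlockStoreClient` javadoc above):

           // Assumed imports: java.util.concurrent.{CompletableFuture, TimeUnit,
           // TimeoutException, ExecutionException}
           CompletableFuture<Cause> diagnosis = CompletableFuture.supplyAsync(() ->
               shuffleClient.diagnoseCorruption(host, port, execId, shuffleId, mapId, reduceId, checksum));
           Cause cause;
           try {
             cause = diagnosis.get(diagnosisTimeoutMs, TimeUnit.MILLISECONDS);
           } catch (TimeoutException | InterruptedException | ExecutionException e) {
             cause = Cause.UNKNOWN_ISSUE;  // diagnosis is best effort anyway
           }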



