mridulm commented on a change in pull request #33451:
URL: https://github.com/apache/spark/pull/33451#discussion_r674486929



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {

Review comment:
       Remove the `throws` clause here? `UnsupportedOperationException` is unchecked, so it does not need to be declared.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
##########
@@ -374,6 +376,32 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
       .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
 
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums
+   */
+  public Cause diagnoseShuffleBlockCorruption(
+      String appId,
+      String execId,
+      int shuffleId,
+      long mapId,
+      int reduceId,
+      long checksumByReader) {
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+    String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.checksum";
+    File probeFile = ExecutorDiskUtils.getFile(
+      executor.localDirs,
+      executor.subDirsPerLocalDir,
+      fileName);

Review comment:
       Btw, while looking at this, I noticed an unrelated potential issue: we use `intern` in `ExecutorDiskUtils`.
   We should probably move to using a Guava interner (`Utils.weakIntern` does this). Thoughts, @Ngone51?
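
   For illustration, a minimal stdlib-only sketch of what a weak interner does, as opposed to `String.intern`, which uses the JVM-wide string pool. The class name `WeakInternerSketch` is hypothetical and this is not how Guava or `Utils.weakIntern` is implemented; in real code Guava's `Interners.newWeakInterner()` would be the thing to use.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;

// Hypothetical stand-in for a weak interner: canonical strings are held only
// through weak references, so they can be collected once nothing else uses them.
final class WeakInternerSketch {
  // Key and value refer to the same String instance; both references are weak.
  private final Map<String, WeakReference<String>> pool = new WeakHashMap<>();

  synchronized String intern(String s) {
    WeakReference<String> ref = pool.get(s);
    String canonical = (ref == null) ? null : ref.get();
    if (canonical == null) {
      // First time we see this value: it becomes the canonical instance.
      pool.put(s, new WeakReference<>(s));
      canonical = s;
    }
    return canonical;
  }
}
```

   Equal strings interned through the same instance come back as one shared object, which is the property the local-dir path handling relies on.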

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);
+      return in.readLong();
+    }
+  }
+
+  private static long calculateChecksumForPartition(
+      ManagedBuffer partitionData,
+      Checksum checksumAlgo) throws IOException {
+    InputStream in = partitionData.createInputStream();
+    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
+    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
+      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
+      return checksumAlgo.getValue();
+    }
+  }
+
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums.
+   *
+   * There're 3 different kinds of checksums for the same shuffle partition:
+   *   - checksum (c1) that calculated by the shuffle data reader
+   *   - checksum (c2) that calculated by the shuffle data writer and stored in the checksum file
+   *   - checksum (c3) that recalculated during diagnosis
+   *
+   * And the diagnosis mechanism works like this:
+   * If c2 != c3, we suspect the corruption is caused by the DISK_ISSUE. Otherwise, if c1 != c3,
+   * we suspect the corruption is caused by the NETWORK_ISSUE. Otherwise, the cause remains
+   * CHECKSUM_VERIFY_PASS. In case of the any other failures, the cause remains UNKNOWN_ISSUE.
+   *
+   * @param checksumFile The checksum file that written by the shuffle writer
+   * @param reduceId The reduceId of the shuffle block
+   * @param partitionData The partition data of the shuffle block
+   * @param checksumByReader The checksum value that calculated by the shuffle data reader
+   * @return The cause of data corruption
+   */
+  public static Cause diagnoseCorruption(
+      File checksumFile,
+      int reduceId,
+      ManagedBuffer partitionData,
+      long checksumByReader) {
+    Cause cause;
+    if (checksumFile.exists()) {

Review comment:
       The `exists` call is not buying us much, since the file can still be cleaned up concurrently after the check. We can remove it and rely on the exception handling.
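
   A minimal sketch of the idea, assuming the caller maps an empty result to `Cause.UNKNOWN_ISSUE`. The class and method names are hypothetical; only the `DataInputStream`/`FileInputStream` usage mirrors the diff above.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.OptionalLong;

// Hypothetical helper: no up-front exists() check, the open/read itself
// reports a missing or truncated file.
final class NoExistsCheckSketch {
  static OptionalLong readStoredChecksum(File checksumFile) {
    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
      return OptionalLong.of(in.readLong());
    } catch (IOException e) {
      // FileNotFoundException (file missing or removed concurrently) and
      // EOFException (file too short) are both IOExceptions.
      return OptionalLong.empty();
    }
  }
}
```

   This closes the window between the check and the open: a file deleted in between is handled by the same path as a file that never existed.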

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);
+      return in.readLong();
+    }
+  }
+
+  private static long calculateChecksumForPartition(
+      ManagedBuffer partitionData,
+      Checksum checksumAlgo) throws IOException {
+    InputStream in = partitionData.createInputStream();
+    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
+    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
+      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
+      return checksumAlgo.getValue();
+    }
+  }
+
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums.
+   *
+   * There're 3 different kinds of checksums for the same shuffle partition:
+   *   - checksum (c1) that calculated by the shuffle data reader
+   *   - checksum (c2) that calculated by the shuffle data writer and stored in the checksum file
+   *   - checksum (c3) that recalculated during diagnosis
+   *
+   * And the diagnosis mechanism works like this:
+   * If c2 != c3, we suspect the corruption is caused by the DISK_ISSUE. Otherwise, if c1 != c3,
+   * we suspect the corruption is caused by the NETWORK_ISSUE. Otherwise, the cause remains
+   * CHECKSUM_VERIFY_PASS. In case of the any other failures, the cause remains UNKNOWN_ISSUE.
+   *
+   * @param checksumFile The checksum file that written by the shuffle writer
+   * @param reduceId The reduceId of the shuffle block
+   * @param partitionData The partition data of the shuffle block
+   * @param checksumByReader The checksum value that calculated by the shuffle data reader
+   * @return The cause of data corruption
+   */
+  public static Cause diagnoseCorruption(
+      File checksumFile,
+      int reduceId,
+      ManagedBuffer partitionData,
+      long checksumByReader) {
+    Cause cause;
+    if (checksumFile.exists()) {
+      try {
+        long diagnoseStart = System.currentTimeMillis();
+        long checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
+        Checksum checksumAlgo = getChecksumByFileExtension(checksumFile.getName());
+        long checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
+        long duration = System.currentTimeMillis() - diagnoseStart;
+        logger.info("Shuffle corruption diagnosis took " + duration + " ms");

Review comment:
       Also include the `checksumFile` path in the log message, so that we can find out the algo/reducer/app in case we need to debug the checksum handling itself.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -46,6 +46,43 @@
 
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  protected TransportConf transportConf;

Review comment:
       Thoughts on making this `final` and initializing it in the constructor?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);
+      return in.readLong();
+    }
+  }
+
+  private static long calculateChecksumForPartition(
+      ManagedBuffer partitionData,
+      Checksum checksumAlgo) throws IOException {
+    InputStream in = partitionData.createInputStream();
+    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
+    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
+      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
+      return checksumAlgo.getValue();
+    }
+  }
+
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums.
+   *
+   * There're 3 different kinds of checksums for the same shuffle partition:
+   *   - checksum (c1) that calculated by the shuffle data reader
+   *   - checksum (c2) that calculated by the shuffle data writer and stored in the checksum file
+   *   - checksum (c3) that recalculated during diagnosis
+   *
+   * And the diagnosis mechanism works like this:
+   * If c2 != c3, we suspect the corruption is caused by the DISK_ISSUE. Otherwise, if c1 != c3,
+   * we suspect the corruption is caused by the NETWORK_ISSUE. Otherwise, the cause remains
+   * CHECKSUM_VERIFY_PASS. In case of the any other failures, the cause remains UNKNOWN_ISSUE.
+   *
+   * @param checksumFile The checksum file that written by the shuffle writer
+   * @param reduceId The reduceId of the shuffle block
+   * @param partitionData The partition data of the shuffle block
+   * @param checksumByReader The checksum value that calculated by the shuffle data reader
+   * @return The cause of data corruption
+   */
+  public static Cause diagnoseCorruption(
+      File checksumFile,
+      int reduceId,
+      ManagedBuffer partitionData,
+      long checksumByReader) {
+    Cause cause;
+    if (checksumFile.exists()) {
+      try {
+        long diagnoseStart = System.currentTimeMillis();
+        long checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
+        Checksum checksumAlgo = getChecksumByFileExtension(checksumFile.getName());
+        long checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
+        long duration = System.currentTimeMillis() - diagnoseStart;
+        logger.info("Shuffle corruption diagnosis took " + duration + " ms");
+        if (checksumByWriter != checksumByReCalculation) {
+          cause = Cause.DISK_ISSUE;
+        } else if (checksumByWriter != checksumByReader) {
+          cause = Cause.NETWORK_ISSUE;
+        } else {
+          cause = Cause.CHECKSUM_VERIFY_PASS;

Review comment:
       I like how pessimistic we are: we check the all-verified condition last :-)
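
   The ordering can be sketched in isolation as follows. The nested `Cause` enum is a hypothetical stand-in for `org.apache.spark.network.corruption.Cause`, and the parameter names are shortened; the comparisons follow the diff above. Once the writer's checksum equals the recalculated one, comparing the writer's value with the reader's is the same as the Javadoc's `c1 != c3`.

```java
// Hypothetical, self-contained restatement of the decision order in the diff.
final class DiagnosisOrderSketch {
  enum Cause { DISK_ISSUE, NETWORK_ISSUE, CHECKSUM_VERIFY_PASS }

  static Cause classify(long byReader, long byWriter, long recalculated) {
    if (byWriter != recalculated) {
      // Data on disk no longer matches what the writer recorded.
      return Cause.DISK_ISSUE;
    } else if (byWriter != byReader) {
      // Disk data is intact, so the reader saw different bytes in transit.
      return Cause.NETWORK_ISSUE;
    } else {
      // Only reached when all three checksums agree.
      return Cause.CHECKSUM_VERIFY_PASS;
    }
  }
}
```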

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
##########
@@ -374,6 +376,32 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
       .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
 
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums
+   */
+  public Cause diagnoseShuffleBlockCorruption(
+      String appId,
+      String execId,
+      int shuffleId,
+      long mapId,
+      int reduceId,
+      long checksumByReader) {
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+    String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.checksum";
+    File probeFile = ExecutorDiskUtils.getFile(
+      executor.localDirs,
+      executor.subDirsPerLocalDir,
+      fileName);
+    File parentFile = probeFile.getParentFile();
+
+    File[] checksumFiles = parentFile.listFiles(f -> f.getName().startsWith(fileName));
+    assert checksumFiles.length == 1;

Review comment:
       Different applications can be configured with different checksum algos.
   While the ESS will support a superset of these, we won't know which ones are
   configured for a given app, hence the need to list + query.
   
   Having said that, I agree with @otterc: `listFiles` here can become
   expensive as the number of files increases, since that API is expensive.
   We have two options:
   
   a) Use `Files.newDirectoryStream` - this is more efficient, and could end up
   being sufficient.
   b) Query for the individual checksum algos we support - this can become N
   point lookups.
   
   I would prefer (a) unless we see perf issues and need to go with (b).
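
   Option (a) could look roughly like the sketch below. The class name, the `findByPrefix` helper, and the example file names (a checksum file name followed by an algorithm suffix) are assumptions for illustration, not code from this PR. `DirectoryStream` iterates entries as they are read rather than building the whole `File[]` first.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical lookup of checksum files by name prefix via a directory stream.
final class ChecksumFileLookupSketch {
  static List<Path> findByPrefix(Path dir, String prefix) throws IOException {
    List<Path> matches = new ArrayList<>();
    // The filter is applied entry by entry while the directory is iterated.
    DirectoryStream.Filter<Path> filter =
        p -> p.getFileName().toString().startsWith(prefix);
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, filter)) {
      for (Path p : stream) {
        matches.add(p);
      }
    }
    return matches;
  }

  // Small demo on a temporary directory with two made-up checksum files.
  static List<Path> demo() {
    try {
      Path dir = Files.createTempDirectory("checksum-lookup");
      Files.createFile(dir.resolve("shuffle_0_1_0.checksum.ADLER32"));
      Files.createFile(dir.resolve("shuffle_0_2_0.checksum.ADLER32"));
      return findByPrefix(dir, "shuffle_0_1_0.checksum");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```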

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleCorruptionDiagnosisHelper.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+import java.io.*;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.corruption.Cause;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of utility functions for the shuffle checksum.
+ */
+@Private
+public class ShuffleCorruptionDiagnosisHelper {
+  private static final Logger logger =
+    LoggerFactory.getLogger(ShuffleCorruptionDiagnosisHelper.class);
+
+  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
+
+  private static Checksum[] getChecksumByAlgorithm(int num, String algorithm)
+    throws UnsupportedOperationException {
+    Checksum[] checksums;
+    switch (algorithm) {
+      case "ADLER32":
+        checksums = new Adler32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new Adler32();
+        }
+        return checksums;
+
+      case "CRC32":
+        checksums = new CRC32[num];
+        for (int i = 0; i < num; i++) {
+          checksums[i] = new CRC32();
+        }
+        return checksums;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported shuffle checksum algorithm: " +
+          algorithm);
+    }
+  }
+
+  public static Checksum getChecksumByFileExtension(String fileName)
+    throws UnsupportedOperationException {
+    int index = fileName.lastIndexOf(".");
+    String algorithm = fileName.substring(index + 1);
+    return getChecksumByAlgorithm(1, algorithm)[0];
+  }
+
+  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
+    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
+      in.skip(reduceId * 8L);

Review comment:
       `skip` can end up skipping fewer than the required number of bytes.
   Use something like `ByteStreams.skipFully` here?
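
   To make the concern concrete, here is a stdlib-only sketch of a skip loop with the same contract as Guava's `ByteStreams.skipFully` (skip exactly `n` bytes or throw `EOFException`). The class name and the short-skipping test stream are hypothetical; in the PR itself calling Guava directly would be simpler.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class SkipFullySketch {
  // Keeps skipping until n bytes are consumed; falls back to read() when
  // skip() makes no progress, so end of stream is detected reliably.
  static void skipFully(InputStream in, long n) throws IOException {
    long remaining = n;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
      } else if (in.read() == -1) {
        throw new EOFException("Reached end of stream with " + remaining + " bytes left to skip");
      } else {
        remaining--;
      }
    }
  }

  static long readChecksumByReduceId(InputStream raw, int reduceId) throws IOException {
    DataInputStream in = new DataInputStream(raw);
    skipFully(in, reduceId * 8L);
    return in.readLong();
  }

  // Demo: three stored checksums, read through a stream whose skip() never
  // skips more than 3 bytes per call.
  static long demo() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(11L);
      out.writeLong(22L);
      out.writeLong(33L);
      InputStream stingy = new FilterInputStream(new ByteArrayInputStream(bytes.toByteArray())) {
        @Override
        public long skip(long n) throws IOException {
          return super.skip(Math.min(n, 3));
        }
      };
      return readChecksumByReduceId(stingy, 2);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   With a single `in.skip(reduceId * 8L)` call, the same stream would leave the read position 3 bytes in and `readLong` would return the wrong value.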

##########
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -822,8 +836,8 @@ final class ShuffleBlockFetcherIterator(
               }
             } catch {
               case e: IOException =>
-                buf.release()
                 if (blockId.isShuffleChunk) {

Review comment:
       Can you add a TODO here to handle checksum support for push-based shuffle?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/DiagnoseCorruption.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/** Request to get the cause of a corrupted block. Returns {@link CorruptionCause} */
+public class DiagnoseCorruption extends BlockTransferMessage {
+  public final String appId;
+  public final String execId;
+  public final int shuffleId;
+  public final long mapId;
+  public final int reduceId;
+  public final long checksum;
+
+  public DiagnoseCorruption(
+      String appId,
+      String execId,
+      int shuffleId,
+      long mapId,
+      int reduceId,
+      long checksum) {
+    this.appId = appId;
+    this.execId = execId;
+    this.shuffleId = shuffleId;
+    this.mapId = mapId;
+    this.reduceId = reduceId;
+    this.checksum = checksum;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.DIAGNOSE_CORRUPTION;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("appId", appId)
+      .append("execId", execId)
+      .append("shuffleId", shuffleId)
+      .append("mapId", mapId)
+      .append("reduceId", reduceId)
+      .append("checksum", checksum)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    DiagnoseCorruption that = (DiagnoseCorruption) o;
+
+    if (checksum != that.checksum) return false;
+    if (!appId.equals(that.appId)) return false;
+    if (!execId.equals(that.execId)) return false;
+    if (shuffleId != that.shuffleId) return false;
+    if (mapId != that.mapId) return false;
+    if (reduceId != that.reduceId) return false;

Review comment:
       Reorder so that we check the cheaper fields first? (Move the `appId` and `execId` checks after the others.)
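
   Roughly like the sketch below, where the primitive comparisons run before the `String.equals` calls. The class name is a hypothetical stand-in for `DiagnoseCorruption`, and the `hashCode` shown is only there to keep the example complete.

```java
import java.util.Objects;

// Hypothetical stand-in with the same fields as DiagnoseCorruption.
final class DiagnoseCorruptionKeySketch {
  final String appId;
  final String execId;
  final int shuffleId;
  final long mapId;
  final int reduceId;
  final long checksum;

  DiagnoseCorruptionKeySketch(
      String appId, String execId, int shuffleId, long mapId, int reduceId, long checksum) {
    this.appId = appId;
    this.execId = execId;
    this.shuffleId = shuffleId;
    this.mapId = mapId;
    this.reduceId = reduceId;
    this.checksum = checksum;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    DiagnoseCorruptionKeySketch that = (DiagnoseCorruptionKeySketch) o;
    // Cheap primitive comparisons first, string comparisons last.
    return checksum == that.checksum
        && shuffleId == that.shuffleId
        && mapId == that.mapId
        && reduceId == that.reduceId
        && appId.equals(that.appId)
        && execId.equals(that.execId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(appId, execId, shuffleId, mapId, reduceId, checksum);
  }
}
```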

##########
File path: 
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##########
@@ -70,6 +69,7 @@ private[spark] class NettyBlockTransferService(
     val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
     var serverBootstrap: Option[TransportServerBootstrap] = None
     var clientBootstrap: Option[TransportClientBootstrap] = None
+    transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)

Review comment:
       nit: `this.transportConf`

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -536,11 +536,15 @@ private[spark] class IndexShuffleBlockResolver(
       dirs: Option[Array[String]] = None): File = {
     val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId, conf)
-    dirs
-      .map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, fileName))
-      .getOrElse {
-        blockManager.diskBlockManager.getFile(fileName)
-      }
+    // We should use the blockId.name as the file name first to create the file so that
+    // readers (e.g., shuffle external service) without knowing the checksum algorithm
+    // could also find the file.
+    val file = dirs
+      .map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))
+      .getOrElse(blockManager.diskBlockManager.getFile(blockId))

Review comment:
       Here, we are assuming how `getChecksumFileName` constructs the `fileName`
   in order to get to the block's filename - any change there will need to be
   duplicated here.
   Instead, why not simply strip the suffix and use that?
   
   
   ```suggestion
       val fileNameWithoutChecksum = fileName.substring(0, fileName.lastIndexOf('.'))
       val file = dirs
         .map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, fileNameWithoutChecksum))
         .getOrElse(blockManager.diskBlockManager.getFile(fileNameWithoutChecksum))
   ```

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
##########
@@ -374,6 +376,32 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
       .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
 
+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verify the shuffle checksums
+   */
+  public Cause diagnoseShuffleBlockCorruption(
+      String appId,
+      String execId,
+      int shuffleId,
+      long mapId,
+      int reduceId,
+      long checksumByReader) {
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+    String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.checksum";
+    File probeFile = ExecutorDiskUtils.getFile(
+      executor.localDirs,
+      executor.subDirsPerLocalDir,
+      fileName);
+    File parentFile = probeFile.getParentFile();
+
+    File[] checksumFiles = parentFile.listFiles(f -> f.getName().startsWith(fileName));
+    assert checksumFiles.length == 1;

Review comment:
       Btw, do we want an assertion here?
   If there is some reason why the checksum file is not present (for example,
   cleanup), why not simply return `Cause.UNKNOWN_ISSUE` in that case?
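
   A small sketch of the non-asserting variant, assuming the caller turns an empty result into `Cause.UNKNOWN_ISSUE`. The class and method names are hypothetical. Two things worth keeping in mind: Java `assert` statements only run when assertions are enabled (`-ea`), and `File.listFiles` returns `null` when the path is not a directory or an I/O error occurs.

```java
import java.io.File;
import java.util.Optional;

// Hypothetical helper: pick the single checksum file, or report "none"
// instead of asserting on the array length.
final class SingleChecksumFileSketch {
  static Optional<File> singleChecksumFile(File[] candidates) {
    if (candidates == null || candidates.length != 1) {
      // Missing directory, cleaned-up file, or an unexpected extra match.
      return Optional.empty();
    }
    return Optional.of(candidates[0]);
  }
}
```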

##########
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -971,7 +1000,50 @@ final class ShuffleBlockFetcherIterator(
         currentResult.mapIndex,
         currentResult.address,
         detectCorrupt && streamCompressedOrEncrypted,
-        currentResult.isNetworkReqDone))
+        currentResult.isNetworkReqDone,
+        Option(checkedIn)))
+  }
+
+  /**
+   * Get the suspect corruption cause for the corrupted block. It should be only invoked
+   * when checksum is enabled.
+   *
+   * This will firstly consume the rest of stream of the corrupted block to calculate the
+   * checksum of the block. Then, it will raise a synchronized RPC call along with the
+   * checksum to ask the server(where the corrupted block is fetched from) to diagnose the
+   * cause of corruption and return it.
+   *
+   * Any exception raised during the process will result in the [[Cause.UNKNOWN_ISSUE]] of the
+   * corruption cause since corruption diagnosis is only a best effort.
+   *
+   * @param checkedIn the [[CheckedInputStream]] which is used to calculate the checksum.
+   * @param address the address where the corrupted block is fetched from.
+   * @param blockId the blockId of the corrupted block.
+   * @return the cause of corruption, which should be one of the [[Cause]].
+   */
+  private[storage] def diagnoseCorruption(
+      checkedIn: CheckedInputStream,
+      address: BlockManagerId,
+      blockId: BlockId): Cause = {
+    logInfo("Start corruption diagnosis.")
+    val startTimeNs = System.nanoTime()
+    assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
+    val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
+    val buffer = new Array[Byte](ShuffleCorruptionDiagnosisHelper.CHECKSUM_CALCULATION_BUFFER)
+    // consume the remaining data to calculate the checksum
+    try {
+      while (checkedIn.read(buffer, 0, 8192) != -1) {}

Review comment:
       `8192` -> `CHECKSUM_CALCULATION_BUFFER`.
   Or better still, simply use `read(buffer)`.
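
   In Java terms (the Scala call has the same shape), draining the stream with `read(buffer)` looks like the sketch below. The class name and the use of `Adler32` are assumptions for the example; `read(buffer)` fills up to `buffer.length` bytes, so the buffer size is stated only once.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;

final class DrainChecksumSketch {
  static final int CHECKSUM_CALCULATION_BUFFER = 8192;

  // Reads the stream to the end so the wrapped Checksum sees every byte.
  static long checksumOf(byte[] data) {
    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
    try (CheckedInputStream in =
        new CheckedInputStream(new ByteArrayInputStream(data), new Adler32())) {
      while (in.read(buffer) != -1) {
        // Nothing to do: reading is what updates the checksum.
      }
      return in.getChecksum().getValue();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```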




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


