tgravescs commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r654518252



##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##########
@@ -120,6 +124,13 @@
     this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
     this.shuffleExecutorComponents = shuffleExecutorComponents;
+    if ((boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM())) {
+      this.checksumEnabled = true;
+      this.partitionChecksums = new Adler32[numPartitions];
+      for (int i = 0; i < numPartitions; i ++) {
+        this.partitionChecksums[i] = new Adler32();

Review comment:
       Yeah, I agree it would be nice for the checksum algorithm to be configurable and recorded, perhaps either as an extension on the checksum file or as metadata within it. I would also expect the ESS to indicate whether it supports a given algorithm, e.g. by raising an "unknown or unsupported checksum type" error when trying to diagnose. The other part might be in push-based shuffle: if it's merging files, I assume it may have to recalculate these checksums, so it needs to know which algorithm to use?

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##########
@@ -133,6 +144,26 @@
     this.peakMemoryUsedBytes = getMemoryUsage();
     this.diskWriteBufferSize =
         (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
+    this.checksumEnabled = (boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM());
+    if (this.checksumEnabled) {
+      this.partitionChecksums = new Adler32[numPartitions];

Review comment:
       This initialization seems to be duplicated in a bunch of places; can we put it in a common place so that it would be easier to change or extend later?
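
For illustration, the deduplication suggested above might look like the sketch below. The class and method names are my own, not from the PR; both `BypassMergeSortShuffleWriter` and `ShuffleExternalSorter` could then call this one helper instead of repeating the loop.

```java
import java.util.zip.Adler32;

// Hypothetical shared helper (names are assumptions, not from the PR) that
// centralizes the per-partition checksum setup currently duplicated across
// the shuffle writers.
public class ShuffleChecksumSupport {

  // Returns one fresh Adler32 per partition when checksums are enabled,
  // or an empty array otherwise, matching the duplicated initialization
  // in the diff hunks above.
  public static Adler32[] createPartitionChecksums(
      boolean checksumEnabled, int numPartitions) {
    if (!checksumEnabled) {
      return new Adler32[0];
    }
    Adler32[] checksums = new Adler32[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      checksums[i] = new Adler32();
    }
    return checksums;
  }
}
```

A single helper like this would also give one obvious place to branch on a configurable algorithm later, which ties in with the configurability discussion above.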

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1368,6 +1368,14 @@ package object config {
         s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)
 
+  private[spark] val SHUFFLE_CHECKSUM =
+    ConfigBuilder("spark.shuffle.checksum")

Review comment:
       I think it would be clearer and more extensible if we added a `.enabled` suffix as well.
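
For illustration, the rename suggested above might look like the sketch below, following the `ConfigBuilder` pattern already used in this file. The doc text, version string, and default value are my assumptions, not taken from the PR.

```scala
  // Sketch only: suffixing the key with ".enabled" leaves room for companion
  // configs such as a hypothetical "spark.shuffle.checksum.algorithm".
  private[spark] val SHUFFLE_CHECKSUM_ENABLED =
    ConfigBuilder("spark.shuffle.checksum.enabled")
      .doc("Whether to calculate per-partition checksums for shuffle data.")
      .version("3.2.0")
      .booleanConf
      .createWithDefault(false)
```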

##########
File path: core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
##########
@@ -68,8 +72,11 @@
    *    for that partition id.
    * <p>
    * 2) An optional metadata blob that can be used by shuffle readers.
+   *
+   * @param checksums The checksum values for each partition if shuffle checksum enabled.
+   *                  Otherwise, it's empty.

Review comment:
       I assume the checksums array index equals the partition id? It might be nice to expand the comment to say that.
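
To make the indexing convention questioned above concrete, here is a small standalone sketch (class and method names are mine, not from the PR) in which entry `i` of the result holds the checksum of partition `i`'s bytes:

```java
import java.util.zip.Adler32;

// Illustrative only: demonstrates the convention the review comment asks the
// javadoc to spell out, i.e. checksums[i] is the checksum for partition id i.
public class PartitionChecksumExample {

  // Computes one Adler-32 value per partition; the array index is the
  // partition id.
  public static long[] checksumPartitions(byte[][] partitionBytes) {
    long[] checksums = new long[partitionBytes.length];
    for (int i = 0; i < partitionBytes.length; i++) {
      Adler32 adler = new Adler32();
      adler.update(partitionBytes[i], 0, partitionBytes[i].length);
      checksums[i] = adler.getValue();  // index i == partition id i
    }
    return checksums;
  }
}
```

An empty partition yields the Adler-32 initial value of 1, so "empty" in the javadoc refers to the whole array being empty when checksums are disabled, not to per-partition entries.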




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]