tgravescs commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r654518252
##########
File path:
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##########
@@ -120,6 +124,13 @@
this.writeMetrics = writeMetrics;
this.serializer = dep.serializer();
this.shuffleExecutorComponents = shuffleExecutorComponents;
+ if ((boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM())) {
+ this.checksumEnabled = true;
+ this.partitionChecksums = new Adler32[numPartitions];
+ for (int i = 0; i < numPartitions; i ++) {
+ this.partitionChecksums[i] = new Adler32();
Review comment:
Yeah, I agree it would be nice for this to be configurable and recorded,
perhaps either as an extension on the checksum file or as metadata in the
checksum file. I would expect the external shuffle service (ESS) to indicate
whether it's supported, i.e. perhaps return an "unknown or unsupported
checksum type" error when trying to diagnose. I think the other part might be
in push-based shuffle: I assume that if it's merging files it may have to
recalculate these, so it needs to know which algorithm to use?
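To make the "extension on the checksum file" idea concrete, here is a minimal sketch, not code from this PR. The class `ChecksumAlgorithms`, its methods, the `.checksum.<ALGORITHM>` naming scheme, and the example block name are all hypothetical; only `Adler32`, `CRC32`, and `Checksum` from `java.util.zip` are real APIs.

```java
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Hypothetical helper (not part of the PR): records the algorithm in the
// checksum file's extension and rejects algorithms it does not know.
final class ChecksumAlgorithms {
  private ChecksumAlgorithms() {}

  /** Builds a name such as "shuffle_0_1_0.checksum.ADLER32" (assumed scheme). */
  static String fileName(String blockName, String algorithm) {
    return blockName + ".checksum." + algorithm;
  }

  /** Recovers the algorithm name from the text after the last dot. */
  static String algorithmOf(String fileName) {
    int dot = fileName.lastIndexOf('.');
    if (dot < 0 || dot == fileName.length() - 1) {
      throw new IllegalArgumentException("No algorithm extension: " + fileName);
    }
    return fileName.substring(dot + 1);
  }

  /** Creates a checksum instance, or fails loudly for an unknown algorithm. */
  static Checksum create(String algorithm) {
    switch (algorithm) {
      case "ADLER32":
        return new Adler32();
      case "CRC32":
        return new CRC32();
      default:
        throw new UnsupportedOperationException(
            "Unknown or unsupported checksum type: " + algorithm);
    }
  }
}
```

With something along these lines, a service that reads or merges the files could call `create(algorithmOf(name))` and surface the unsupported case as an explicit error instead of silently comparing mismatched checksums.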
##########
File path:
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##########
@@ -133,6 +144,26 @@
this.peakMemoryUsedBytes = getMemoryUsage();
this.diskWriteBufferSize =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
+ this.checksumEnabled = (boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM());
+ if (this.checksumEnabled) {
+ this.partitionChecksums = new Adler32[numPartitions];
Review comment:
This seems to be duplicated in a bunch of places. Can we put it in a common
place so that it would be easier to change or extend?
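As a rough illustration of the "common place" I have in mind, here is a minimal sketch. The class `ShuffleChecksumHelper` and its method are hypothetical names, not existing Spark APIs; the point is only that the choice of `Adler32` lives in one spot.

```java
import java.util.zip.Adler32;
import java.util.zip.Checksum;

// Hypothetical shared helper; class and method names are illustrative only.
final class ShuffleChecksumHelper {
  private ShuffleChecksumHelper() {}

  /**
   * Returns one fresh checksum per partition, or an empty array when
   * checksums are disabled.
   */
  static Checksum[] createPartitionChecksums(int numPartitions, boolean enabled) {
    if (!enabled) {
      return new Checksum[0];
    }
    Checksum[] checksums = new Checksum[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // The only place that picks the algorithm; swap it here to change all writers.
      checksums[i] = new Adler32();
    }
    return checksums;
  }
}
```

Each writer could then call `ShuffleChecksumHelper.createPartitionChecksums(numPartitions, checksumEnabled)` rather than repeating the allocation loop.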
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1368,6 +1368,14 @@ package object config {
s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
.createWithDefault(4096)
+ private[spark] val SHUFFLE_CHECKSUM =
+ ConfigBuilder("spark.shuffle.checksum")
Review comment:
I think it would be clearer and more extensible if we add .enabled as
well (i.e. spark.shuffle.checksum.enabled).
##########
File path:
core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
##########
@@ -68,8 +72,11 @@
* for that partition id.
* <p>
* 2) An optional metadata blob that can be used by shuffle readers.
+ *
+ * @param checksums The checksum values for each partition if shuffle checksum enabled.
+ * Otherwise, it's empty.
Review comment:
I assume the checksums index = partition id? It might be nice to expand the
comment to say that.
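To spell out the convention I am assuming, here is a small standalone sketch in which `checksums[p]` is the checksum of partition `p`. The class `PartitionChecksumExample` and its method are hypothetical and only illustrate the indexing; they are not the PR's code.

```java
import java.util.zip.Adler32;

// Hypothetical example: illustrates "array index == partition id" only.
final class PartitionChecksumExample {
  private PartitionChecksumExample() {}

  /** Returns an array where result[p] is the Adler-32 of partition p's bytes. */
  static long[] checksumsByPartition(byte[][] partitionBytes) {
    long[] checksums = new long[partitionBytes.length];
    for (int partitionId = 0; partitionId < partitionBytes.length; partitionId++) {
      Adler32 adler = new Adler32();
      byte[] data = partitionBytes[partitionId];
      adler.update(data, 0, data.length);
      // Slot partitionId holds the checksum for that partition, even if it is empty.
      checksums[partitionId] = adler.getValue();
    }
    return checksums;
  }
}
```

Under this convention an empty partition still occupies its slot (Adler-32 of no bytes is 1), so the array length always equals the number of partitions.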