tgravescs commented on a change in pull request #34725:
URL: https://github.com/apache/spark/pull/34725#discussion_r766740700



##########
File path: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##########
@@ -201,29 +206,36 @@ public void write(Iterator<Product2<K, V>> records) 
throws IOException {
    *
    * @return array of lengths, in bytes, of each partition of the file (used 
by map output tracker).
    */
-  private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) 
throws IOException {
+  private long[] writePartitionedData(
+          ShuffleMapOutputWriter mapOutputWriter,
+          Set<Integer> nonEmptyPartitionSet) throws IOException {

Review comment:
       We should add a Javadoc description of the parameters, including an
explanation of why we need the nonEmptyPartitionSet.

##########
File path: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##########
@@ -201,29 +206,36 @@ public void write(Iterator<Product2<K, V>> records) 
throws IOException {
    *
    * @return array of lengths, in bytes, of each partition of the file (used 
by map output tracker).
    */
-  private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) 
throws IOException {
+  private long[] writePartitionedData(
+          ShuffleMapOutputWriter mapOutputWriter,
+          Set<Integer> nonEmptyPartitionSet) throws IOException {
     // Track location of the partition starts in the output file
     if (partitionWriters != null) {
       final long writeStartTime = System.nanoTime();
       try {
         for (int i = 0; i < numPartitions; i++) {
-          final File file = partitionWriterSegments[i].file();
-          ShufflePartitionWriter writer = 
mapOutputWriter.getPartitionWriter(i);
-          if (file.exists()) {
-            if (transferToEnabled) {
-              // Using WritableByteChannelWrapper to make resource closing 
consistent between
-              // this implementation and UnsafeShuffleWriter.
-              Optional<WritableByteChannelWrapper> maybeOutputChannel = 
writer.openChannelWrapper();
-              if (maybeOutputChannel.isPresent()) {
-                writePartitionedDataWithChannel(file, 
maybeOutputChannel.get());
+          if (nonEmptyPartitionSet.contains(i)) {
+            final File file = partitionWriterSegments[i].file();
+            ShufflePartitionWriter writer = 
mapOutputWriter.getPartitionWriter(i);
+            if (file.exists()) {
+              if (transferToEnabled) {
+                // Using WritableByteChannelWrapper to make resource closing 
consistent between
+                // this implementation and UnsafeShuffleWriter.
+                Optional<WritableByteChannelWrapper> maybeOutputChannel = 
writer.openChannelWrapper();
+                if (maybeOutputChannel.isPresent()) {
+                  writePartitionedDataWithChannel(file, 
maybeOutputChannel.get());
+                } else {
+                  writePartitionedDataWithStream(file, writer);
+                }
               } else {
                 writePartitionedDataWithStream(file, writer);
               }
+              if (!file.delete()) {
+                logger.error("Unable to delete file for partition {}", i);
+              }
             } else {
-              writePartitionedDataWithStream(file, writer);
-            }
-            if (!file.delete()) {
-              logger.error("Unable to delete file for partition {}", i);
+              throw new IOException(
+                      "Segment file " + file.getAbsolutePath() + " for 
partition " + i + " doesn't exists.");

Review comment:
       It might be nice to add more detail to the exception message so the user
can understand what might have happened.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to