Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r670100864



##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -329,44 +352,111 @@ private[spark] class IndexShuffleBlockResolver(
           // Another attempt for the same task has already written our map 
outputs successfully,
           // so just use the existing partition lengths and delete our 
temporary map outputs.
           System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+          if (checksumEnabled) {
+            val existingChecksums = getChecksums(checksumFileOpt.get, 
checksums.length)
+            if (existingChecksums != null) {
+              System.arraycopy(existingChecksums, 0, checksums, 0, 
lengths.length)
+            } else {
+              // It's possible that the previous task attempt succeeded in 
writing the
+              // index file and data file but failed to write the checksum 
file. In
+              // this case, the current task attempt could write the missing 
checksum
+              // file by itself.
+              writeMetadataFile(checksums, checksumTmpOpt.get, 
checksumFileOpt.get, false)
+            }
+          }
           if (dataTmp != null && dataTmp.exists()) {
             dataTmp.delete()
           }
         } else {
           // This is the first successful attempt in writing the map outputs 
for this task,
           // so override any existing index and data files with the ones we 
wrote.
-          val out = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(indexTmp)))
-          Utils.tryWithSafeFinally {
-            // We take in lengths of each block, need to convert it to offsets.
-            var offset = 0L
-            out.writeLong(offset)
-            for (length <- lengths) {
-              offset += length
-              out.writeLong(offset)
-            }
-          } {
-            out.close()
-          }
 
-          if (indexFile.exists()) {
-            indexFile.delete()
-          }
+          val offsets = lengths.scanLeft(0L)(_ + _)
+          writeMetadataFile(offsets, indexTmp, indexFile, true)
+
           if (dataFile.exists()) {
             dataFile.delete()
           }
-          if (!indexTmp.renameTo(indexFile)) {
-            throw new IOException("fail to rename file " + indexTmp + " to " + 
indexFile)
-          }
           if (dataTmp != null && dataTmp.exists() && 
!dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + 
dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, 
checksumFile) =>
+            try {
+              writeMetadataFile(checksums, checksumTmp, checksumFile, false)
+            } catch {
+              case e: Exception =>
+                // It's not worthwhile to fail here after index file and data 
file are
+                // already successfully stored since checksum is only a 
best-effort for
+                // the corner error case.
+                logError("Failed to write checksum file", e)
+            }
+          }
         }
       }
     } finally {
       logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", 
"]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at 
${indexTmp.getAbsolutePath}")
       }
+      checksumTmpOpt.foreach { checksumTmp =>
+        if (checksumTmp.exists()) {
+          try {
+            if (!checksumTmp.delete()) {
+              logError(s"Failed to delete temporary checksum file " +
+                s"at ${checksumTmp.getAbsolutePath}")
+            }
+          } catch {
+            case e: Exception =>
+              // Unlike index deletion, we won't propagate the error for the 
checksum file since
+              // checksum is only a best-effort.
+              logError(s"Failed to delete temporary checksum file " +
+                s"at ${checksumTmp.getAbsolutePath}", e)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Write the metadata file (index or checksum). Metadata values are first 
written to
+   * the tmp file, and the tmp file is then renamed to the target file at the 
end to avoid dirty
+   * writes.
+   * @param metaValues The metadata values
+   * @param tmpFile The temp file
+   * @param targetFile The target file
+   * @param propagateError Whether to propagate errors from file operations. 
Unlike the index file,
+   *                       the checksum is only best-effort, so we won't fail the 
whole task due to
+   *                       an error from the checksum.
+   */
+  private def writeMetadataFile(
+      metaValues: Array[Long],
+      tmpFile: File,
+      targetFile: File,
+      propagateError: Boolean): Unit = {
+    val out = new DataOutputStream(
+      new BufferedOutputStream(
+        new FileOutputStream(tmpFile)
+      )
+    )
+    Utils.tryWithSafeFinally {
+      metaValues.foreach(out.writeLong)
+    } {
+      out.close()
+    }
+

Review comment:
       Could it behave differently from `tryWithSafeFinally`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to