Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r670100864
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -329,44 +352,111 @@ private[spark] class IndexShuffleBlockResolver(
// Another attempt for the same task has already written our map
outputs successfully,
// so just use the existing partition lengths and delete our
temporary map outputs.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+ if (checksumEnabled) {
+ val existingChecksums = getChecksums(checksumFileOpt.get,
checksums.length)
+ if (existingChecksums != null) {
+ System.arraycopy(existingChecksums, 0, checksums, 0,
lengths.length)
+ } else {
+ // It's possible that the previous task attempt succeeded
writing the
+ // index file and data file but failed to write the checksum
file. In
+ // this case, the current task attempt could write the missing
checksum
+ // file by itself.
+ writeMetadataFile(checksums, checksumTmpOpt.get,
checksumFileOpt.get, false)
+ }
+ }
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
} else {
// This is the first successful attempt in writing the map outputs
for this task,
// so override any existing index and data files with the ones we
wrote.
- val out = new DataOutputStream(new BufferedOutputStream(new
FileOutputStream(indexTmp)))
- Utils.tryWithSafeFinally {
- // We take in lengths of each block, need to convert it to offsets.
- var offset = 0L
- out.writeLong(offset)
- for (length <- lengths) {
- offset += length
- out.writeLong(offset)
- }
- } {
- out.close()
- }
- if (indexFile.exists()) {
- indexFile.delete()
- }
+ val offsets = lengths.scanLeft(0L)(_ + _)
+ writeMetadataFile(offsets, indexTmp, indexFile, true)
+
if (dataFile.exists()) {
dataFile.delete()
}
- if (!indexTmp.renameTo(indexFile)) {
- throw new IOException("fail to rename file " + indexTmp + " to " +
indexFile)
- }
if (dataTmp != null && dataTmp.exists() &&
!dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " +
dataFile)
}
+
+ // write the checksum file
+ checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp,
checksumFile) =>
+ try {
+ writeMetadataFile(checksums, checksumTmp, checksumFile, false)
+ } catch {
+ case e: Exception =>
+ // It's not worthwhile to fail here after index file and data
file are
+ // already successfully stored since checksum is only a
best-effort for
+ // the corner error case.
+ logError("Failed to write checksum file", e)
+ }
+ }
}
}
} finally {
logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",",
"]")}")
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at
${indexTmp.getAbsolutePath}")
}
+ checksumTmpOpt.foreach { checksumTmp =>
+ if (checksumTmp.exists()) {
+ try {
+ if (!checksumTmp.delete()) {
+ logError(s"Failed to delete temporary checksum file " +
+ s"at ${checksumTmp.getAbsolutePath}")
+ }
+ } catch {
+ case e: Exception =>
+ // Unlike index deletion, we won't propagate the error for the
checksum file since
+ // checksum is only a best-effort.
+ logError(s"Failed to delete temporary checksum file " +
+ s"at ${checksumTmp.getAbsolutePath}", e)
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Write the metadata file (index or checksum). Metadata values will be
firstly write into
+ * the tmp file and the tmp file will be renamed to the target file at the
end to avoid dirty
+ * writes.
+ * @param metaValues The metadata values
+ * @param tmpFile The temp file
+ * @param targetFile The target file
+ * @param propagateError Whether to propagate the error for file operation.
Unlike index file,
+ * checksum is only a best-effort so we won't fail the
whole task due to
+ * the error from checksum.
+ */
+ private def writeMetadataFile(
+ metaValues: Array[Long],
+ tmpFile: File,
+ targetFile: File,
+ propagateError: Boolean): Unit = {
+ val out = new DataOutputStream(
+ new BufferedOutputStream(
+ new FileOutputStream(tmpFile)
+ )
+ )
+ Utils.tryWithSafeFinally {
+ metaValues.foreach(out.writeLong)
+ } {
+ out.close()
+ }
+
Review comment:
Could it behave differently from `tryWithSafeFinally`? (Note that
`tryWithSafeFinally` comes from the original code.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]