viirya commented on code in PR #42724:
URL: https://github.com/apache/spark/pull/42724#discussion_r1322402242
##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala:
##########
@@ -84,24 +87,46 @@ object KubernetesLocalDiskShuffleExecutorComponents extends
Logging {
.flatMap(_.listFiles).filter(_.isDirectory) // executor-xxx
.flatMap(_.listFiles).filter(_.isDirectory) // blockmgr-xxx
.flatMap(_.listFiles).filter(_.isDirectory) // 00
- .flatMap(_.listFiles).filterNot(_.getName.contains(".checksum"))
+ .flatMap(_.listFiles)
if (files != null) files.toSeq else Seq.empty
}
+ .partition(_.getName.contains(".checksum"))
+ val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
- logInfo(s"Found ${files.size} files")
+ logInfo(s"Found ${dataFiles.size} data files, ${indexFiles.size} index
files, " +
+ s"and ${checksumFiles.size} checksum files.")
+
+ // Build hashmaps for faster access with data file name as a key
+ val checksumFileMap = new mutable.HashMap[String, File]()
+ val algorithm = conf.get(SHUFFLE_CHECKSUM_ALGORITHM)
+ checksumFiles.foreach { f =>
+ checksumFileMap.put(f.getName.replace(".checksum." + algorithm,
".data"), f)
+ }
+ val indexFileMap = new mutable.HashMap[String, File]()
+ indexFiles.foreach { f =>
+ indexFileMap.put(f.getName.replace(".index", ".data"), f)
+ }
// This is not used.
val classTag = implicitly[ClassTag[Object]]
val level = StorageLevel.DISK_ONLY
- val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
+ val checksumDisabled = !conf.get(SHUFFLE_CHECKSUM_ENABLED)
(dataFiles ++ indexFiles).foreach { f =>
logInfo(s"Try to recover ${f.getAbsolutePath}")
try {
val id = BlockId(f.getName)
// To make it sure to handle only shuffle blocks
if (id.isShuffle) {
- val decryptedSize = f.length()
- bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f,
decryptedSize).save()
+ // For index files, skipVerification is true and checksumFile and
indexFile are ignored.
+ val skipVerification = checksumDisabled ||
f.getName.endsWith(".index")
+ val checksumFile = checksumFileMap.getOrElse(f.getName, null)
+ val indexFile = indexFileMap.getOrElse(f.getName, null)
+ if (skipVerification || verifyChecksum(algorithm, id, checksumFile,
indexFile, f)) {
+ val decryptedSize = f.length()
+ bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f,
decryptedSize).save()
+ } else {
+ logInfo(s"Ignore ${f.getAbsolutePath} due to the verification
failure.")
+ }
Review Comment:
> If there is a suspicious part, we discard that data while continuing to
recover the other shuffle data files. Anything we discard will be recomputed
by Apache Spark automatically, just as in the case where we didn't remount this
PVC at all. So there is no issue with discarding the suspicious data when
checksum failures happen.
Oh okay. If, for example, one shuffle file is not recovered, will there be a
fetch failure exception like in the normal shuffle fetch case, and does that
trigger recomputation? Is this what you mean?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]