dongjoon-hyun commented on code in PR #42724:
URL: https://github.com/apache/spark/pull/42724#discussion_r1322382289


##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala:
##########
@@ -84,24 +87,46 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
           .flatMap(_.listFiles).filter(_.isDirectory) // executor-xxx
           .flatMap(_.listFiles).filter(_.isDirectory) // blockmgr-xxx
           .flatMap(_.listFiles).filter(_.isDirectory) // 00
-          .flatMap(_.listFiles).filterNot(_.getName.contains(".checksum"))
+          .flatMap(_.listFiles)
         if (files != null) files.toSeq else Seq.empty
       }
+      .partition(_.getName.contains(".checksum"))
+    val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
 
-    logInfo(s"Found ${files.size} files")
+    logInfo(s"Found ${dataFiles.size} data files, ${indexFiles.size} index files, " +
+        s"and ${checksumFiles.size} checksum files.")
+
+    // Build hashmaps for faster access with data file name as a key
+    val checksumFileMap = new mutable.HashMap[String, File]()
+    val algorithm = conf.get(SHUFFLE_CHECKSUM_ALGORITHM)
+    checksumFiles.foreach { f =>
+      checksumFileMap.put(f.getName.replace(".checksum." + algorithm, ".data"), f)
+    }
+    val indexFileMap = new mutable.HashMap[String, File]()
+    indexFiles.foreach { f =>
+      indexFileMap.put(f.getName.replace(".index", ".data"), f)
+    }
 
     // This is not used.
     val classTag = implicitly[ClassTag[Object]]
     val level = StorageLevel.DISK_ONLY
-    val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
+    val checksumDisabled = !conf.get(SHUFFLE_CHECKSUM_ENABLED)
     (dataFiles ++ indexFiles).foreach { f =>
       logInfo(s"Try to recover ${f.getAbsolutePath}")
       try {
         val id = BlockId(f.getName)
         // To make it sure to handle only shuffle blocks
         if (id.isShuffle) {
-          val decryptedSize = f.length()
-          bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
+          // For index files, skipVerification is true and checksumFile and indexFile are ignored.
+          val skipVerification = checksumDisabled || f.getName.endsWith(".index")
+          val checksumFile = checksumFileMap.getOrElse(f.getName, null)
+          val indexFile = indexFileMap.getOrElse(f.getName, null)
+          if (skipVerification || verifyChecksum(algorithm, id, checksumFile, indexFile, f)) {
+            val decryptedSize = f.length()
+            bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
+          } else {
+            logInfo(s"Ignore ${f.getAbsolutePath} due to the verification failure.")
+          }

Review Comment:
   Ur, it seems that you assume this PR should be an on-off switch for the whole physical-volume health check. Ya, it could be, but that's not the goal for now. The main request I received was to detect potential OS corruption where some unflushed files accidentally become visible during volume detaching and re-attaching.
   
   Apache Spark shuffle data is only meaningful within the relationship among the `index file`, `data file`, and `checksum file`. And, the `checksum` file is designed as an auxiliary file for additional analysis rather than a mandatory file.
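   To make the file relationship concrete, here is a tiny standalone sketch of the name-based key derivation used in the diff above. The file names and the `ADLER32` algorithm value are illustrative assumptions (the actual algorithm comes from `spark.shuffle.checksum.algorithm`):

   ```scala
   object ShuffleFileKeys {
     // Assumed checksum algorithm name; the real value is read from the Spark conf.
     val algorithm = "ADLER32"

     // Normalize a checksum or index file name to its companion data file name,
     // mirroring the .replace(...) calls that build the lookup maps in the diff.
     def dataKeyOfChecksum(name: String): String =
       name.replace(".checksum." + algorithm, ".data")
     def dataKeyOfIndex(name: String): String =
       name.replace(".index", ".data")
   }
   ```

   With these keys, each data file can be matched to its index and checksum companions in O(1) instead of rescanning the directory listing.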
   
   So, the goal of this PR is simple in the context of data recovery.
   - If there is a checksum, we verify the data once more to make sure there is no corruption, because this is remounted data. If the verification passes, we are good to use the verified data.
   - If there is a suspicious part, we discard that data while continuing to recover the other shuffle data. Discarded data will be recomputed by Apache Spark, just like the case where we didn't remount this PVC. So, there is no issue with discarding data when checksum failures happen.
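   The recovery policy in the two bullets above can be sketched as a small standalone decision function. Everything here (`RecoverySketch`, `RecoveredFile`, `decide`) is illustrative naming, not Spark's actual API:

   ```scala
   object RecoverySketch {
     sealed trait Decision
     case object Recover extends Decision // save the block back via the block manager
     case object Discard extends Decision // drop it; Spark recomputes the shuffle data

     // A remounted file plus the result of checksum verification, if one was possible.
     final case class RecoveredFile(name: String, checksumOk: Option[Boolean])

     def decide(f: RecoveredFile, checksumEnabled: Boolean): Decision = {
       // Index files (and checksum-disabled runs) skip verification entirely.
       val skipVerification = !checksumEnabled || f.name.endsWith(".index")
       if (skipVerification) Recover
       else f.checksumOk match {
         case Some(true)  => Recover // verified: safe to reuse the remounted data
         case Some(false) => Discard // suspicious: let Spark recompute it
         case None        => Recover // checksum is auxiliary, not mandatory
       }
     }
   }
   ```

   The key design point is that `Discard` is always safe: it degrades to exactly the behavior Spark would have without the remounted PVC.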
   
   Just to give more background: although we assume EBS or EFS in the community, there are many K8s PV implementations, and some of them are new, proprietary implementations whose internals we don't know. This is proposed as a moderate protected mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

