mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779278699



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, 
stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, 
stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental 
problems easily.
+              // Further details: 
https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit 
= {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status 
is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: 
${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I am not in favor of making this ERROR or WARN - it is not actionable 
for end users, ends up surfacing for a larger number of cases (not just 
unrecoverable system errors), and the actual exception is handled elsewhere, 
while this PR is only giving supporting information to help triage an issue.
   
   I understand that there is a specific set of corner cases where this 
information would be useful (like disk corruption, etc.) - which is why I am 
fine with leaving it at INFO instead of DEBUG.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to