erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779052285



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, 
stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, 
stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental 
problems easily.
+              // Further details: 
https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit 
= {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status 
is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: 
${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I think the main goal of this patch is to surface the `problematic 
BlockManager`, `blockId` and `blockPath` (in addition to the `error` message), 
especially when `disk problems` occur on `prod` systems (which typically run at 
the `INFO` log level).
   For example, our real-life use case was `disk corruption`. If the 
`problematic BlockManager`, `blockId` and `blockPath` are logged at DEBUG level 
while prod systems mostly run at INFO level, these hints will be missing from 
the prod logs. Moreover, this kind of environmental problem is hard to catch on 
prod systems and also hard to reproduce in test environments (unless an 
additional test pipeline is built to simulate network conditions, e.g. with 
`toxiproxy`).
   
   I updated the level of this log message from ERROR to WARN, and I think it 
is useful for it to be logged explicitly on prod systems when environmental 
problems (e.g. disk corruption) occur.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to