erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775643254



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +936,62 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, 
stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,
+    taskContext: Option[TaskContext]): Option[BlockResult] = {
+    try {
+      val level = info.level
+      val diskData = diskStore.getBytes(blockId)
+      val iterToReturn: Iterator[Any] = {
+        if (level.deserialized) {
+          val diskValues = serializerManager.dataDeserializeStream(
+            blockId,
+            diskData.toInputStream())(info.classTag)
+          maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+        } else {
+          val stream = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskData)
+            .map { _.toInputStream(dispose = false) }
+            .getOrElse { diskData.toInputStream() }
+          serializerManager.dataDeserializeStream(blockId, 
stream)(info.classTag)
+        }
+      }
+      val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+        releaseLockAndDispose(blockId, diskData, taskContext)
+      })
+      Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+    } catch {
+      case ex: KryoException
+        // We need to have clear error message to catch environmental problems 
easily.
+        // Further details: https://jira2.workday.com/browse/PRISM-102331
+        if ex.getMessage.toLowerCase(Locale.ROOT)
+          .contains("java.io.ioexception: input/output error") => {
+        processKryoException(ex, blockId)
+        throw ex
+      }
+    }
+  }
+
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit 
= {
+    val errorMessage =
+      new StringBuffer(s"${ex.getMessage} usually occurs due to environmental 
problems " +

Review comment:
       In light of the `IOException` match, I also updated the error message. 
In my use case, 3 attempts of the same task failed on the same host's corrupted 
disk with `java.io.IOException: Input/output error`, so, as far as possible, I 
would like to cover disk corruption as an example of a potential root cause 
when a `KryoException` is thrown with an `IOException` cause (if that makes sense).
   **New error message:**
   ```
   java.io.IOException: Input/output error. Please check if environment status 
is healthy 
   for disk corruption, network failure (etc). BlockManagerId(driver, 
localhost, 56816, None)
    - blockName: test_my-block-id - blockDiskPath: 
/private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pm0000gq/T
   /blockmgr-89221407-59ca-453b-aa7f-82117a460174/11/test_my-block-id
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to