erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776895636
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
})
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
- val diskData = diskStore.getBytes(blockId)
- val iterToReturn: Iterator[Any] = {
- if (level.deserialized) {
- val diskValues = serializerManager.dataDeserializeStream(
- blockId,
- diskData.toInputStream())(info.classTag)
- maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
- } else {
- val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
- .map { _.toInputStream(dispose = false) }
- .getOrElse { diskData.toInputStream() }
- serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ try {
+ val diskData = diskStore.getBytes(blockId)
+ val iterToReturn: Iterator[Any] = {
+ if (level.deserialized) {
+ val diskValues = serializerManager.dataDeserializeStream(
+ blockId,
+ diskData.toInputStream())(info.classTag)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
+ .map { _.toInputStream(dispose = false) }
+ .getOrElse { diskData.toInputStream() }
+ serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ }
}
+ val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+ releaseLockAndDispose(blockId, diskData, taskContext)
+ })
+ Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } catch {
+ case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+ // We need to have clear error message to catch environmental
problems easily.
+ // Further details:
https://issues.apache.org/jira/browse/SPARK-37710
+ processKryoException(ex, blockId)
+ throw ex
}
- val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
- releaseLockAndDispose(blockId, diskData, taskContext)
- })
- Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
handleLocalReadFailure(blockId)
}
}
}
+ private def processKryoException(ex: KryoException, blockId: BlockId): Unit
= {
+ var errorMessage = s"${ex.getMessage}. Please check if environment status
is healthy " +
+ s"(e.g: disk corruption, network failure (etc)). " +
+ s"${blockManagerId.toString} - blockName: $blockId"
+ if (diskBlockManager.containsBlock(blockId)) {
+ val file = diskBlockManager.getFile(blockId)
+ errorMessage = errorMessage + s" - blockDiskPath:
${file.getAbsolutePath}"
+ }
+ logError(errorMessage)
Review comment:
Exactly, we can use `error/warn` in this case, however, not `info/debug`.
This kind of problems are retried with `task-attempts` (3 attempts in our
case).
- If I/O issue is `transient`, it can be recovered by next task-attempt.
- If it is not transient, it will fail `task/stage/job/query` (after all
attempts).
I think we can also use `warn` due to current task-retry flow (even if the
first task attempt fails, flow will re-try until retry-limit).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]