mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776912611
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
})
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
- val diskData = diskStore.getBytes(blockId)
- val iterToReturn: Iterator[Any] = {
- if (level.deserialized) {
- val diskValues = serializerManager.dataDeserializeStream(
- blockId,
- diskData.toInputStream())(info.classTag)
- maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
- } else {
- val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
- .map { _.toInputStream(dispose = false) }
- .getOrElse { diskData.toInputStream() }
- serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ try {
+ val diskData = diskStore.getBytes(blockId)
+ val iterToReturn: Iterator[Any] = {
+ if (level.deserialized) {
+ val diskValues = serializerManager.dataDeserializeStream(
+ blockId,
+ diskData.toInputStream())(info.classTag)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
+ .map { _.toInputStream(dispose = false) }
+ .getOrElse { diskData.toInputStream() }
+ serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ }
}
+ val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+ releaseLockAndDispose(blockId, diskData, taskContext)
+ })
+ Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } catch {
+ case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+ // We need to have clear error message to catch environmental
problems easily.
+ // Further details:
https://issues.apache.org/jira/browse/SPARK-37710
+ processKryoException(ex, blockId)
+ throw ex
}
- val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
- releaseLockAndDispose(blockId, diskData, taskContext)
- })
- Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
handleLocalReadFailure(blockId)
}
}
}
+ private def processKryoException(ex: KryoException, blockId: BlockId): Unit
= {
+ var errorMessage = s"${ex.getMessage}. Please check if environment status
is healthy " +
+ s"(e.g: disk corruption, network failure (etc)). " +
+ s"${blockManagerId.toString} - blockName: $blockId"
+ if (diskBlockManager.containsBlock(blockId)) {
+ val file = diskBlockManager.getFile(blockId)
+ errorMessage = errorMessage + s" - blockDiskPath:
${file.getAbsolutePath}"
+ }
+ logError(errorMessage)
Review comment:
The actual failure is logged/handled where the `KryoException` is caught
- this log message is to give additional information to help debug, in case
someone wants to understand more about why the `KryoException` was thrown.
As such, this is informational at best, and could be at debug level if it becomes very
verbose - I am erring on the side of caution, as it makes sense to keep this at INFO.
For example, we have had very verbose error/warn messages in the past during
executor shutdown (deletion of directories/files, which caused a bunch of
IOExceptions), which ended up polluting the logs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]