erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776869729
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
})
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
- val diskData = diskStore.getBytes(blockId)
- val iterToReturn: Iterator[Any] = {
- if (level.deserialized) {
- val diskValues = serializerManager.dataDeserializeStream(
- blockId,
- diskData.toInputStream())(info.classTag)
- maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
- } else {
- val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
- .map { _.toInputStream(dispose = false) }
- .getOrElse { diskData.toInputStream() }
- serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ try {
+ val diskData = diskStore.getBytes(blockId)
+ val iterToReturn: Iterator[Any] = {
+ if (level.deserialized) {
+ val diskValues = serializerManager.dataDeserializeStream(
+ blockId,
+ diskData.toInputStream())(info.classTag)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
+ .map { _.toInputStream(dispose = false) }
+ .getOrElse { diskData.toInputStream() }
+ serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ }
}
+ val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+ releaseLockAndDispose(blockId, diskData, taskContext)
+ })
+ Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } catch {
+ case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+ // We need to have clear error message to catch environmental
problems easily.
+ // Further details:
https://issues.apache.org/jira/browse/SPARK-37710
+ processKryoException(ex, blockId)
+ throw ex
}
- val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
- releaseLockAndDispose(blockId, diskData, taskContext)
- })
- Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
handleLocalReadFailure(blockId)
}
}
}
+ private def processKryoException(ex: KryoException, blockId: BlockId): Unit
= {
+ var errorMessage = s"${ex.getMessage}. Please check if environment status
is healthy " +
+ s"(e.g: disk corruption, network failure (etc)). " +
+ s"${blockManagerId.toString} - blockName: $blockId"
+ if (diskBlockManager.containsBlock(blockId)) {
+ val file = diskBlockManager.getFile(blockId)
+ errorMessage = errorMessage + s" - blockDiskPath:
${file.getAbsolutePath}"
+ }
+ logError(errorMessage)
Review comment:
`logDebug`: `INFO` log level is mostly used at production systems so for
example: in production, if disk corruption occurs on Kryo deserialization
level, `problematic BlockManager, blockId` and `blockPath` information will be
missed with `debug` log level.
`logInfo`: This log message covers `ex.getMessage` and is going to be
printed in only `error` case for `problematic BlockManager, blockId` and
`blockPath` information.
In the light of these, i could not see a good reason to be used `info` or
`debug` level for this use-case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]