mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776647028
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
})
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
- val diskData = diskStore.getBytes(blockId)
- val iterToReturn: Iterator[Any] = {
- if (level.deserialized) {
- val diskValues = serializerManager.dataDeserializeStream(
- blockId,
- diskData.toInputStream())(info.classTag)
- maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
- } else {
- val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
- .map { _.toInputStream(dispose = false) }
- .getOrElse { diskData.toInputStream() }
- serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ try {
+ val diskData = diskStore.getBytes(blockId)
+ val iterToReturn: Iterator[Any] = {
+ if (level.deserialized) {
+ val diskValues = serializerManager.dataDeserializeStream(
+ blockId,
+ diskData.toInputStream())(info.classTag)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
diskData)
+ .map { _.toInputStream(dispose = false) }
+ .getOrElse { diskData.toInputStream() }
+ serializerManager.dataDeserializeStream(blockId,
stream)(info.classTag)
+ }
}
+ val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+ releaseLockAndDispose(blockId, diskData, taskContext)
+ })
+ Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } catch {
+ case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+ // We need to have clear error message to catch environmental
problems easily.
+ // Further details:
https://issues.apache.org/jira/browse/SPARK-37710
+ processKryoException(ex, blockId)
+ throw ex
}
- val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
- releaseLockAndDispose(blockId, diskData, taskContext)
- })
- Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
handleLocalReadFailure(blockId)
}
}
}
+ private def processKryoException(ex: KryoException, blockId: BlockId): Unit
= {
+ var errorMessage = s"${ex.getMessage}. Please check if environment status
is healthy for " +
+ s"disk corruption, network failure (etc). ${blockManagerId.toString} -
blockName: $blockId"
+ if (diskBlockManager.containsBlock(blockId)) {
+ val file = diskBlockManager.getFile(blockId)
+ errorMessage = errorMessage + s" - blockDiskPath:
${file.getAbsolutePath}"
+ }
+ logError(errorMessage, ex)
Review comment:
To clarify - logging an exception and then rethrowing it is an anti-pattern.
I am fine with either augmenting the thrown exception with additional
details (typically the better way to handle this), or logging additional
details (like blockId, etc.) and throwing the original exception to be appropriately
handled - avoiding duplicate stack traces across the log file.
The latest change addresses this part, so resolving the conversation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]