Repository: spark
Updated Branches:
  refs/heads/master cb368f2c2 -> 59741887e
[SPARK-25905][CORE] When getting a remote block, avoid forcing a conversion to a ChunkedByteBuffer

## What changes were proposed in this pull request?

In `BlockManager`, `getRemoteValues` gets a `ChunkedByteBuffer` (by calling `getRemoteBytes`) and creates an `InputStream` from it. `getRemoteBytes`, in turn, gets a `ManagedBuffer` and converts it to a `ChunkedByteBuffer`. Instead, expose a `getRemoteManagedBuffer` method so `getRemoteValues` can just get this `ManagedBuffer` and use its `InputStream`. When reading a remote cache block from disk, this reduces heap memory usage significantly. Retain `getRemoteBytes` for other callers.

## How was this patch tested?

Imran Rashid wrote an application (https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala) that, among other things, tests reading remote cache blocks. I ran this application, using 2500MB blocks, to test reading a cache block on disk. Without this change, with `--executor-memory 5g`, the test fails with `java.lang.OutOfMemoryError: Java heap space`. With the change, the test passes with `--executor-memory 2g`.

I also ran the unit tests in core. In particular, `DistributedSuite` has a set of tests that exercise the `getRemoteValues` code path. `BlockManagerSuite` has several tests that call `getRemoteBytes`; I left these unchanged, so `getRemoteBytes` still gets exercised.

Closes #23058 from wypoon/SPARK-25905.

Authored-by: Wing Yew Poon <wyp...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59741887
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59741887
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59741887

Branch: refs/heads/master
Commit: 59741887e272be92ebd6e61783f99f7d8fc05456
Parents: cb368f2
Author: Wing Yew Poon <wyp...@cloudera.com>
Authored: Thu Nov 29 14:56:34 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Nov 29 14:56:34 2018 -0600

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 43 ++++++++++++--------
 .../spark/util/io/ChunkedByteBuffer.scala       |  2 -
 .../org/apache/spark/DistributedSuite.scala     |  2 +-
 3 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/59741887/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1b61729..1dfbc6e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -692,9 +692,9 @@ private[spark] class BlockManager(
    */
   private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val ct = implicitly[ClassTag[T]]
-    getRemoteBytes(blockId).map { data =>
+    getRemoteManagedBuffer(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
+        serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
       new BlockResult(values, DataReadMethod.Network, data.size)
     }
   }
@@ -717,13 +717,9 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from remote block managers as serialized bytes.
+   * Get block from remote block managers as a ManagedBuffer.
    */
-  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
-    // could just use the inputStream on the temp file, rather than reading the file into memory.
-    // Until then, replication can cause the process to use too much memory and get killed
-    // even though we've read the data to disk.
+  private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = {
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
@@ -788,14 +784,13 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
-        // ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
-        // new path is stable.
-        if (remoteReadNioBufferConversion) {
-          return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
-        } else {
-          return Some(ChunkedByteBuffer.fromManagedBuffer(data))
-        }
+        // If the ManagedBuffer is a BlockManagerManagedBuffer, the disposal of the
+        // byte buffers backing it may need to be handled after reading the bytes.
+        // In this case, since we just fetched the bytes remotely, we do not have
+        // a BlockManagerManagedBuffer. The assert here is to ensure that this holds
+        // true (or the disposal is handled).
+        assert(!data.isInstanceOf[BlockManagerManagedBuffer])
+        return Some(data)
       }
       logDebug(s"The value of block $blockId is null")
     }
@@ -804,6 +799,22 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get block from remote block managers as serialized bytes.
+   */
+  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+    getRemoteManagedBuffer(blockId).map { data =>
+      // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
+      // ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
+      // new path is stable.
+      if (remoteReadNioBufferConversion) {
+        new ChunkedByteBuffer(data.nioByteBuffer())
+      } else {
+        ChunkedByteBuffer.fromManagedBuffer(data)
+      }
+    }
+  }
+
+  /**
    * Get a block from the block manager (either local or remote).
    *
    * This acquires a read lock on the block if the block was stored locally and does not acquire

http://git-wip-us.apache.org/repos/asf/spark/blob/59741887/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 128d6ff..2c3730d 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -172,8 +172,6 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 
 private[spark] object ChunkedByteBuffer {
 
-
-  // TODO SPARK-25905 eliminate this method if we switch BlockManager to getting InputStreams
   def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>

http://git-wip-us.apache.org/repos/asf/spark/blob/59741887/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 629a323..4083b20 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -195,7 +195,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
         new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList
       assert(deserialized === (1 to 100).toList)
     }
-    // This will exercise the getRemoteBytes / getRemoteValues code paths:
+    // This will exercise the getRemoteValues code path:
     assert(blockIds.flatMap(id => blockManager.get[Int](id).get.data).toSet === (1 to 1000).toSet)
   }
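For readers who want the shape of the change without the diff context: the refactoring replaces a "materialize the bytes, then stream them" pattern with direct streaming from the fetched buffer. The sketch below is a minimal model, not Spark code. Only the method names `createInputStream()`, `nioByteBuffer()`, and `size` mirror the real `org.apache.spark.network.buffer.ManagedBuffer` API used in the diff; the trait itself, `FileBackedBuffer`, `RemoteBlockSketch`, and `deserializeStream` are hypothetical stand-ins for `ManagedBuffer`'s implementations and `SerializerManager.dataDeserializeStream`.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.{Files, Path}

// Stand-in for org.apache.spark.network.buffer.ManagedBuffer; the method
// names mirror the real trait, but this model is deliberately stripped down.
trait ManagedBuffer {
  def size: Long
  def createInputStream(): InputStream // streaming path (new getRemoteValues)
  def nioByteBuffer(): ByteBuffer      // materializing path (old getRemoteValues)
}

// A file-backed buffer, loosely analogous to a remote block fetched to a temp
// file on disk: only the stream path avoids loading the whole block at once.
final class FileBackedBuffer(path: Path) extends ManagedBuffer {
  def size: Long = Files.size(path)
  def createInputStream(): InputStream =
    Files.newInputStream(path)                  // O(1) heap, reads incrementally
  def nioByteBuffer(): ByteBuffer =
    ByteBuffer.wrap(Files.readAllBytes(path))   // O(block size) heap, all at once
}

object RemoteBlockSketch {
  // Toy deserializer standing in for SerializerManager.dataDeserializeStream:
  // it consumes the stream incrementally rather than all at once.
  def deserializeStream(in: InputStream): Iterator[String] =
    scala.io.Source.fromInputStream(in).getLines()

  // Before the change (conceptually): force every byte onto the heap, then stream.
  def getValuesOld(buf: ManagedBuffer): Iterator[String] =
    deserializeStream(new ByteArrayInputStream(buf.nioByteBuffer().array()))

  // After the change: hand the deserializer the buffer's own stream directly.
  def getValuesNew(buf: ManagedBuffer): Iterator[String] =
    deserializeStream(buf.createInputStream())
}
```

Under these assumptions, `getValuesOld` on a 2500MB file-backed block needs the full 2500MB on the heap before deserialization even starts, while `getValuesNew` holds only stream buffers; that is the same heap-usage difference the LargeBlocks experiment above observes.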