Repository: spark
Updated Branches:
  refs/heads/master 6c35865d9 -> 6ec05826d


[SPARK-24107][CORE][FOLLOWUP] ChunkedByteBuffer.writeFully method does not reset the limit value

## What changes were proposed in this pull request?

According to the discussion in https://github.com/apache/spark/pull/21175, this PR proposes two improvements:
1. add comments to explain why we call `limit` to write out the `ByteBuffer` in fixed-size slices.
2. remove the `try ... finally` block, since the limit is now restored unconditionally after each write.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21327 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ec05826
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ec05826
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ec05826

Branch: refs/heads/master
Commit: 6ec05826d7b0a512847e2522564e01256c8d192d
Parents: 6c35865
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu May 17 20:42:40 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu May 17 20:42:40 2018 +0800

----------------------------------------------------------------------
 .../spark/util/io/ChunkedByteBuffer.scala       | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ec05826/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 3ae8dfc..700ce56 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -63,15 +63,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    */
   def writeFully(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
-      val curChunkLimit = bytes.limit()
+      val originalLimit = bytes.limit()
       while (bytes.hasRemaining) {
-        try {
-          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-          bytes.limit(bytes.position() + ioSize)
-          channel.write(bytes)
-        } finally {
-          bytes.limit(curChunkLimit)
-        }
+        // If `bytes` is an on-heap ByteBuffer, the Java NIO API will copy it to a temporary direct
+        // ByteBuffer when writing it out. This temporary direct ByteBuffer is cached per thread.
+        // Its size has no limit and can keep growing if it sees a larger input ByteBuffer. This may
+        // cause significant native memory leak, if a large direct ByteBuffer is allocated and
+        // cached, as it's never released until thread exits. Here we write the `bytes` with
+        // fixed-size slices to limit the size of the cached direct ByteBuffer.
+        // Please refer to http://www.evanjones.ca/java-bytebuffer-leak.html for more details.
+        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+        bytes.limit(bytes.position() + ioSize)
+        channel.write(bytes)
+        bytes.limit(originalLimit)
       }
     }
   }
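The slicing trick in the patched `writeFully` can be sketched as standalone Java against the same NIO API: temporarily lower the buffer's limit so each `channel.write` sees at most a fixed-size window, then restore the original limit before the next iteration. The class name `SliceWrite` and the 1024-byte chunk size below are illustrative, not from the patch (Spark uses its `bufferWriteChunkSize` config).

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class SliceWrite {

    // Write a (possibly heap-backed) ByteBuffer to a channel in bounded slices.
    // Capping the visible window via limit() caps the size of the per-thread
    // temporary direct ByteBuffer that NIO allocates for heap buffers.
    static void writeFully(WritableByteChannel channel, ByteBuffer bytes, int chunkSize)
            throws java.io.IOException {
        int originalLimit = bytes.limit();
        while (bytes.hasRemaining()) {
            int ioSize = Math.min(bytes.remaining(), chunkSize);
            bytes.limit(bytes.position() + ioSize); // expose at most chunkSize bytes
            channel.write(bytes);                   // advances position by what was written
            bytes.limit(originalLimit);             // restore so the rest stays visible
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = new byte[10_000];
        for (int i = 0; i < data.length; i++) data[i] = (byte) (i % 251);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeFully(Channels.newChannel(out), ByteBuffer.wrap(data), 1024);

        System.out.println(out.size());                                       // 10000
        System.out.println(java.util.Arrays.equals(out.toByteArray(), data)); // true
    }
}
```

Note that restoring the limit inside the loop (rather than in a `finally`) is safe here: if `channel.write` throws, the buffer is abandoned along with the failed write, so there is no caller relying on the limit being restored, which is what motivated dropping the `try ... finally`.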

