spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

2015-11-16 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 505eceef3 -> e12ecfa36


[SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

The code was using the wrong API (`writeBytes`) to add data to the internal
composite buffer, causing buffers to leak in certain situations. Use the right
API (`addComponent`) and enhance the tests to catch memory leaks.

Also, avoid reusing the composite buffer when downstream handlers keep
references to it; reusing it seems to cause a few different issues even though
the ref counting code seems to be correct. Instead, when that situation happens,
pay the cost of copying the few remaining unread bytes into a new buffer.

Author: Marcelo Vanzin 

Closes #9619 from vanzin/SPARK-11617.

(cherry picked from commit 540bf58f18328c68107d6c616ffd70f3a4640054)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e12ecfa3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e12ecfa3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e12ecfa3

Branch: refs/heads/branch-1.6
Commit: e12ecfa360c54add488096493ce0dba15cbf3f8d
Parents: 505ecee
Author: Marcelo Vanzin 
Authored: Mon Nov 16 17:28:11 2015 -0800
Committer: Marcelo Vanzin 
Committed: Mon Nov 16 17:28:22 2015 -0800

--
 .../network/util/TransportFrameDecoder.java |  47 --
 .../util/TransportFrameDecoderSuite.java| 145 +++
 2 files changed, 151 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
--
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 272ea84..5889562 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -56,32 +56,43 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
   buffer = in.alloc().compositeBuffer();
 }
 
-buffer.writeBytes(in);
+buffer.addComponent(in).writerIndex(buffer.writerIndex() + in.readableBytes());
 
 while (buffer.isReadable()) {
-  feedInterceptor();
-  if (interceptor != null) {
-continue;
-  }
+  discardReadBytes();
+  if (!feedInterceptor()) {
+ByteBuf frame = decodeNext();
+if (frame == null) {
+  break;
+}
 
-  ByteBuf frame = decodeNext();
-  if (frame != null) {
 ctx.fireChannelRead(frame);
-  } else {
-break;
   }
 }
 
-// We can't discard read sub-buffers if there are other references to the buffer (e.g.
-// through slices used for framing). This assumes that code that retains references
-// will call retain() from the thread that called "fireChannelRead()" above, otherwise
-// ref counting will go awry.
-if (buffer != null && buffer.refCnt() == 1) {
+discardReadBytes();
+  }
+
+  private void discardReadBytes() {
+// If the buffer's been retained by downstream code, then make a copy of the remaining
+// bytes into a new buffer. Otherwise, just discard stale components.
+if (buffer.refCnt() > 1) {
+  CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer();
+
+  if (buffer.readableBytes() > 0) {
+ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes());
+spillBuf.writeBytes(buffer);
+newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes());
+  }
+
+  buffer.release();
+  buffer = newBuffer;
+} else {
   buffer.discardReadComponents();
 }
   }
 
-  protected ByteBuf decodeNext() throws Exception {
+  private ByteBuf decodeNext() throws Exception {
 if (buffer.readableBytes() < LENGTH_SIZE) {
   return null;
 }
@@ -127,10 +138,14 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
 this.interceptor = interceptor;
   }
 
-  private void feedInterceptor() throws Exception {
+  /**
+   * @return Whether the interceptor is still active after processing the data.
+   */
+  private boolean feedInterceptor() throws Exception {
 if (interceptor != null && !interceptor.handle(buffer)) {
   interceptor = null;
 }
+return interceptor != null;
   }
 
   public static interface Interceptor {

http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
--
diff --git 
a/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecod

spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

2015-11-16 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 1c5475f14 -> 540bf58f1


[SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

The code was using the wrong API (`writeBytes`) to add data to the internal
composite buffer, causing buffers to leak in certain situations. Use the right
API (`addComponent`) and enhance the tests to catch memory leaks.

Also, avoid reusing the composite buffer when downstream handlers keep
references to it; reusing it seems to cause a few different issues even though
the ref counting code seems to be correct. Instead, when that situation happens,
pay the cost of copying the few remaining unread bytes into a new buffer.

Author: Marcelo Vanzin 

Closes #9619 from vanzin/SPARK-11617.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540bf58f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540bf58f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540bf58f

Branch: refs/heads/master
Commit: 540bf58f18328c68107d6c616ffd70f3a4640054
Parents: 1c5475f
Author: Marcelo Vanzin 
Authored: Mon Nov 16 17:28:11 2015 -0800
Committer: Marcelo Vanzin 
Committed: Mon Nov 16 17:28:11 2015 -0800

--
 .../network/util/TransportFrameDecoder.java |  47 --
 .../util/TransportFrameDecoderSuite.java| 145 +++
 2 files changed, 151 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
--
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 272ea84..5889562 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -56,32 +56,43 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
   buffer = in.alloc().compositeBuffer();
 }
 
-buffer.writeBytes(in);
+buffer.addComponent(in).writerIndex(buffer.writerIndex() + in.readableBytes());
 
 while (buffer.isReadable()) {
-  feedInterceptor();
-  if (interceptor != null) {
-continue;
-  }
+  discardReadBytes();
+  if (!feedInterceptor()) {
+ByteBuf frame = decodeNext();
+if (frame == null) {
+  break;
+}
 
-  ByteBuf frame = decodeNext();
-  if (frame != null) {
 ctx.fireChannelRead(frame);
-  } else {
-break;
   }
 }
 
-// We can't discard read sub-buffers if there are other references to the buffer (e.g.
-// through slices used for framing). This assumes that code that retains references
-// will call retain() from the thread that called "fireChannelRead()" above, otherwise
-// ref counting will go awry.
-if (buffer != null && buffer.refCnt() == 1) {
+discardReadBytes();
+  }
+
+  private void discardReadBytes() {
+// If the buffer's been retained by downstream code, then make a copy of the remaining
+// bytes into a new buffer. Otherwise, just discard stale components.
+if (buffer.refCnt() > 1) {
+  CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer();
+
+  if (buffer.readableBytes() > 0) {
+ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes());
+spillBuf.writeBytes(buffer);
+newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes());
+  }
+
+  buffer.release();
+  buffer = newBuffer;
+} else {
   buffer.discardReadComponents();
 }
   }
 
-  protected ByteBuf decodeNext() throws Exception {
+  private ByteBuf decodeNext() throws Exception {
 if (buffer.readableBytes() < LENGTH_SIZE) {
   return null;
 }
@@ -127,10 +138,14 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
 this.interceptor = interceptor;
   }
 
-  private void feedInterceptor() throws Exception {
+  /**
+   * @return Whether the interceptor is still active after processing the data.
+   */
+  private boolean feedInterceptor() throws Exception {
 if (interceptor != null && !interceptor.handle(buffer)) {
   interceptor = null;
 }
+return interceptor != null;
   }
 
   public static interface Interceptor {

http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
--
diff --git 
a/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
 
b/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
i