spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.
Repository: spark Updated Branches: refs/heads/branch-1.6 505eceef3 -> e12ecfa36 [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder. The code was using the wrong API to add data to the internal composite buffer, causing buffers to leak in certain situations. Use the right API and enhance the tests to catch memory leaks. Also, avoid reusing the composite buffers when downstream handlers keep references to them; this seems to cause a few different issues even though the ref counting code seems to be correct, so instead pay the cost of copying a few bytes when that situation happens. Author: Marcelo Vanzin Closes #9619 from vanzin/SPARK-11617. (cherry picked from commit 540bf58f18328c68107d6c616ffd70f3a4640054) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e12ecfa3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e12ecfa3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e12ecfa3 Branch: refs/heads/branch-1.6 Commit: e12ecfa360c54add488096493ce0dba15cbf3f8d Parents: 505ecee Author: Marcelo Vanzin Authored: Mon Nov 16 17:28:11 2015 -0800 Committer: Marcelo Vanzin Committed: Mon Nov 16 17:28:22 2015 -0800 -- .../network/util/TransportFrameDecoder.java | 47 -- .../util/TransportFrameDecoderSuite.java| 145 +++ 2 files changed, 151 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index 272ea84..5889562 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -56,32 +56,43 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { buffer = in.alloc().compositeBuffer(); } -buffer.writeBytes(in); +buffer.addComponent(in).writerIndex(buffer.writerIndex() + in.readableBytes()); while (buffer.isReadable()) { - feedInterceptor(); - if (interceptor != null) { -continue; - } + discardReadBytes(); + if (!feedInterceptor()) { +ByteBuf frame = decodeNext(); +if (frame == null) { + break; +} - ByteBuf frame = decodeNext(); - if (frame != null) { ctx.fireChannelRead(frame); - } else { -break; } } -// We can't discard read sub-buffers if there are other references to the buffer (e.g. -// through slices used for framing). This assumes that code that retains references -// will call retain() from the thread that called "fireChannelRead()" above, otherwise -// ref counting will go awry. -if (buffer != null && buffer.refCnt() == 1) { +discardReadBytes(); + } + + private void discardReadBytes() { +// If the buffer's been retained by downstream code, then make a copy of the remaining +// bytes into a new buffer. Otherwise, just discard stale components. +if (buffer.refCnt() > 1) { + CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer(); + + if (buffer.readableBytes() > 0) { +ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes()); +spillBuf.writeBytes(buffer); +newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes()); + } + + buffer.release(); + buffer = newBuffer; +} else { buffer.discardReadComponents(); } } - protected ByteBuf decodeNext() throws Exception { + private ByteBuf decodeNext() throws Exception { if (buffer.readableBytes() < LENGTH_SIZE) { return null; } @@ -127,10 +138,14 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { this.interceptor = interceptor; } - private void feedInterceptor() throws Exception { + /** + * @return Whether the interceptor is still active after processing the data. + */ + private boolean feedInterceptor() throws Exception { if (interceptor != null && !interceptor.handle(buffer)) { interceptor = null; } +return interceptor != null; } public static interface Interceptor { http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java -- diff --git a/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecod
spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.
Repository: spark Updated Branches: refs/heads/master 1c5475f14 -> 540bf58f1 [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder. The code was using the wrong API to add data to the internal composite buffer, causing buffers to leak in certain situations. Use the right API and enhance the tests to catch memory leaks. Also, avoid reusing the composite buffers when downstream handlers keep references to them; this seems to cause a few different issues even though the ref counting code seems to be correct, so instead pay the cost of copying a few bytes when that situation happens. Author: Marcelo Vanzin Closes #9619 from vanzin/SPARK-11617. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540bf58f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540bf58f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540bf58f Branch: refs/heads/master Commit: 540bf58f18328c68107d6c616ffd70f3a4640054 Parents: 1c5475f Author: Marcelo Vanzin Authored: Mon Nov 16 17:28:11 2015 -0800 Committer: Marcelo Vanzin Committed: Mon Nov 16 17:28:11 2015 -0800 -- .../network/util/TransportFrameDecoder.java | 47 -- .../util/TransportFrameDecoderSuite.java| 145 +++ 2 files changed, 151 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index 272ea84..5889562 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -56,32 +56,43 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { buffer = in.alloc().compositeBuffer(); } -buffer.writeBytes(in); +buffer.addComponent(in).writerIndex(buffer.writerIndex() + in.readableBytes()); while (buffer.isReadable()) { - feedInterceptor(); - if (interceptor != null) { -continue; - } + discardReadBytes(); + if (!feedInterceptor()) { +ByteBuf frame = decodeNext(); +if (frame == null) { + break; +} - ByteBuf frame = decodeNext(); - if (frame != null) { ctx.fireChannelRead(frame); - } else { -break; } } -// We can't discard read sub-buffers if there are other references to the buffer (e.g. -// through slices used for framing). This assumes that code that retains references -// will call retain() from the thread that called "fireChannelRead()" above, otherwise -// ref counting will go awry. -if (buffer != null && buffer.refCnt() == 1) { +discardReadBytes(); + } + + private void discardReadBytes() { +// If the buffer's been retained by downstream code, then make a copy of the remaining +// bytes into a new buffer. Otherwise, just discard stale components. +if (buffer.refCnt() > 1) { + CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer(); + + if (buffer.readableBytes() > 0) { +ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes()); +spillBuf.writeBytes(buffer); +newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes()); + } + + buffer.release(); + buffer = newBuffer; +} else { buffer.discardReadComponents(); } } - protected ByteBuf decodeNext() throws Exception { + private ByteBuf decodeNext() throws Exception { if (buffer.readableBytes() < LENGTH_SIZE) { return null; } @@ -127,10 +138,14 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { this.interceptor = interceptor; } - private void feedInterceptor() throws Exception { + /** + * @return Whether the interceptor is still active after processing the data. + */ + private boolean feedInterceptor() throws Exception { if (interceptor != null && !interceptor.handle(buffer)) { interceptor = null; } +return interceptor != null; } public static interface Interceptor { http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java -- diff --git a/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java i