This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new e7b0186 GEODE-6636: Create multiple buffer pools (#4234) e7b0186 is described below commit e7b018623430a959c4edbf856934588dacea7392 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Wed Apr 29 07:22:17 2020 +0200 GEODE-6636: Create multiple buffer pools (#4234) * GEODE-6636: Create multiple buffer pools * GEODE-6636: Remove new alerts * GEODE-6636: Bug fix * GEODE-6636: Update after review * GEODE-6636: Added SMALL, MEDIUM constants * GEODE-6636: Fix non-direct buffer added to direct buffer pool * GEODE-6636: Update after rebase * GEODE-6636: Update after rebase --- .../org/apache/geode/internal/net/BufferPool.java | 172 +++++++++++++++------ .../org/apache/geode/internal/tcp/Connection.java | 2 +- .../apache/geode/internal/net/BufferPoolTest.java | 48 ++++++ .../geode/internal/net/NioPlainEngineTest.java | 4 +- .../geode/internal/net/NioSslEngineTest.java | 4 +- 5 files changed, 179 insertions(+), 51 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java index 0997c6e..c156c2c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java @@ -20,7 +20,9 @@ import java.util.IdentityHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.geode.distributed.internal.DMStats; +import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.Assert; +import org.apache.geode.internal.tcp.Connection; public class BufferPool { private final DMStats stats; @@ -41,12 +43,30 @@ public class BufferPool { } /** - * A list of soft references to byte buffers. + * A list of soft references to small byte buffers. */ - private final ConcurrentLinkedQueue<BBSoftReference> bufferQueue = + private final ConcurrentLinkedQueue<BBSoftReference> bufferSmallQueue = new ConcurrentLinkedQueue<>(); /** + * A list of soft references to middle byte buffers. + */ + private final ConcurrentLinkedQueue<BBSoftReference> bufferMiddleQueue = + new ConcurrentLinkedQueue<>(); + + /** + * A list of soft references to large byte buffers. + */ + private final ConcurrentLinkedQueue<BBSoftReference> bufferLargeQueue = + new ConcurrentLinkedQueue<>(); + + private final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE; + + + private final int MEDIUM_BUFFER_SIZE = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE; + + + /** * use direct ByteBuffers instead of heap ByteBuffers for NIO operations */ public static final boolean useDirectBuffers = !Boolean.getBoolean("p2p.nodirectBuffers"); @@ -69,51 +89,18 @@ public class BufferPool { */ private ByteBuffer acquireDirectBuffer(int size, boolean send) { ByteBuffer result; + if (useDirectBuffers) { - IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a - // set - BBSoftReference ref = bufferQueue.poll(); - while (ref != null) { - ByteBuffer bb = ref.getBB(); - if (bb == null) { - // it was garbage collected - int refSize = ref.consumeSize(); - if (refSize > 0) { - if (ref.getSend()) { // fix bug 46773 - stats.incSenderBufferSize(-refSize, true); - } else { - stats.incReceiverBufferSize(-refSize, true); - } - } - } else if (bb.capacity() >= size) { - bb.rewind(); - bb.limit(size); - return bb; - } else { - // wasn't big enough so put it back in the queue - Assert.assertTrue(bufferQueue.offer(ref)); - if (alreadySeen == null) { - alreadySeen = new IdentityHashMap<>(); - } - if (alreadySeen.put(ref, ref) != null) { - // if it returns non-null then we have already seen this item - // so we have worked all the way through the queue once. - // So it is time to give up and allocate a new buffer. - break; - } - } - ref = bufferQueue.poll(); + if (size <= MEDIUM_BUFFER_SIZE) { + return acquirePredefinedFixedBuffer(send, size); + } else { + return acquireLargeBuffer(send, size); } - result = ByteBuffer.allocateDirect(size); } else { // if we are using heap buffers then don't bother with keeping them around result = ByteBuffer.allocate(size); } - if (send) { - stats.incSenderBufferSize(size, useDirectBuffers); - } else { - stats.incReceiverBufferSize(size, useDirectBuffers); - } + updateBufferStats(size, send, false); return result; } @@ -129,6 +116,86 @@ public class BufferPool { return result; } + /** + * Acquire direct buffer with predefined default capacity (4096 or 32768) + */ + private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) { + // set + int defaultSize; + ConcurrentLinkedQueue<BBSoftReference> bufferTempQueue; + ByteBuffer result; + + if (size <= SMALL_BUFFER_SIZE) { + defaultSize = SMALL_BUFFER_SIZE; + bufferTempQueue = bufferSmallQueue; + } else { + defaultSize = MEDIUM_BUFFER_SIZE; + bufferTempQueue = bufferMiddleQueue; + } + + BBSoftReference ref = bufferTempQueue.poll(); + while (ref != null) { + ByteBuffer bb = ref.getBB(); + if (bb == null) { + // it was garbage collected + updateBufferStats(-defaultSize, ref.getSend(), true); + } else { + bb.clear(); + if (defaultSize > size) { + bb.limit(size); + } + return bb; + } + ref = bufferTempQueue.poll(); + } + result = ByteBuffer.allocateDirect(defaultSize); + updateBufferStats(defaultSize, send, true); + if (defaultSize > size) { + result.limit(size); + } + return result; + } + + private ByteBuffer acquireLargeBuffer(boolean send, int size) { + // set + ByteBuffer result; + IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a + // set + BBSoftReference ref = bufferLargeQueue.poll(); + while (ref != null) { + ByteBuffer bb = ref.getBB(); + if (bb == null) { + // it was garbage collected + int refSize = ref.consumeSize(); + if (refSize > 0) { + updateBufferStats(-refSize, ref.getSend(), true); + } + } else if (bb.capacity() >= size) { + bb.clear(); + if (bb.capacity() > size) { + bb.limit(size); + } + return bb; + } else { + // wasn't big enough so put it back in the queue + Assert.assertTrue(bufferLargeQueue.offer(ref)); + if (alreadySeen == null) { + alreadySeen = new IdentityHashMap<>(); + } + if (alreadySeen.put(ref, ref) != null) { + // if it returns non-null then we have already seen this item + // so we have worked all the way through the queue once. + // So it is time to give up and allocate a new buffer. + break; + } + } + ref = bufferLargeQueue.poll(); + } + result = ByteBuffer.allocateDirect(size); + updateBufferStats(size, send, true); + return result; + } + public void releaseSenderBuffer(ByteBuffer bb) { releaseBuffer(bb, true); } @@ -228,13 +295,26 @@ public class BufferPool { private void releaseBuffer(ByteBuffer bb, boolean send) { if (bb.isDirect()) { BBSoftReference bbRef = new BBSoftReference(bb, send); - bufferQueue.offer(bbRef); - } else { - if (send) { - stats.incSenderBufferSize(-bb.capacity(), false); + if (bb.capacity() <= SMALL_BUFFER_SIZE) { + bufferSmallQueue.offer(bbRef); + } else if (bb.capacity() <= MEDIUM_BUFFER_SIZE) { + bufferMiddleQueue.offer(bbRef); } else { - stats.incReceiverBufferSize(-bb.capacity(), false); + bufferLargeQueue.offer(bbRef); } + } else { + updateBufferStats(-bb.capacity(), send, false); + } + } + + /** + * Update buffer stats. + */ + private void updateBufferStats(int size, boolean send, boolean direct) { + if (send) { + stats.incSenderBufferSize(size, direct); + } else { + stats.incReceiverBufferSize(size, direct); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index f8f6932..089eb2e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -116,7 +116,7 @@ public class Connection implements Runnable { * Small buffer used for send socket buffer on receiver connections and receive buffer on sender * connections. */ - static final int SMALL_BUFFER_SIZE = + public static final int SMALL_BUFFER_SIZE = Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096); /** diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java index cc441e4..81382f0 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java @@ -113,4 +113,52 @@ public class BufferPoolTest { assertThat(newBuffer.position()).isEqualTo(16384); assertThat(newBuffer.limit()).isEqualTo(newBuffer.capacity()); } + + + @Test + public void checkBufferSizeAfterAllocation() throws Exception { + ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100); + + ByteBuffer newBuffer = + bufferPool.acquireDirectReceiveBuffer(10000); + assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096); + assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768); + + // buffer should be ready to read the same amount of data + assertThat(buffer.position()).isEqualTo(0); + assertThat(buffer.limit()).isEqualTo(100); + assertThat(newBuffer.position()).isEqualTo(0); + assertThat(newBuffer.limit()).isEqualTo(10000); + } + + @Test + public void checkBufferSizeAfterAcquire() throws Exception { + ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100); + + ByteBuffer newBuffer = + bufferPool.acquireDirectReceiveBuffer(10000); + assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096); + assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768); + + assertThat(buffer.position()).isEqualTo(0); + assertThat(buffer.limit()).isEqualTo(100); + assertThat(newBuffer.position()).isEqualTo(0); + assertThat(newBuffer.limit()).isEqualTo(10000); + bufferPool.releaseReceiveBuffer(buffer); + bufferPool.releaseReceiveBuffer(newBuffer); + + buffer = bufferPool.acquireDirectReceiveBuffer(1000); + + newBuffer = + bufferPool.acquireDirectReceiveBuffer(15000); + + assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096); + assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768); + + assertThat(buffer.position()).isEqualTo(0); + assertThat(buffer.limit()).isEqualTo(1000); + assertThat(newBuffer.position()).isEqualTo(0); + assertThat(newBuffer.limit()).isEqualTo(15000); + } + } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java index e9785de..b4eae2f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java @@ -63,9 +63,9 @@ public class NioPlainEngineTest { int requestedCapacity = 210; ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer, BufferPool.BufferType.TRACKED_RECEIVER); - verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class)); + verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class)); assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity); - assertThat(result).isNotSameAs(wrappedBuffer); + assertThat(result).isGreaterThanOrEqualTo(wrappedBuffer); // make sure that data was transferred to the new buffer for (int i = 0; i < 10; i++) { assertThat(result.get(i)).isEqualTo(wrappedBuffer.get(i)); diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index 2c9be77..720ef62 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -62,8 +62,8 @@ import org.apache.geode.test.junit.categories.MembershipTest; @Category({MembershipTest.class}) public class NioSslEngineTest { - private static final int netBufferSize = 10000; - private static final int appBufferSize = 20000; + private static final int netBufferSize = 4096; + private static final int appBufferSize = 32768; private SSLEngine mockEngine; private DMStats mockStats;