This is an automated email from the ASF dual-hosted git repository. ravindra pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 34ff4b4 ARROW-4913:[Java][Memory] Add additional methods for observing allocations. 34ff4b4 is described below commit 34ff4b4d35624d0005a953adf751e4e76537d66b Author: Praveen <prav...@dremio.com> AuthorDate: Wed Apr 3 11:00:22 2019 +0530 ARROW-4913:[Java][Memory] Add additional methods for observing allocations. - Additional methods for observers to calculate total number of live ledgers and buffers. - Allows clients to clamp down on heap growth due to these buffers. Author: Praveen <prav...@dremio.com> Author: Jacques Nadeau <jacq...@apache.org> Closes #4012 from praveenbingo/ARROW-4913 and squashes the following commits: 4671d032 <Praveen> Fix some checkstyle failures. 5af55057 <Jacques Nadeau> ARROW-TBD: Add additional methods to the allocation listener interface for PreAllocation and Release. --- .../apache/arrow/memory/AllocationListener.java | 49 ++++++++++++---------- .../org/apache/arrow/memory/AllocationManager.java | 1 + .../org/apache/arrow/memory/BaseAllocator.java | 5 ++- .../org/apache/arrow/memory/RootAllocator.java | 4 ++ .../org/apache/arrow/memory/TestBaseAllocator.java | 39 ++++++++++++++++- 5 files changed, 73 insertions(+), 25 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java index 4fd5330..fba6a70 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java @@ -25,31 +25,34 @@ package org.apache.arrow.memory; */ public interface AllocationListener { - public static final AllocationListener NOOP = new AllocationListener() { - @Override - public void onAllocation(long size) { - } + public static final AllocationListener NOOP = new AllocationListener() {}; - @Override - public boolean onFailedAllocation(long size, AllocationOutcome outcome) { - return false; - } - - @Override - public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) { - } - - @Override - public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) { - } - }; + /** + * Called each time a new buffer has been requested. + * + * <p>An exception can be safely thrown by this method to terminate the allocation. + * + * @param size the buffer size being allocated + */ + default void onPreAllocation(long size) {} /** - * Called each time a new buffer is allocated. + * Called each time a new buffer has been allocated. + * + * <p>An exception cannot be thrown by this method. * * @param size the buffer size being allocated */ - void onAllocation(long size); + default void onAllocation(long size) {} + + /** + * Informed each time a buffer is released from allocation. + * + * <p>An exception cannot be thrown by this method. + * @param size The size of the buffer being released. + */ + default void onRelease(long size) {} + /** * Called whenever an allocation failed, giving the caller a chance to create some space in the @@ -60,7 +63,9 @@ public interface AllocationListener { * @param outcome the outcome of the failed allocation. Carries information of what failed * @return true, if the allocation can be retried; false if the allocation should fail */ - boolean onFailedAllocation(long size, AllocationOutcome outcome); + default boolean onFailedAllocation(long size, AllocationOutcome outcome) { + return false; + } /** * Called immediately after a child allocator was added to the parent allocator. @@ -68,7 +73,7 @@ public interface AllocationListener { * @param parentAllocator The parent allocator to which a child was added * @param childAllocator The child allocator that was just added */ - void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator); + default void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {} /** * Called immediately after a child allocator was removed from the parent allocator. @@ -76,5 +81,5 @@ public interface AllocationListener { * @param parentAllocator The parent allocator from which a child was removed * @param childAllocator The child allocator that was just removed */ - void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator); + default void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {} } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index c10d246..3a8d465 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -147,6 +147,7 @@ public class AllocationManager { // no one else owns, lets release. oldLedger.allocator.releaseBytes(size); underlying.release(); + oldLedger.allocator.listener.onRelease(size); amDestructionTime = System.nanoTime(); owningLedger = null; } else { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 34613b4..bd84aef 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -49,7 +49,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato final String name; final RootAllocator root; private final Object DEBUG_LOCK = DEBUG ? new Object() : null; - private final AllocationListener listener; + final AllocationListener listener; private final BaseAllocator parentAllocator; private final ArrowByteBufAllocator thisAsByteBufAllocator; private final IdentityHashMap<BaseAllocator, Object> childAllocators; @@ -277,6 +277,9 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato final int actualRequestSize = initialRequestSize < AllocationManager.CHUNK_SIZE ? nextPowerOfTwo(initialRequestSize) : initialRequestSize; + + listener.onPreAllocation(actualRequestSize); + AllocationOutcome outcome = this.allocateBytes(actualRequestSize); if (!outcome.isOk()) { if (listener.onFailedAllocation(actualRequestSize, outcome)) { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index b6fefd7..3023a14 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -26,6 +26,10 @@ import org.apache.arrow.util.VisibleForTesting; */ public class RootAllocator extends BaseAllocator { + public RootAllocator() { + this(AllocationListener.NOOP, Long.MAX_VALUE); + } + public RootAllocator(final long limit) { this(AllocationListener.NOOP, limit); } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index c53eb06..4da7434 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -228,7 +228,9 @@ public class TestBaseAllocator { // It counts the number of times it has been invoked, and how much memory allocation it has seen // When set to 'expand on fail', it attempts to expand the associated allocator's limit private static final class TestAllocationListener implements AllocationListener { + private int numPreCalls; private int numCalls; + private int numReleaseCalls; private int numChildren; private long totalMem; private boolean expandOnFail; @@ -245,6 +247,11 @@ public class TestBaseAllocator { } @Override + public void onPreAllocation(long size) { + numPreCalls++; + } + + @Override public void onAllocation(long size) { numCalls++; totalMem += size; @@ -259,6 +266,12 @@ public class TestBaseAllocator { return false; } + + @Override + public void onRelease(long size) { + numReleaseCalls++; + } + @Override public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) { ++numChildren; @@ -275,6 +288,14 @@ public class TestBaseAllocator { this.expandLimit = expandLimit; } + int getNumPreCalls() { + return numPreCalls; + } + + int getNumReleaseCalls() { + return numReleaseCalls; + } + int getNumCalls() { return numCalls; } @@ -291,11 +312,15 @@ public class TestBaseAllocator { @Test public void testRootAllocator_listeners() throws Exception { TestAllocationListener l1 = new TestAllocationListener(); + assertEquals(0, l1.getNumPreCalls()); assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getNumReleaseCalls()); assertEquals(0, l1.getNumChildren()); assertEquals(0, l1.getTotalMem()); TestAllocationListener l2 = new TestAllocationListener(); + assertEquals(0, l2.getNumPreCalls()); assertEquals(0, l2.getNumCalls()); + assertEquals(0, l2.getNumReleaseCalls()); assertEquals(0, l2.getNumChildren()); assertEquals(0, l2.getTotalMem()); // root and first-level child share the first listener @@ -305,7 +330,9 @@ public class TestBaseAllocator { assertEquals(1, l1.getNumChildren()); final ArrowBuf buf1 = c1.buffer(16); assertNotNull("allocation failed", buf1); + assertEquals(1, l1.getNumPreCalls()); assertEquals(1, l1.getNumCalls()); + assertEquals(0, l1.getNumReleaseCalls()); assertEquals(16, l1.getTotalMem()); buf1.release(); try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) { @@ -315,7 +342,9 @@ public class TestBaseAllocator { assertNotNull("allocation failed", buf2); assertEquals(1, l1.getNumCalls()); assertEquals(16, l1.getTotalMem()); + assertEquals(1, l2.getNumPreCalls()); assertEquals(1, l2.getNumCalls()); + assertEquals(0, l2.getNumReleaseCalls()); assertEquals(32, l2.getTotalMem()); buf2.release(); try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) { @@ -323,9 +352,13 @@ public class TestBaseAllocator { assertEquals(1, l2.getNumChildren()); final ArrowBuf buf3 = c3.buffer(64); assertNotNull("allocation failed", buf3); + assertEquals(1, l1.getNumPreCalls()); assertEquals(1, l1.getNumCalls()); + assertEquals(1, l1.getNumReleaseCalls()); assertEquals(16, l1.getTotalMem()); + assertEquals(2, l2.getNumPreCalls()); assertEquals(2, l2.getNumCalls()); + assertEquals(1, l2.getNumReleaseCalls()); assertEquals(32 + 64, l2.getTotalMem()); buf3.release(); } @@ -336,6 +369,8 @@ public class TestBaseAllocator { assertEquals(0, l2.getNumChildren()); } assertEquals(0, l1.getNumChildren()); // first-level child removed + + assertEquals(2, l2.getNumReleaseCalls()); } } @@ -505,7 +540,7 @@ public class TestBaseAllocator { assertEquals(0, arrowBuf.writerIndex()); assertEquals(256, arrowBuf.writableBytes()); - final ArrowBuf slice3 = (ArrowBuf) arrowBuf.slice(); + final ArrowBuf slice3 = arrowBuf.slice(); assertEquals(0, slice3.readerIndex()); assertEquals(0, slice3.readableBytes()); assertEquals(0, slice3.writerIndex()); @@ -520,7 +555,7 @@ public class TestBaseAllocator { assertEquals(256, arrowBuf.writerIndex()); assertEquals(0, arrowBuf.writableBytes()); - final ArrowBuf slice1 = (ArrowBuf) arrowBuf.slice(); + final ArrowBuf slice1 = arrowBuf.slice(); assertEquals(0, slice1.readerIndex()); assertEquals(256, slice1.readableBytes()); for (int i = 0; i < 10; ++i) {