[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4485 ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140271306 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws IOException { assertEquals(2, first.getNumBuffers()); assertEquals(2, second.getNumBuffers()); - String msg = "Wrong number of available segments after create buffer pool and request segments."; + String msg = "Wrong number of available segments after creating buffer pool and requesting segments."; --- End diff -- still "buffer pool**s**" ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140190096 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -264,11 +271,13 @@ public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, Inte assertNotNull(buffer); } - // if requestMemorySegments() blocks, this will make sure that enough buffers are freed - // eventually for it to continue + // requestMemorySegments() below will and wait for buffers --- End diff -- typo from my code example: should be "below will wait for buffers" ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140184098 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -150,7 +150,7 @@ public void testUniformDistributionBounded3() throws IOException { assertEquals(1, third.getNumBuffers()); // similar to #verifyAllBuffersReturned() - String msg = "Did not return all buffers to network buffer pool after test."; + String msg = "Wrong number of available segments after create buffer pools."; --- End diff -- "after creating buffer pools" ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140184571 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws IOException { assertEquals(2, first.getNumBuffers()); assertEquals(2, second.getNumBuffers()); - String msg = "Did not return all buffers to network buffer pool after test."; + String msg = "Wrong number of available segments after create buffer pool and request segments."; --- End diff -- "creating buffer pools and requesting segments" ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140189504 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -188,14 +195,14 @@ public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List memorySegments = Collections.emptyList(); try { - memorySegments = globalPool.requestMemorySegments(numBuffers / 2); - + List memorySegments = globalPool.requestMemorySegments(numBuffers / 2); assertEquals(memorySegments.size(), numBuffers / 2); - } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } finally { --- End diff -- Actually, in case one of the assertions fails, we also need to recycle the requested memory segments since they are not in the pool anymore. How about this? ``` List memorySegments = Collections.emptyList(); try { memorySegments = globalPool.requestMemorySegments(numBuffers / 2); assertEquals(memorySegments.size(), numBuffers / 2); globalPool.recycleMemorySegments(memorySegments); memorySegments.clear(); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } finally { globalPool.recycleMemorySegments(memorySegments); // just in case globalPool.destroy(); } ``` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140187056 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); + + globalPool.recycleMemorySegments(segmentList1); + assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList2); + assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList3); + assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(5, first.getNumBuffers()); + assertEquals(5, second.getNumBuffers()); + + globalPool.destroy(); --- End diff -- Oh yes, you're right. I guess we were lucky after all since (after quickly scanning over the class) it does not seem that any buffers are requested from the created pools. The `verifyAllBuffersReturned()` method verifies that by checking `networkBufferPool.getNumberOfAvailableMemorySegments()` (as did you) but in case of failures, the buffer pools are not cleaned up either. Maybe we should put the cleanup into the `finally` block of a surrounding try-catch as the following (and similar in your new test)? ``` @After public void verifyAllBuffersReturned() { String msg = "Did not return all buffers to network buffer pool after test."; try { assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments()); } finally { // in case buffers have actually been requested, we must release them again networkBufferPool.destroyAllBufferPools(); networkBufferPool.destroy(); } } ``` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r139083671 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java --- @@ -333,7 +333,7 @@ public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE InputChannel current = inputChannels.get(partitionId); - if (current.getClass() == UnknownInputChannel.class) { + if (current instanceof UnknownInputChannel) { --- End diff -- agree ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r139083473 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } + } + + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently not containing the number of required free segments (currently occupied by a buffer pool). +*/ + @Test + public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { + final int numBuffers = 10; +
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r139080305 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); --- End diff -- yes ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r139080289 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); --- End diff -- yes ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r139078792 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) { this.numTotalRequiredBuffers += numRequiredBuffers; - final List segments = new ArrayList<>(numRequiredBuffers); - for (int i = 0 ; i < numRequiredBuffers ; i++) { - segments.add(availableMemorySegments.poll()); - } + redistributeBuffers(); + } + final List segments = new ArrayList<>(numRequiredBuffers); + for (int i = 0 ; i < numRequiredBuffers ; i++) { try { - redistributeBuffers(); - } catch (IOException e) { - if (segments.size() > 0) { - recycleMemorySegments(segments); - } - + segments.add(availableMemorySegments.take()); --- End diff -- Yes, I think this way seems better than always blocking to wait. It is very nice of you to give the specific test. ð ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r139076168 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); + + globalPool.recycleMemorySegments(segmentList1); + assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList2); + assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList3); + assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(5, first.getNumBuffers()); + assertEquals(5, second.getNumBuffers()); + + globalPool.destroy(); --- End diff -- The other tests in `BufferPoolFactoryTest` also has this issue. I will add `destroyAllBufferPools()` in this new test. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r139075946 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); --- End diff -- I referred to the previous existing test `testUniformDistributionBounded3` and I think it has the same issue. I will modify both of them. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138891582 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } + } + + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently not containing the number of required free segments (currently occupied by a buffer pool). +*/ + @Test + public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { + final int numBuffers = 10; + +
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138889161 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } --- End diff -- add `finally`with `globalPool.destroy()` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13882 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); --- End diff -- here, you should also destroy the `globalPool`, i.e. call `globalPool.destroy()` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138879343 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) { this.numTotalRequiredBuffers += numRequiredBuffers; - final List segments = new ArrayList<>(numRequiredBuffers); - for (int i = 0 ; i < numRequiredBuffers ; i++) { - segments.add(availableMemorySegments.poll()); - } + redistributeBuffers(); + } + final List segments = new ArrayList<>(numRequiredBuffers); + for (int i = 0 ; i < numRequiredBuffers ; i++) { try { - redistributeBuffers(); - } catch (IOException e) { - if (segments.size() > 0) { - recycleMemorySegments(segments); - } - + segments.add(availableMemorySegments.take()); --- End diff -- I know, I was the one who suggested it, but thinking about the blocking `take()` a bit more and with some more background I acquired over the last weeks, I'm getting the feeling, we should do the request similar to `LocalBufferPool#requestBuffer()` so that if (for some reason) we are waiting forever, we may at least be stopped by the `destroy()` function being called. Or what do you think? I'm thinking about something like this: ``` final ArrayList segments = new ArrayList<>(numRequiredBuffers); try { while (segments.size() < numRequiredBuffers) { if (isDestroyed) { throw new IllegalStateException("Buffer pool is destroyed."); } final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS); if (segment != null) { segments.add(segment); } } } catch (Throwable e) { recycleMemorySegments(segments); ExceptionUtils.rethrowIOException(e); } ``` (using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`) The following test (for `NetworkBufferPoolTest`) could verify this behaviour: ``` @Rule public ExpectedException expectedException = ExpectedException.none(); /** * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in * case of a concurrent {@link NetworkBufferPool#destroy()} call. */ @Test public void testRequestMemorySegmentsInterruptable() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); MemorySegment segment = globalPool.requestMemorySegment(); assertNotNull(segment); final OneShotLatch isRunning = new OneShotLatch(); CheckedThread asyncRequest = new CheckedThread() { @Override public void go() throws Exception { isRunning.trigger(); globalPool.requestMemorySegments(10); } }; asyncRequest.start(); // We want the destroy call inside the blocking part of the globalPool.requestMemorySegments() // call above. We cannot guarantee this though but make it highly probable: isRunning.await(); Thread.sleep(10); globalPool.destroy(); segment.free(); expectedException.expect(IllegalStateException.class); expectedException.expectMessage("destroyed"); asyncRequest.sync(); } ``` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13514 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); --- End diff -- unnecessary check - see above ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138897087 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java --- @@ -333,7 +333,7 @@ public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE InputChannel current = inputChannels.get(partitionId); - if (current.getClass() == UnknownInputChannel.class) { + if (current instanceof UnknownInputChannel) { --- End diff -- Just to be on the safe side, you should also change this check in `#setInputChannel()` above. This way, we handle all sub-classes of `UnknownInputChannel` the same way as `UnknownInputChannel` itself ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138889183 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } --- End diff -- add `finally`with `globalPool.destroy()` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138852758 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); --- End diff -- for this test, `msg` is wrong as nothing has been recycled here (yet) ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13440 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); --- End diff -- If there was an exception, `memorySegments` will _always_ be the `Collections.emptyList()` you set before, so there's no need to check for its size. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138853431 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); + + globalPool.recycleMemorySegments(segmentList1); + assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList2); + assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList3); + assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(5, first.getNumBuffers()); + assertEquals(5, second.getNumBuffers()); + + globalPool.destroy(); --- End diff -- you also need to call `NetworkBufferPool#destroyAllBufferPools()` or `LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release their buffers ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136904188 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java --- @@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws Exception { } } + /** +* Tests that input gate requests and assigns network buffers for remote input channel, and triggers +* this process after unknown input channel updates to remote input channel. +*/ + @Test + public void testRequestBuffersForInputChannel() throws Exception { + final TaskIOMetricGroup metrics = new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup(); + final SingleInputGate inputGate = new SingleInputGate( + "t1", + new JobID(), + new IntermediateDataSetID(), + ResultPartitionType.PIPELINED_CREDIT_BASED, + 0, + 1, + mock(TaskActions.class), + metrics); + RemoteInputChannel remote = mock(RemoteInputChannel.class); + inputGate.setInputChannel(new IntermediateResultPartitionID(), remote); + + final int buffersPerChannel = 2; + NetworkBufferPool network = mock(NetworkBufferPool.class); + inputGate.assignExclusiveSegments(network, buffersPerChannel); + + verify(network, times(1)).requestMemorySegments(buffersPerChannel); + verify(remote, times(1)).assignExclusiveSegments(anyList()); + + final UnknownInputChannel unknown = new UnknownInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new LocalConnectionManager(), + 0, + 0, + metrics); + inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); + + // Update to a remote channel and verify that requesting buffers is triggered + inputGate.updateInputChannel(new InputChannelDeploymentDescriptor( + unknown.partitionId, + ResultPartitionLocation.createRemote(mock(ConnectionID.class; + + verify(network, times(2)).requestMemorySegments(buffersPerChannel); --- End diff -- In order to verify `assignExclusiveSegments` for `UnknownInputChannel#toRemoteInputChannel`, I modified the `current.getClass() == UnknowInputChannel.class` to `current instanceof UnknownInputChannel` in `SingleInputGate#updateInputChannel`. To do so, I think it is friendly and easy for tests to mock `UnknownInputChannel`. Do you have other concerns of this modification? ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136020966 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -168,4 +171,45 @@ public void testDestroyAll() { fail(e.getMessage()); } } + + @Test + public void testRequestAndRecycleMemorySegments() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List segments = null; + // request buffers from global pool with illegal argument + try { + segments = globalPool.requestMemorySegments(0); --- End diff -- can you create a separate test method for this (invalid) use case? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136022209 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java --- @@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws Exception { } } + /** +* Tests that input gate requests and assigns network buffers for remote input channel, and triggers +* this process after unknown input channel updates to remote input channel. +*/ + @Test + public void testRequestBuffersForInputChannel() throws Exception { + final TaskIOMetricGroup metrics = new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup(); + final SingleInputGate inputGate = new SingleInputGate( + "t1", + new JobID(), + new IntermediateDataSetID(), + ResultPartitionType.PIPELINED_CREDIT_BASED, + 0, + 1, + mock(TaskActions.class), + metrics); + RemoteInputChannel remote = mock(RemoteInputChannel.class); + inputGate.setInputChannel(new IntermediateResultPartitionID(), remote); + + final int buffersPerChannel = 2; + NetworkBufferPool network = mock(NetworkBufferPool.class); + inputGate.assignExclusiveSegments(network, buffersPerChannel); + + verify(network, times(1)).requestMemorySegments(buffersPerChannel); + verify(remote, times(1)).assignExclusiveSegments(anyList()); --- End diff -- `verify(remote, times(1)).assignExclusiveSegments(anyListOf(MemorySegment.class));` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136021108 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -168,4 +171,45 @@ public void testDestroyAll() { fail(e.getMessage()); } } + + @Test + public void testRequestAndRecycleMemorySegments() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List segments = null; + // request buffers from global pool with illegal argument + try { + segments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertNull(segments); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } + + // common case to request buffers less than the total capacity of global pool + final int numRequiredBuffers = 8; + segments = globalPool.requestMemorySegments(numRequiredBuffers); + + assertNotNull(segments); + assertEquals(segments.size(), numRequiredBuffers); + + // recycle all the requested buffers to global pool + globalPool.recycleMemorySegments(segments); + + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + + // uncommon case to request buffers exceeding the total capacity of global pool + try { + segments = null; + segments = globalPool.requestMemorySegments(11); + fail("Should throw an IOException"); + } catch (IOException e) { + assertNull(segments); --- End diff -- `assertNull(segments);` is not needed (it will always be true) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136018729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) { availableMemorySegments.add(segment); } + public List requestMemorySegments(int numRequiredBuffers) throws IOException { + checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0."); + + synchronized (factoryLock) { + if (isDestroyed) { + throw new IllegalStateException("Network buffer pool has already been destroyed."); + } + + if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) { + throw new IOException(String.format("Insufficient number of network buffers: " + + "required %d, but only %d available. The total number of network " + + "buffers is currently set to %d of %d bytes each. You can increase this " + + "number by setting the configuration keys '%s', '%s', and '%s'.", + numRequiredBuffers, + totalNumberOfMemorySegments - numTotalRequiredBuffers, + totalNumberOfMemorySegments, + memorySegmentSize, + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key())); + } + + this.numTotalRequiredBuffers += numRequiredBuffers; + + final List segments = new ArrayList<>(numRequiredBuffers); + for (int i = 0 ; i < numRequiredBuffers ; i++) { + segments.add(availableMemorySegments.poll()); + } + + try { + redistributeBuffers(); --- End diff -- There are still some corner cases not handled properly by this implementation: consider the following unit test (please also add it to `NetworkBufferPoolTest` or `BufferPoolFactoryTest`): ``` /** * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} * currently not containing the number of required free segments (currently occupied by a buffer * pool). */ @Test public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { final int numBuffers = 10; NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); final List buffers = new ArrayList<>(numBuffers); List memorySegments = Collections.emptyList(); Thread bufferRecycler = null; BufferPool lbp1 = null; try { lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers); // take all buffers (more than the minimum required) for (int i = 0; i < numBuffers; ++i) { Buffer buffer = lbp1.requestBuffer(); buffers.add(buffer); assertNotNull(buffer); } // if requestMemorySegments() blocks, this will make sure that enough buffers are freed // eventually for it to continue bufferRecycler = new Thread(() -> { try { Thread.sleep(100); } catch (InterruptedException ignored) { } for (Buffer buffer : buffers) { buffer.recycle(); } }); bufferRecycler.start(); // take more buffers than are freely available at the moment via requestMemorySegments() memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2); assertThat(memorySegments, not(hasItem(nullValue(; }
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136023768 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java --- @@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws Exception { } } + /** +* Tests that input gate requests and assigns network buffers for remote input channel, and triggers +* this process after unknown input channel updates to remote input channel. +*/ + @Test + public void testRequestBuffersForInputChannel() throws Exception { + final TaskIOMetricGroup metrics = new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup(); + final SingleInputGate inputGate = new SingleInputGate( + "t1", + new JobID(), + new IntermediateDataSetID(), + ResultPartitionType.PIPELINED_CREDIT_BASED, + 0, + 1, + mock(TaskActions.class), + metrics); + RemoteInputChannel remote = mock(RemoteInputChannel.class); + inputGate.setInputChannel(new IntermediateResultPartitionID(), remote); + + final int buffersPerChannel = 2; + NetworkBufferPool network = mock(NetworkBufferPool.class); + inputGate.assignExclusiveSegments(network, buffersPerChannel); + + verify(network, times(1)).requestMemorySegments(buffersPerChannel); + verify(remote, times(1)).assignExclusiveSegments(anyList()); + + final UnknownInputChannel unknown = new UnknownInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new LocalConnectionManager(), + 0, + 0, + metrics); + inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); + + // Update to a remote channel and verify that requesting buffers is triggered + inputGate.updateInputChannel(new InputChannelDeploymentDescriptor( + unknown.partitionId, + ResultPartitionLocation.createRemote(mock(ConnectionID.class; + + verify(network, times(2)).requestMemorySegments(buffersPerChannel); --- End diff -- It would be nice if we could also verify that `assignExclusiveSegments()` is called here. For this, you'd have to return a spy in `UnknownInputChannel#toRemoteInputChannel`, I guess... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136020758 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -168,4 +171,45 @@ public void testDestroyAll() { fail(e.getMessage()); } } + + @Test + public void testRequestAndRecycleMemorySegments() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List segments = null; + // request buffers from global pool with illegal argument + try { + segments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertNull(segments); --- End diff -- `assertNull(segments);` is not needed (it will always be true) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136021056 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -168,4 +171,45 @@ public void testDestroyAll() { fail(e.getMessage()); } } + + @Test + public void testRequestAndRecycleMemorySegments() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List segments = null; + // request buffers from global pool with illegal argument + try { + segments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertNull(segments); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } + + // common case to request buffers less than the total capacity of global pool + final int numRequiredBuffers = 8; + segments = globalPool.requestMemorySegments(numRequiredBuffers); + + assertNotNull(segments); + assertEquals(segments.size(), numRequiredBuffers); + + // recycle all the requested buffers to global pool + globalPool.recycleMemorySegments(segments); + + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + + // uncommon case to request buffers exceeding the total capacity of global pool + try { + segments = null; + segments = globalPool.requestMemorySegments(11); --- End diff -- same here - can you create a separate test method for this (invalid) use case? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136019424 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java --- @@ -144,7 +150,7 @@ private static ResultPartition createResultPartition( * @return mock with minimal functionality necessary by {@link NetworkEnvironment#registerTask(Task)} */ private static SingleInputGate createSingleInputGateMock( - final ResultPartitionType partitionType, final int channels) { + final ResultPartitionType partitionType, final int channels) throws IOException { --- End diff -- remove - this exception is not thrown by the added code --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r135481583 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java --- @@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() { public void setBufferPool(BufferPool bufferPool) { // Sanity checks - checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(), + if (!getConsumedPartitionType().isCreditBased()) { + checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(), "Bug in input gate setup logic: buffer pool has not enough guaranteed buffers " + - "for this input gate. Input gates require at least as many buffers as " + + "for this input gate. Input gates require at least as many buffers as " + "there are input channels."); + } checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" + - "already been set for this input gate."); + "already been set for this input gate."); this.bufferPool = checkNotNull(bufferPool); } + /** +* Assign the exclusive buffers to all remote input channels directly for credit-based mode. +* +* @param networkBufferPool The global pool to request and recycle exclusive buffers +* @param networkBuffersPerChannel The number of exclusive buffers for each channel +*/ + public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException { + this.networkBufferPool = checkNotNull(networkBufferPool); --- End diff -- please guard against using this method multiple times (like in `setBufferPool`) as a sanity check --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r135480975 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -131,6 +133,50 @@ public void recycle(MemorySegment segment) { availableMemorySegments.add(segment); } + public List requestMemorySegments(int numRequiredBuffers) throws IOException { + synchronized (factoryLock) { --- End diff -- should we add a `Preconditions.checkArgument(numRequiredBuffers > 0)`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---