[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-10-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4485


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-21 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140271306
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws 
IOException {
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
 
-   String msg = "Wrong number of available segments after create 
buffer pool and request segments.";
+   String msg = "Wrong number of available segments after creating 
buffer pool and requesting segments.";
--- End diff --

still "buffer pool**s**"


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-21 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140190096
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -264,11 +271,13 @@ public void 
testRequestMemorySegmentsWithBuffersTaken() throws IOException, Inte
assertNotNull(buffer);
}
 
-   // if requestMemorySegments() blocks, this will make 
sure that enough buffers are freed
-   // eventually for it to continue
+   // requestMemorySegments() below will and wait for 
buffers
--- End diff --

typo from my code example: should be "below will wait for buffers"


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-21 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140184098
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -150,7 +150,7 @@ public void testUniformDistributionBounded3() throws 
IOException {
assertEquals(1, third.getNumBuffers());
 
// similar to #verifyAllBuffersReturned()
-   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   String msg = "Wrong number of available segments after create 
buffer pools.";
--- End diff --

"after creating buffer pools"


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-21 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140184571
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws 
IOException {
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
 
-   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   String msg = "Wrong number of available segments after create 
buffer pool and request segments.";
--- End diff --

"creating buffer pools and requesting segments"


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-21 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140189504
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -188,14 +195,14 @@ public void 
testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List memorySegments = Collections.emptyList();
try {
-   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
-
+   List memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
assertEquals(memorySegments.size(), numBuffers / 2);
-   } finally {
+
globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   } finally {
--- End diff --

Actually, in case one of the assertions fails, we also need to recycle the 
requested memory segments since they are not in the pool anymore. How about 
this?

```
List memorySegments = Collections.emptyList();
try {
memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
assertEquals(memorySegments.size(), numBuffers / 2);

globalPool.recycleMemorySegments(memorySegments);
memorySegments.clear();

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
} finally {
globalPool.recycleMemorySegments(memorySegments); // 
just in case
globalPool.destroy();
}
```


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-21 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140187056
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

Oh yes, you're right.
I guess we were lucky after all since (after quickly scanning over the 
class) it does not seem that any buffers are requested from the created pools. 
The `verifyAllBuffersReturned()` method verifies that by checking 
`networkBufferPool.getNumberOfAvailableMemorySegments()` (as did you) but in 
case of failures, the buffer pools are not cleaned up either. Maybe we should 
put the cleanup into the `finally` block of a surrounding try-catch as the 
following (and similar in your new test)?

```
@After
public void verifyAllBuffersReturned() {
String msg = "Did not return all buffers to network buffer pool 
after test.";
try {
assertEquals(msg, numBuffers, 
networkBufferPool.getNumberOfAvailableMemorySegments());
} finally {
// in case buffers have actually been requested, we 
must release them again
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
}
}
```


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139083671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -333,7 +333,7 @@ public void 
updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
 
InputChannel current = inputChannels.get(partitionId);
 
-   if (current.getClass() == UnknownInputChannel.class) {
+   if (current instanceof UnknownInputChannel) {
--- End diff --

agree


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139083473
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+   }
+
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently not containing the number of required free segments 
(currently occupied by a buffer pool).
+*/
+   @Test
+   public void testRequestMemorySegmentsWithBuffersTaken() throws 
IOException, InterruptedException {
+   final int numBuffers = 10;
+

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139080305
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);
--- End diff --

yes


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139080289
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
--- End diff --

yes


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139078792
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   final List segments = new 
ArrayList<>(numRequiredBuffers);
-   for (int i = 0 ; i < numRequiredBuffers ; i++) {
-   segments.add(availableMemorySegments.poll());
-   }
+   redistributeBuffers();
+   }
 
+   final List segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
-   redistributeBuffers();
-   } catch (IOException e) {
-   if (segments.size() > 0) {
-   recycleMemorySegments(segments);
-   }
-
+   segments.add(availableMemorySegments.take());
--- End diff --

Yes, I think this way seems better than always blocking to wait.  It is 
very nice of you to give the specific test. 👍 


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139076168
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

The other tests in `BufferPoolFactoryTest` also has this issue. I will add 
`destroyAllBufferPools()` in this new test.


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139075946
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
--- End diff --

I referred to the previous existing test `testUniformDistributionBounded3` 
and I think it has the same issue. I will modify both of them.


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138891582
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+   }
+
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently not containing the number of required free segments 
(currently occupied by a buffer pool).
+*/
+   @Test
+   public void testRequestMemorySegmentsWithBuffersTaken() throws 
IOException, InterruptedException {
+   final int numBuffers = 10;
+
+

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889161
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
--- End diff --

add `finally`with `globalPool.destroy()`


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13882
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
--- End diff --

here, you should also destroy the `globalPool`, i.e. call 
`globalPool.destroy()`


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138879343
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   final List segments = new 
ArrayList<>(numRequiredBuffers);
-   for (int i = 0 ; i < numRequiredBuffers ; i++) {
-   segments.add(availableMemorySegments.poll());
-   }
+   redistributeBuffers();
+   }
 
+   final List segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
-   redistributeBuffers();
-   } catch (IOException e) {
-   if (segments.size() > 0) {
-   recycleMemorySegments(segments);
-   }
-
+   segments.add(availableMemorySegments.take());
--- End diff --

I know, I was the one who suggested it, but thinking about the blocking 
`take()` a bit more and with some more background I acquired over the last 
weeks, I'm getting the feeling, we should do the request similar to 
`LocalBufferPool#requestBuffer()` so that if (for some reason) we are waiting 
forever, we may at least be stopped by the `destroy()` function being called. 
Or what do you think? I'm thinking about something like this:
```
final ArrayList segments = new 
ArrayList<>(numRequiredBuffers);
try {
while (segments.size() < numRequiredBuffers) {
if (isDestroyed) {
throw new IllegalStateException("Buffer 
pool is destroyed.");
}

final MemorySegment segment = 
availableMemorySegments.poll(2, TimeUnit.SECONDS);
if (segment != null) {
segments.add(segment);
}
}
} catch (Throwable e) {
recycleMemorySegments(segments);
ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this 
behaviour:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, 
verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
final int numBuffers = 10;

NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
MemorySegment segment = globalPool.requestMemorySegment();
assertNotNull(segment);

final OneShotLatch isRunning = new OneShotLatch();
CheckedThread asyncRequest = new CheckedThread() {
@Override
public void go() throws Exception {
isRunning.trigger();
globalPool.requestMemorySegments(10);
}
};
asyncRequest.start();

// We want the destroy call inside the blocking part of the 
globalPool.requestMemorySegments()
// call above. We cannot guarantee this though but make it 
highly probable:
isRunning.await();
Thread.sleep(10);
globalPool.destroy();

segment.free();

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("destroyed");
asyncRequest.sync();
}
```


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13514
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
--- End diff --

unnecessary check - see above


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138897087
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -333,7 +333,7 @@ public void 
updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
 
InputChannel current = inputChannels.get(partitionId);
 
-   if (current.getClass() == UnknownInputChannel.class) {
+   if (current instanceof UnknownInputChannel) {
--- End diff --

Just to be on the safe side, you should also change this check in 
`#setInputChannel()` above. This way, we handle all sub-classes of 
`UnknownInputChannel` the same way as `UnknownInputChannel` itself


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
--- End diff --

add `finally`with `globalPool.destroy()`


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138852758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
--- End diff --

for this test, `msg` is wrong as nothing has been recycled here (yet)


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13440
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);
--- End diff --

If there was an exception, `memorySegments` will _always_ be the 
`Collections.emptyList()` you set before, so there's no need to check for its 
size.


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138853431
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

you also need to call `NetworkBufferPool#destroyAllBufferPools()` or 
`LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release 
their buffers


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136904188
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ---
@@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws 
Exception {
}
}
 
+   /**
+* Tests that input gate requests and assigns network buffers for 
remote input channel, and triggers
+* this process after unknown input channel updates to remote input 
channel.
+*/
+   @Test
+   public void testRequestBuffersForInputChannel() throws Exception {
+   final TaskIOMetricGroup metrics = new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
+   final SingleInputGate inputGate = new SingleInputGate(
+   "t1",
+   new JobID(),
+   new IntermediateDataSetID(),
+   ResultPartitionType.PIPELINED_CREDIT_BASED,
+   0,
+   1,
+   mock(TaskActions.class),
+   metrics);
+   RemoteInputChannel remote = mock(RemoteInputChannel.class);
+   inputGate.setInputChannel(new IntermediateResultPartitionID(), 
remote);
+
+   final int buffersPerChannel = 2;
+   NetworkBufferPool network = mock(NetworkBufferPool.class);
+   inputGate.assignExclusiveSegments(network, buffersPerChannel);
+
+   verify(network, 
times(1)).requestMemorySegments(buffersPerChannel);
+   verify(remote, times(1)).assignExclusiveSegments(anyList());
+
+   final UnknownInputChannel unknown = new UnknownInputChannel(
+   inputGate,
+   0,
+   new ResultPartitionID(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new LocalConnectionManager(),
+   0,
+   0,
+   metrics);
+   inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
+
+   // Update to a remote channel and verify that requesting 
buffers is triggered
+   inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(
+   unknown.partitionId,
+   
ResultPartitionLocation.createRemote(mock(ConnectionID.class;
+
+   verify(network, 
times(2)).requestMemorySegments(buffersPerChannel);
--- End diff --

In order to verify `assignExclusiveSegments` for 
`UnknownInputChannel#toRemoteInputChannel`, I modified the `current.getClass() 
== UnknowInputChannel.class` to `current instanceof UnknownInputChannel` in 
`SingleInputGate#updateInputChannel`. To do so, I think it is friendly and easy 
for tests to mock `UnknownInputChannel`. Do you have other concerns of this 
modification?


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136020966
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
--- End diff --

can you create a separate test method for this (invalid) use case?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136022209
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ---
@@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws 
Exception {
}
}
 
+   /**
+* Tests that input gate requests and assigns network buffers for 
remote input channel, and triggers
+* this process after unknown input channel updates to remote input 
channel.
+*/
+   @Test
+   public void testRequestBuffersForInputChannel() throws Exception {
+   final TaskIOMetricGroup metrics = new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
+   final SingleInputGate inputGate = new SingleInputGate(
+   "t1",
+   new JobID(),
+   new IntermediateDataSetID(),
+   ResultPartitionType.PIPELINED_CREDIT_BASED,
+   0,
+   1,
+   mock(TaskActions.class),
+   metrics);
+   RemoteInputChannel remote = mock(RemoteInputChannel.class);
+   inputGate.setInputChannel(new IntermediateResultPartitionID(), 
remote);
+
+   final int buffersPerChannel = 2;
+   NetworkBufferPool network = mock(NetworkBufferPool.class);
+   inputGate.assignExclusiveSegments(network, buffersPerChannel);
+
+   verify(network, 
times(1)).requestMemorySegments(buffersPerChannel);
+   verify(remote, times(1)).assignExclusiveSegments(anyList());
--- End diff --

`verify(remote, 
times(1)).assignExclusiveSegments(anyListOf(MemorySegment.class));`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136021108
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertNull(segments);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+
+   // common case to request buffers less than the total capacity 
of global pool
+   final int numRequiredBuffers = 8;
+   segments = globalPool.requestMemorySegments(numRequiredBuffers);
+
+   assertNotNull(segments);
+   assertEquals(segments.size(), numRequiredBuffers);
+
+   // recycle all the requested buffers to global pool
+   globalPool.recycleMemorySegments(segments);
+
+   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+
+   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   try {
+   segments = null;
+   segments = globalPool.requestMemorySegments(11);
+   fail("Should throw an IOException");
+   } catch (IOException e) {
+   assertNull(segments);
--- End diff --

`assertNull(segments);` is not needed (it will always be true)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136018729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
 
+   public List requestMemorySegments(int 
numRequiredBuffers) throws IOException {
+   checkArgument(numRequiredBuffers > 0, "The number of required 
buffers should be larger than 0.");
+
+   synchronized (factoryLock) {
+   if (isDestroyed) {
+   throw new IllegalStateException("Network buffer 
pool has already been destroyed.");
+   }
+
+   if (numTotalRequiredBuffers + numRequiredBuffers > 
totalNumberOfMemorySegments) {
+   throw new 
IOException(String.format("Insufficient number of network buffers: " +
+   "required %d, 
but only %d available. The total number of network " +
+   "buffers is 
currently set to %d of %d bytes each. You can increase this " +
+   "number by 
setting the configuration keys '%s', '%s', and '%s'.",
+   numRequiredBuffers,
+   totalNumberOfMemorySegments - 
numTotalRequiredBuffers,
+   totalNumberOfMemorySegments,
+   memorySegmentSize,
+   
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+   
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+   
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+   }
+
+   this.numTotalRequiredBuffers += numRequiredBuffers;
+
+   final List segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
+   segments.add(availableMemorySegments.poll());
+   }
+
+   try {
+   redistributeBuffers();
--- End diff --

There are still some corner cases not handled properly by this 
implementation: consider the following unit test (please also add it to 
`NetworkBufferPoolTest` or `BufferPoolFactoryTest`):
```
/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
 * currently not containing the number of required free segments 
(currently occupied by a buffer
 * pool).
 */
@Test
public void testRequestMemorySegmentsWithBuffersTaken() throws 
IOException, InterruptedException {
final int numBuffers = 10;

NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);

final List buffers = new ArrayList<>(numBuffers);
List memorySegments = Collections.emptyList();
Thread bufferRecycler = null;
BufferPool lbp1 = null;
try {
lbp1 = networkBufferPool.createBufferPool(numBuffers / 
2, numBuffers);

// take all buffers (more than the minimum required)
for (int i = 0; i < numBuffers; ++i) {
Buffer buffer = lbp1.requestBuffer();
buffers.add(buffer);
assertNotNull(buffer);
}

// if requestMemorySegments() blocks, this will make 
sure that enough buffers are freed
// eventually for it to continue
bufferRecycler = new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}

for (Buffer buffer : buffers) {
buffer.recycle();
}
});
bufferRecycler.start();

// take more buffers than are freely available at the 
moment via requestMemorySegments()
memorySegments = 
networkBufferPool.requestMemorySegments(numBuffers / 2);
assertThat(memorySegments, not(hasItem(nullValue(;
} 

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136023768
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ---
@@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws 
Exception {
}
}
 
+   /**
+* Tests that input gate requests and assigns network buffers for 
remote input channel, and triggers
+* this process after unknown input channel updates to remote input 
channel.
+*/
+   @Test
+   public void testRequestBuffersForInputChannel() throws Exception {
+   final TaskIOMetricGroup metrics = new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
+   final SingleInputGate inputGate = new SingleInputGate(
+   "t1",
+   new JobID(),
+   new IntermediateDataSetID(),
+   ResultPartitionType.PIPELINED_CREDIT_BASED,
+   0,
+   1,
+   mock(TaskActions.class),
+   metrics);
+   RemoteInputChannel remote = mock(RemoteInputChannel.class);
+   inputGate.setInputChannel(new IntermediateResultPartitionID(), 
remote);
+
+   final int buffersPerChannel = 2;
+   NetworkBufferPool network = mock(NetworkBufferPool.class);
+   inputGate.assignExclusiveSegments(network, buffersPerChannel);
+
+   verify(network, 
times(1)).requestMemorySegments(buffersPerChannel);
+   verify(remote, times(1)).assignExclusiveSegments(anyList());
+
+   final UnknownInputChannel unknown = new UnknownInputChannel(
+   inputGate,
+   0,
+   new ResultPartitionID(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new LocalConnectionManager(),
+   0,
+   0,
+   metrics);
+   inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
+
+   // Update to a remote channel and verify that requesting 
buffers is triggered
+   inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(
+   unknown.partitionId,
+   
ResultPartitionLocation.createRemote(mock(ConnectionID.class;
+
+   verify(network, 
times(2)).requestMemorySegments(buffersPerChannel);
--- End diff --

It would be nice if we could also verify that `assignExclusiveSegments()` 
is called here. For this, you'd have to return a spy in 
`UnknownInputChannel#toRemoteInputChannel`, I guess...




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136020758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertNull(segments);
--- End diff --

`assertNull(segments);` is not needed (it will always be true)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136021056
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertNull(segments);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+
+   // common case to request buffers less than the total capacity 
of global pool
+   final int numRequiredBuffers = 8;
+   segments = globalPool.requestMemorySegments(numRequiredBuffers);
+
+   assertNotNull(segments);
+   assertEquals(segments.size(), numRequiredBuffers);
+
+   // recycle all the requested buffers to global pool
+   globalPool.recycleMemorySegments(segments);
+
+   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+
+   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   try {
+   segments = null;
+   segments = globalPool.requestMemorySegments(11);
--- End diff --

same here - can you create a separate test method for this (invalid) use 
case?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136019424
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 ---
@@ -144,7 +150,7 @@ private static ResultPartition createResultPartition(
 * @return mock with minimal functionality necessary by {@link 
NetworkEnvironment#registerTask(Task)}
 */
private static SingleInputGate createSingleInputGateMock(
-   final ResultPartitionType partitionType, final int 
channels) {
+   final ResultPartitionType partitionType, final int 
channels) throws IOException {
--- End diff --

remove - this exception is not thrown by the added code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r135481583
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() {
 
public void setBufferPool(BufferPool bufferPool) {
// Sanity checks
-   checkArgument(numberOfInputChannels == 
bufferPool.getNumberOfRequiredMemorySegments(),
+   if (!getConsumedPartitionType().isCreditBased()) {
+   checkArgument(numberOfInputChannels == 
bufferPool.getNumberOfRequiredMemorySegments(),
"Bug in input gate setup logic: buffer pool has 
not enough guaranteed buffers " +
-   "for this input gate. Input 
gates require at least as many buffers as " +
+   "for this input gate. Input gates 
require at least as many buffers as " +
"there are input channels.");
+   }
 
checkState(this.bufferPool == null, "Bug in input gate setup 
logic: buffer pool has" +
-   "already been set for this input gate.");
+   "already been set for this input gate.");
 
this.bufferPool = checkNotNull(bufferPool);
}
 
+   /**
+* Assign the exclusive buffers to all remote input channels directly 
for credit-based mode.
+*
+* @param networkBufferPool The global pool to request and recycle 
exclusive buffers
+* @param networkBuffersPerChannel The number of exclusive buffers for 
each channel
+*/
+   public void assignExclusiveSegments(NetworkBufferPool 
networkBufferPool, int networkBuffersPerChannel) throws IOException {
+   this.networkBufferPool = checkNotNull(networkBufferPool);
--- End diff --

please guard against using this method multiple times (like in 
`setBufferPool`) as a sanity check


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-08-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r135480975
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -131,6 +133,50 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
 
+   public List requestMemorySegments(int 
numRequiredBuffers) throws IOException {
+   synchronized (factoryLock) {
--- End diff --

should we add a `Preconditions.checkArgument(numRequiredBuffers > 0)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---