[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198784#comment-16198784
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4485


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-10-10 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198780#comment-16198780
 ] 

Chesnay Schepler commented on FLINK-7378:
-

1.4: 450d9df9e96718575ab2979f256f99be4d699636

> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198576#comment-16198576
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4485
  
merging.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16175953#comment-16175953
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I missed that message of `verifyAllBuffersReturned()` issue before.
I have submitted the modifications of it. :)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174926#comment-16174926
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140271306
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws 
IOException {
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
 
-   String msg = "Wrong number of available segments after create 
buffer pool and request segments.";
+   String msg = "Wrong number of available segments after creating 
buffer pool and requesting segments.";
--- End diff --

still "buffer pool**s**"


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174757#comment-16174757
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , sorry for the typos. I have submitted the updates.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174493#comment-16174493
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140187056
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

Oh yes, you're right.
I guess we were lucky after all since (after quickly scanning over the 
class) it does not seem that any buffers are requested from the created pools. 
The `verifyAllBuffersReturned()` method verifies that by checking 
`networkBufferPool.getNumberOfAvailableMemorySegments()` (as did you) but in 
case of failures, the buffer pools are not cleaned up either. Maybe we should 
put the cleanup into the `finally` block of a surrounding try-catch as the 
following (and similar in your new test)?

```
@After
public void verifyAllBuffersReturned() {
String msg = "Did not return all buffers to network buffer pool 
after test.";
try {
assertEquals(msg, numBuffers, 
networkBufferPool.getNumberOfAvailableMemorySegments());
} finally {
// in case buffers have actually been requested, we 
must release them again
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
}
}
```


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174492#comment-16174492
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140184571
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws 
IOException {
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
 
-   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   String msg = "Wrong number of available segments after create 
buffer pool and request segments.";
--- End diff --

"creating buffer pools and requesting segments"


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174495#comment-16174495
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140184098
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -150,7 +150,7 @@ public void testUniformDistributionBounded3() throws 
IOException {
assertEquals(1, third.getNumBuffers());
 
// similar to #verifyAllBuffersReturned()
-   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   String msg = "Wrong number of available segments after create 
buffer pools.";
--- End diff --

"after creating buffer pools"


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174494#comment-16174494
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140190096
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -264,11 +271,13 @@ public void 
testRequestMemorySegmentsWithBuffersTaken() throws IOException, Inte
assertNotNull(buffer);
}
 
-   // if requestMemorySegments() blocks, this will make 
sure that enough buffers are freed
-   // eventually for it to continue
+   // requestMemorySegments() below will and wait for 
buffers
--- End diff --

typo from my code example: should be "below will wait for buffers"


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174496#comment-16174496
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140189504
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -188,14 +195,14 @@ public void 
testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List memorySegments = Collections.emptyList();
try {
-   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
-
+   List memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
assertEquals(memorySegments.size(), numBuffers / 2);
-   } finally {
+
globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   } finally {
--- End diff --

Actually, in case one of the assertions fails, we also need to recycle the 
requested memory segments since they are not in the pool anymore. How about 
this?

```
List memorySegments = Collections.emptyList();
try {
memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
assertEquals(memorySegments.size(), numBuffers / 2);

globalPool.recycleMemorySegments(memorySegments);
memorySegments.clear();

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
} finally {
globalPool.recycleMemorySegments(memorySegments); // 
just in case
globalPool.destroy();
}
```


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167791#comment-16167791
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167482#comment-16167482
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , thank you for always helpful reviews!

I am very busy improving the runtime for our singles day these days. I will 
submit the updates for this PR later today. And I also plan to update the next 
PR which is based on this PR during weekends.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167475#comment-16167475
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139083671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -333,7 +333,7 @@ public void 
updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
 
InputChannel current = inputChannels.get(partitionId);
 
-   if (current.getClass() == UnknownInputChannel.class) {
+   if (current instanceof UnknownInputChannel) {
--- End diff --

agree


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167474#comment-16167474
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139083473
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+   }
+
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently not containing the number of required 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167459#comment-16167459
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139080305
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);
--- End diff --

yes


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167458#comment-16167458
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139080289
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
--- End diff --

yes


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167447#comment-16167447
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139078792
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   final List segments = new 
ArrayList<>(numRequiredBuffers);
-   for (int i = 0 ; i < numRequiredBuffers ; i++) {
-   segments.add(availableMemorySegments.poll());
-   }
+   redistributeBuffers();
+   }
 
+   final List segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
-   redistributeBuffers();
-   } catch (IOException e) {
-   if (segments.size() > 0) {
-   recycleMemorySegments(segments);
-   }
-
+   segments.add(availableMemorySegments.take());
--- End diff --

Yes, I think this way seems better than always blocking to wait.  It is 
very nice of you to give the specific test.  


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167430#comment-16167430
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139076168
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

The other tests in `BufferPoolFactoryTest` also has this issue. I will add 
`destroyAllBufferPools()` in this new test.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167428#comment-16167428
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r139075946
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
--- End diff --

I referred to the previous existing test `testUniformDistributionBounded3` 
and I think it has the same issue. I will modify both of them.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166323#comment-16166323
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138879343
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   final List segments = new 
ArrayList<>(numRequiredBuffers);
-   for (int i = 0 ; i < numRequiredBuffers ; i++) {
-   segments.add(availableMemorySegments.poll());
-   }
+   redistributeBuffers();
+   }
 
+   final List segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
-   redistributeBuffers();
-   } catch (IOException e) {
-   if (segments.size() > 0) {
-   recycleMemorySegments(segments);
-   }
-
+   segments.add(availableMemorySegments.take());
--- End diff --

I know, I was the one who suggested it, but thinking about the blocking 
`take()` a bit more and with some more background I acquired over the last 
weeks, I'm getting the feeling, we should do the request similar to 
`LocalBufferPool#requestBuffer()` so that if (for some reason) we are waiting 
forever, we may at least be stopped by the `destroy()` function being called. 
Or what do you think? I'm thinking about something like this:
```
final ArrayList segments = new 
ArrayList<>(numRequiredBuffers);
try {
while (segments.size() < numRequiredBuffers) {
if (isDestroyed) {
throw new IllegalStateException("Buffer 
pool is destroyed.");
}

final MemorySegment segment = 
availableMemorySegments.poll(2, TimeUnit.SECONDS);
if (segment != null) {
segments.add(segment);
}
}
} catch (Throwable e) {
recycleMemorySegments(segments);
ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this 
behaviour:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, 
verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
final int numBuffers = 10;

NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
MemorySegment segment = globalPool.requestMemorySegment();
assertNotNull(segment);

final OneShotLatch isRunning = new OneShotLatch();
CheckedThread asyncRequest = new CheckedThread() {
@Override
public void go() throws Exception {
isRunning.trigger();
globalPool.requestMemorySegments(10);
}
};
asyncRequest.start();

// We want the destroy call inside the blocking part of the 
globalPool.requestMemorySegments()
// call above. We cannot guarantee this though but make it 
highly probable:
isRunning.await();
Thread.sleep(10);
globalPool.destroy();

segment.free();

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("destroyed");
asyncRequest.sync();
}
```


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
> 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166322#comment-16166322
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138852758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
--- End diff --

for this test, `msg` is wrong as nothing has been recycled here (yet)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166324#comment-16166324
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138891582
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+   }
+
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently not containing the number of required free 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166327#comment-16166327
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889161
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
--- End diff --

add `finally`with `globalPool.destroy()`


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166329#comment-16166329
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138853431
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

you also need to call `NetworkBufferPool#destroyAllBufferPools()` or 
`LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release 
their buffers


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166328#comment-16166328
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13514
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
--- End diff --

unnecessary check - see above


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166325#comment-16166325
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13882
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
--- End diff --

here, you should also destroy the `globalPool`, i.e. call 
`globalPool.destroy()`


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166331#comment-16166331
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138897087
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -333,7 +333,7 @@ public void 
updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
 
InputChannel current = inputChannels.get(partitionId);
 
-   if (current.getClass() == UnknownInputChannel.class) {
+   if (current instanceof UnknownInputChannel) {
--- End diff --

Just to be on the safe side, you should also change this check in 
`#setInputChannel()` above. This way, we handle all sub-classes of 
`UnknownInputChannel` the same way as `UnknownInputChannel` itself


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166330#comment-16166330
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
--- End diff --

add `finally`with `globalPool.destroy()`


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166326#comment-16166326
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13440
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);
--- End diff --

If there was an exception, `memorySegments` will _always_ be the 
`Collections.emptyList()` you set before, so there's no need to check for its 
size.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153156#comment-16153156
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136904188
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ---
@@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws 
Exception {
}
}
 
+   /**
+* Tests that input gate requests and assigns network buffers for 
remote input channel, and triggers
+* this process after unknown input channel updates to remote input 
channel.
+*/
+   @Test
+   public void testRequestBuffersForInputChannel() throws Exception {
+   final TaskIOMetricGroup metrics = new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
+   final SingleInputGate inputGate = new SingleInputGate(
+   "t1",
+   new JobID(),
+   new IntermediateDataSetID(),
+   ResultPartitionType.PIPELINED_CREDIT_BASED,
+   0,
+   1,
+   mock(TaskActions.class),
+   metrics);
+   RemoteInputChannel remote = mock(RemoteInputChannel.class);
+   inputGate.setInputChannel(new IntermediateResultPartitionID(), 
remote);
+
+   final int buffersPerChannel = 2;
+   NetworkBufferPool network = mock(NetworkBufferPool.class);
+   inputGate.assignExclusiveSegments(network, buffersPerChannel);
+
+   verify(network, 
times(1)).requestMemorySegments(buffersPerChannel);
+   verify(remote, times(1)).assignExclusiveSegments(anyList());
+
+   final UnknownInputChannel unknown = new UnknownInputChannel(
+   inputGate,
+   0,
+   new ResultPartitionID(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new LocalConnectionManager(),
+   0,
+   0,
+   metrics);
+   inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
+
+   // Update to a remote channel and verify that requesting 
buffers is triggered
+   inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(
+   unknown.partitionId,
+   
ResultPartitionLocation.createRemote(mock(ConnectionID.class;
+
+   verify(network, 
times(2)).requestMemorySegments(buffersPerChannel);
--- End diff --

In order to verify `assignExclusiveSegments` for 
`UnknownInputChannel#toRemoteInputChannel`, I modified the `current.getClass() 
== UnknowInputChannel.class` to `current instanceof UnknownInputChannel` in 
`SingleInputGate#updateInputChannel`. To do so, I think it is friendly and easy 
for tests to mock `UnknownInputChannel`. Do you have other concerns of this 
modification?


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153153#comment-16153153
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates based on last comments.

Your concerns of interaction between requesting memory segments and 
creating buffer pool is really necessary. I also noticed that after reviewed 
the process.  



> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147026#comment-16147026
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , thank you for so detail and helpful comments!

I will be on team outing tomorrow and come back on Sunday. I would consider 
you concerns carefully and may submit the updates next week.



> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147003#comment-16147003
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136022209
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ---
@@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws 
Exception {
}
}
 
+   /**
+* Tests that input gate requests and assigns network buffers for 
remote input channel, and triggers
+* this process after unknown input channel updates to remote input 
channel.
+*/
+   @Test
+   public void testRequestBuffersForInputChannel() throws Exception {
+   final TaskIOMetricGroup metrics = new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
+   final SingleInputGate inputGate = new SingleInputGate(
+   "t1",
+   new JobID(),
+   new IntermediateDataSetID(),
+   ResultPartitionType.PIPELINED_CREDIT_BASED,
+   0,
+   1,
+   mock(TaskActions.class),
+   metrics);
+   RemoteInputChannel remote = mock(RemoteInputChannel.class);
+   inputGate.setInputChannel(new IntermediateResultPartitionID(), 
remote);
+
+   final int buffersPerChannel = 2;
+   NetworkBufferPool network = mock(NetworkBufferPool.class);
+   inputGate.assignExclusiveSegments(network, buffersPerChannel);
+
+   verify(network, 
times(1)).requestMemorySegments(buffersPerChannel);
+   verify(remote, times(1)).assignExclusiveSegments(anyList());
--- End diff --

`verify(remote, 
times(1)).assignExclusiveSegments(anyListOf(MemorySegment.class));`


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147001#comment-16147001
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136023768
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ---
@@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws 
Exception {
}
}
 
+   /**
+* Tests that input gate requests and assigns network buffers for 
remote input channel, and triggers
+* this process after unknown input channel updates to remote input 
channel.
+*/
+   @Test
+   public void testRequestBuffersForInputChannel() throws Exception {
+   final TaskIOMetricGroup metrics = new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
+   final SingleInputGate inputGate = new SingleInputGate(
+   "t1",
+   new JobID(),
+   new IntermediateDataSetID(),
+   ResultPartitionType.PIPELINED_CREDIT_BASED,
+   0,
+   1,
+   mock(TaskActions.class),
+   metrics);
+   RemoteInputChannel remote = mock(RemoteInputChannel.class);
+   inputGate.setInputChannel(new IntermediateResultPartitionID(), 
remote);
+
+   final int buffersPerChannel = 2;
+   NetworkBufferPool network = mock(NetworkBufferPool.class);
+   inputGate.assignExclusiveSegments(network, buffersPerChannel);
+
+   verify(network, 
times(1)).requestMemorySegments(buffersPerChannel);
+   verify(remote, times(1)).assignExclusiveSegments(anyList());
+
+   final UnknownInputChannel unknown = new UnknownInputChannel(
+   inputGate,
+   0,
+   new ResultPartitionID(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new LocalConnectionManager(),
+   0,
+   0,
+   metrics);
+   inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
+
+   // Update to a remote channel and verify that requesting 
buffers is triggered
+   inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(
+   unknown.partitionId,
+   
ResultPartitionLocation.createRemote(mock(ConnectionID.class;
+
+   verify(network, 
times(2)).requestMemorySegments(buffersPerChannel);
--- End diff --

It would be nice if we could also verify that `assignExclusiveSegments()` 
is called here. For this, you'd have to return a spy in 
`UnknownInputChannel#toRemoteInputChannel`, I guess...




> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146998#comment-16146998
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136021056
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertNull(segments);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+
+   // common case to request buffers less than the total capacity 
of global pool
+   final int numRequiredBuffers = 8;
+   segments = globalPool.requestMemorySegments(numRequiredBuffers);
+
+   assertNotNull(segments);
+   assertEquals(segments.size(), numRequiredBuffers);
+
+   // recycle all the requested buffers to global pool
+   globalPool.recycleMemorySegments(segments);
+
+   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+
+   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   try {
+   segments = null;
+   segments = globalPool.requestMemorySegments(11);
--- End diff --

same here - can you create a separate test method for this (invalid) use 
case?


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146999#comment-16146999
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136021108
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertNull(segments);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+
+   // common case to request buffers less than the total capacity 
of global pool
+   final int numRequiredBuffers = 8;
+   segments = globalPool.requestMemorySegments(numRequiredBuffers);
+
+   assertNotNull(segments);
+   assertEquals(segments.size(), numRequiredBuffers);
+
+   // recycle all the requested buffers to global pool
+   globalPool.recycleMemorySegments(segments);
+
+   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+
+   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   try {
+   segments = null;
+   segments = globalPool.requestMemorySegments(11);
+   fail("Should throw an IOException");
+   } catch (IOException e) {
+   assertNull(segments);
--- End diff --

`assertNull(segments);` is not needed (it will always be true)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147004#comment-16147004
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136019424
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 ---
@@ -144,7 +150,7 @@ private static ResultPartition createResultPartition(
 * @return mock with minimal functionality necessary by {@link 
NetworkEnvironment#registerTask(Task)}
 */
private static SingleInputGate createSingleInputGateMock(
-   final ResultPartitionType partitionType, final int 
channels) {
+   final ResultPartitionType partitionType, final int 
channels) throws IOException {
--- End diff --

remove - this exception is not thrown by the added code


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147005#comment-16147005
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136020758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertNull(segments);
--- End diff --

`assertNull(segments);` is not needed (it will always be true)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147000#comment-16147000
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136018729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
 
+   public List requestMemorySegments(int 
numRequiredBuffers) throws IOException {
+   checkArgument(numRequiredBuffers > 0, "The number of required 
buffers should be larger than 0.");
+
+   synchronized (factoryLock) {
+   if (isDestroyed) {
+   throw new IllegalStateException("Network buffer 
pool has already been destroyed.");
+   }
+
+   if (numTotalRequiredBuffers + numRequiredBuffers > 
totalNumberOfMemorySegments) {
+   throw new 
IOException(String.format("Insufficient number of network buffers: " +
+   "required %d, 
but only %d available. The total number of network " +
+   "buffers is 
currently set to %d of %d bytes each. You can increase this " +
+   "number by 
setting the configuration keys '%s', '%s', and '%s'.",
+   numRequiredBuffers,
+   totalNumberOfMemorySegments - 
numTotalRequiredBuffers,
+   totalNumberOfMemorySegments,
+   memorySegmentSize,
+   
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+   
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+   
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+   }
+
+   this.numTotalRequiredBuffers += numRequiredBuffers;
+
+   final List segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
+   segments.add(availableMemorySegments.poll());
+   }
+
+   try {
+   redistributeBuffers();
--- End diff --

There are still some corner cases not handled properly by this 
implementation: consider the following unit test (please also add it to 
`NetworkBufferPoolTest` or `BufferPoolFactoryTest`):
```
/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
 * currently not containing the number of required free segments 
(currently occupied by a buffer
 * pool).
 */
@Test
public void testRequestMemorySegmentsWithBuffersTaken() throws 
IOException, InterruptedException {
final int numBuffers = 10;

NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);

final List buffers = new ArrayList<>(numBuffers);
List memorySegments = Collections.emptyList();
Thread bufferRecycler = null;
BufferPool lbp1 = null;
try {
lbp1 = networkBufferPool.createBufferPool(numBuffers / 
2, numBuffers);

// take all buffers (more than the minimum required)
for (int i = 0; i < numBuffers; ++i) {
Buffer buffer = lbp1.requestBuffer();
buffers.add(buffer);
assertNotNull(buffer);
}

// if requestMemorySegments() blocks, this will make 
sure that enough buffers are freed
// eventually for it to continue
bufferRecycler = new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}

for (Buffer buffer : buffers) {
buffer.recycle();
}
});
bufferRecycler.start();

// take more buffers than are freely available at 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147002#comment-16147002
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r136020966
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -168,4 +171,45 @@ public void testDestroyAll() {
fail(e.getMessage());
}
}
+
+   @Test
+   public void testRequestAndRecycleMemorySegments() throws Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List segments = null;
+   // request buffers from global pool with illegal argument
+   try {
+   segments = globalPool.requestMemorySegments(0);
--- End diff --

can you create a separate test method for this (invalid) use case?


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144946#comment-16144946
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates based on the above comments. :(


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143814#comment-16143814
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
Yes, this way also has some advantages, and recycling these exclusive 
buffers would be covered in next PR with some additional tests. 

I will consider your suggestions to supplement some tests in this PR and 
submit the modifications based on all the above comments.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143560#comment-16143560
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4485
  
Hi @zhijiangW,
regarding the buffer pool implementation, I was just curious about why it 
was done that way. But it is fine to keep the logic in `RemoteInputChannel` if 
you make sure, that a recycler puts these buffers right (back) into the buffer 
queue (I guess, that's in one of the follow-up PRs). This way, we avoid an 
additional intermediate component (and the need to interact with it). To 
conclude, on a second thought, it is fine as it is.

The thing with `ResultPartitionType` is that without an (intermediate) way 
to set `isCreditBased` to `true`, we are not really able to test this code path 
on higher levels such as the `NetworkEnvironment` (or maybe I'll see that in 
the follow-up PRs as well).

Speaking of tests...I understand that with the switch to credit-based flow 
control, some parts will be covered by existing tests, but we also change the 
behaviour at some points and the current tests are already a bit sparse. Can 
you also add tests for
- the `NetworkEnvironment` changes (into `NetworkEnvironmentTest`),
- `NetworkBufferPool#requestMemorySegments`, 
`NetworkBufferPool#recycleMemorySegments` (into `NetworkBufferPoolTest` which 
currently is a bit sparse though)
- the changes in `SingleInputGate` (into `SingleInputGateTest`)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143556#comment-16143556
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r135481583
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() {
 
public void setBufferPool(BufferPool bufferPool) {
// Sanity checks
-   checkArgument(numberOfInputChannels == 
bufferPool.getNumberOfRequiredMemorySegments(),
+   if (!getConsumedPartitionType().isCreditBased()) {
+   checkArgument(numberOfInputChannels == 
bufferPool.getNumberOfRequiredMemorySegments(),
"Bug in input gate setup logic: buffer pool has 
not enough guaranteed buffers " +
-   "for this input gate. Input 
gates require at least as many buffers as " +
+   "for this input gate. Input gates 
require at least as many buffers as " +
"there are input channels.");
+   }
 
checkState(this.bufferPool == null, "Bug in input gate setup 
logic: buffer pool has" +
-   "already been set for this input gate.");
+   "already been set for this input gate.");
 
this.bufferPool = checkNotNull(bufferPool);
}
 
+   /**
+* Assign the exclusive buffers to all remote input channels directly 
for credit-based mode.
+*
+* @param networkBufferPool The global pool to request and recycle 
exclusive buffers
+* @param networkBuffersPerChannel The number of exclusive buffers for 
each channel
+*/
+   public void assignExclusiveSegments(NetworkBufferPool 
networkBufferPool, int networkBuffersPerChannel) throws IOException {
+   this.networkBufferPool = checkNotNull(networkBufferPool);
--- End diff --

please guard against using this method multiple times (like in 
`setBufferPool`) as a sanity check


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143553#comment-16143553
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r135480975
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -131,6 +133,50 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
 
+   public List requestMemorySegments(int 
numRequiredBuffers) throws IOException {
+   synchronized (factoryLock) {
--- End diff --

should we add a `Preconditions.checkArgument(numRequiredBuffers > 0)`?


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143451#comment-16143451
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
For `ResultPartitionType` comment, I expand to add the 'isCreditBased' 
field temporarily in order not to affect the current process. My initial idea 
is to remove this field after the whole feature is verified to enable. If you 
approve this as a formal way, I will add the new mode 
`PIPELINE_CREDIT_BASED(true, true, true, true)` and javadoc for it.

For `gate.setBufferPool(bufferPool)`, it is my carelessness to not add it.

For releasing segments when exception, it is actually better to process 
inside the method of `NetworkBufferPool`.

I will submit the modifications later today.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143416#comment-16143416
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , thank you for giving this discussion and comments!

Actually I proposed the same way of using fixed-size `LocalBufferPool` for 
managing exclusive buffers for per `RemoteInputChannel` with stephan before 
implementation. I attached the dialogue below:

![dingtalk20170828140949](https://user-images.githubusercontent.com/12387855/29761347-767c7c84-8bfb-11e7-975b-706265766803.png)

Maybe I did not catch stephan's meaning from the above dialogue and took 
the current way to implement.  I also agree with the way you mentioned and the 
fixed-size buffer pool for `RemoteInputChannel` can be submitted in an separate 
PR.



> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141727#comment-16141727
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4485
  




Reviewed 9 of 13 files at r2.
Review status: all files reviewed at latest revision, 7 unresolved 
discussions, some commit checks failed.

---

*[a 
discussion](https://reviewable.io:443/reviews/apache/flink/4485#-KsOs0jqeqTsAUTwWuFa:-KsOs0jqeqTsAUTwWuFb:b-kg45p7)
 (no related file):*
Depending on how you build on this in the other PRs, what do you think 
about using a fixed-size `LocalBufferPool` (or a customized sub-class) per 
`RemoteInputChannel` instead? This would solve potential issues with recycling 
and would also be a lot less code. Additionally, you will gain the buffer 
availability listener feature so that you will be notified when the buffer is 
released (which may be deep inside other code with no access to the 
`RemoteInputChannel` anymore.

FYI: This change of commits in the PR actually would qualify for a separate 
PR

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java,
 line 216 at 
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOlN5wNvmcco4z2tGj:-KsOlN5xcf-lW0z1FpJo:b-ppkkjd)
 ([raw 
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java#L216)):*
> ```Java
>   if 
(gate.getConsumedPartitionType().isCreditBased()) {
>   // Create a fix size buffer 
pool for floating buffers and assign exclusive buffers to input channels 
directly
>   bufferPool = 
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, 
extraNetworkBuffersPerGate);
> ```

we still need to call `gate.setBufferPool(bufferPool)` in order for the 
gate to be aware (this call is common to both paths of the `if`)

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java,
 line 164 at 
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOrPb1_aKuUAa_uyC6:-KsOrPb1_aKuUAa_uyC7:b3045fp)
 ([raw 
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java#L164)):*
> ```Java
>   }
> 
>   redistributeBuffers();
> ```

now here, you may need to add the try-catch releasing any already added 
segments back (see my comments in `SingleInputGate`

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java,
 line 38 at 
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOhXwT2FF306uRbf5l:-KsOhXwU2y_hAFTh2tTM:b-mb3jxr)
 ([raw 
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L38)):*
> ```Java
>* no checkpoint barriers.
>*/
>   PIPELINED_BOUNDED(true, true, true, false);
> ```

Does it make sense, to already add an `PIPELINE_CREDIT_BASED(true, true, 
true, true)`? I guess, credit-based can be considered bounded as well

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java,
 line 82 at 
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOi2-hM5IFzcjrGzin:-KsOi2-iXW_-MtjTxono:b-3woyzq)
 ([raw 
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L82)):*
> ```Java
>   return isBounded;
>   }
> ```

please add a (simple) javadoc similar to the `isBounded()`method

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java,
 line 315 at 
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOoBbKr_kVVDxNIZhm:-KsOoBbKr_kVVDxNIZhn:b85dio4)
 ([raw 
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L315)):*
> ```Java
>   return segments;
>   } catch (Throwable t) {
>   if (segments != null && segments.size() > 0) {
> ```

Unfortunately, the cleanup will not work as documented - if 
`networkBufferPool.requestMemorySegments(networkBuffersPerChannel);` 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125145#comment-16125145
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates:

- Create the fix size `LocalBufferPool` for floating buffers
- Assign the exclusive buffers for `InputChannel` directly
- The proposed `BufferPoolListener`  will be included in next PR


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)