[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380599#comment-16380599
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5558


> The tag of waiting for floating buffers in RemoteInputChannel should be updated properly
>
>                 Key: FLINK-8747
>                 URL: https://issues.apache.org/jira/browse/FLINK-8747
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> In credit-based flow control mode, when the number of available buffers is 
> less than the number of required buffers (backlog + initialCredit), the 
> {{RemoteInputChannel}} requests floating buffers from the {{BufferProvider}}. 
> If it cannot get enough floating buffers, the {{RemoteInputChannel}} 
> registers itself as a listener in the {{BufferProvider}} and sets the tag 
> {{isWaitingForFloatingBuffers}} to {{true}} to avoid registering repeatedly.
>  
> When a floating buffer is recycled to the {{BufferProvider}}, the provider 
> notifies the listener of the available buffer. The listener may no longer 
> need floating buffers if its available buffers are not less than its required 
> buffers; in that case the floating buffer is returned to the 
> {{BufferProvider}} directly. Most importantly, the tag 
> {{isWaitingForFloatingBuffers}} should also be set to {{false}}; otherwise the 
> {{RemoteInputChannel}} will not request floating buffers any more once the 
> available buffers drop below the required buffers again.
>  
> There are two scenarios that cause the above issue:
>  * Recycled exclusive buffers increase the total available buffers to a 
> number equal to or greater than the required buffers.
>  * A decreased sender backlog leaves the available buffers equal to the 
> required buffers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373816#comment-16373816
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5558
  
Thanks for your reviews! I have addressed the comments above in a separate 
commit.
Regarding the tests, I agree with you. :)




[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373811#comment-16373811
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5558#discussion_r170148337
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
@@ -396,32 +399,49 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
             18, inputChannel.getNumberOfRequiredBuffers());
         assertEquals("There should be 0 buffer available in local pool",
             0, bufferPool.getNumberOfAvailableMemorySegments());
+        assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
         // Decrease the backlog
-        inputChannel.onSenderBacklog(15);
+        inputChannel.onSenderBacklog(13);
 
         // Only the number of required buffers is changed by (backlog + numExclusiveBuffers)
         verify(bufferPool, times(15)).requestBuffer();
         verify(bufferPool, times(1)).addBufferListener(inputChannel);
         assertEquals("There should be 15 buffers available in the channel",
             15, inputChannel.getNumberOfAvailableBuffers());
-        assertEquals("There should be 17 buffers required in the channel",
-            17, inputChannel.getNumberOfRequiredBuffers());
+        assertEquals("There should be 15 buffers required in the channel",
+            15, inputChannel.getNumberOfRequiredBuffers());
         assertEquals("There should be 0 buffer available in local pool",
             0, bufferPool.getNumberOfAvailableMemorySegments());
+        assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
-        // Recycle one exclusive buffer
-        exclusiveBuffer.recycleBuffer();
+        // Recycle one more floating buffer
+        floatingBufferQueue.poll().recycleBuffer();
 
-        // The exclusive buffer is returned to the channel directly
+        // Return the floating buffer to the buffer pool and the channel is not waiting for more floating buffers
         verify(bufferPool, times(15)).requestBuffer();
         verify(bufferPool, times(1)).addBufferListener(inputChannel);
+        assertEquals("There should be 15 buffers available in the channel",
+            15, inputChannel.getNumberOfAvailableBuffers());
+        assertEquals("There should be 15 buffers required in the channel",
+            15, inputChannel.getNumberOfRequiredBuffers());
+        assertEquals("There should be 1 buffers available in local pool",
+            1, bufferPool.getNumberOfAvailableMemorySegments());
+        assertFalse(inputChannel.isWaitingForFloatingBuffers());
+
+        // Increase the backlog again
+        inputChannel.onSenderBacklog(15);
+
+        // The floating buffer is requested from the buffer pool and the channel is registered as listener again.
+        verify(bufferPool, times(17)).requestBuffer();
+        verify(bufferPool, times(2)).addBufferListener(inputChannel);
         assertEquals("There should be 16 buffers available in the channel",
             16, inputChannel.getNumberOfAvailableBuffers());
         assertEquals("There should be 17 buffers required in the channel",
             17, inputChannel.getNumberOfRequiredBuffers());
-        assertEquals("There should be 0 buffers available in local pool",
+        assertEquals("There should be 0 buffer available in local pool",
--- End diff --

I also fixed other points with `buffers`.



[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373809#comment-16373809
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5558#discussion_r170148132
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -337,6 +337,11 @@ public int getSenderBacklog() {
         return numRequiredBuffers - initialCredit;
     }
 
+    @VisibleForTesting
+    public boolean isWaitingForFloatingBuffers() {
--- End diff --

yes




[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373092#comment-16373092
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5558#discussion_r170028724
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
@@ -396,32 +399,49 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
             18, inputChannel.getNumberOfRequiredBuffers());
         assertEquals("There should be 0 buffer available in local pool",
             0, bufferPool.getNumberOfAvailableMemorySegments());
+        assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
         // Decrease the backlog
-        inputChannel.onSenderBacklog(15);
+        inputChannel.onSenderBacklog(13);
 
         // Only the number of required buffers is changed by (backlog + numExclusiveBuffers)
         verify(bufferPool, times(15)).requestBuffer();
         verify(bufferPool, times(1)).addBufferListener(inputChannel);
         assertEquals("There should be 15 buffers available in the channel",
             15, inputChannel.getNumberOfAvailableBuffers());
-        assertEquals("There should be 17 buffers required in the channel",
-            17, inputChannel.getNumberOfRequiredBuffers());
+        assertEquals("There should be 15 buffers required in the channel",
+            15, inputChannel.getNumberOfRequiredBuffers());
         assertEquals("There should be 0 buffer available in local pool",
             0, bufferPool.getNumberOfAvailableMemorySegments());
+        assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
-        // Recycle one exclusive buffer
-        exclusiveBuffer.recycleBuffer();
+        // Recycle one more floating buffer
+        floatingBufferQueue.poll().recycleBuffer();
 
-        // The exclusive buffer is returned to the channel directly
+        // Return the floating buffer to the buffer pool and the channel is not waiting for more floating buffers
         verify(bufferPool, times(15)).requestBuffer();
         verify(bufferPool, times(1)).addBufferListener(inputChannel);
+        assertEquals("There should be 15 buffers available in the channel",
+            15, inputChannel.getNumberOfAvailableBuffers());
+        assertEquals("There should be 15 buffers required in the channel",
+            15, inputChannel.getNumberOfRequiredBuffers());
+        assertEquals("There should be 1 buffers available in local pool",
+            1, bufferPool.getNumberOfAvailableMemorySegments());
+        assertFalse(inputChannel.isWaitingForFloatingBuffers());
+
+        // Increase the backlog again
+        inputChannel.onSenderBacklog(15);
+
+        // The floating buffer is requested from the buffer pool and the channel is registered as listener again.
+        verify(bufferPool, times(17)).requestBuffer();
+        verify(bufferPool, times(2)).addBufferListener(inputChannel);
         assertEquals("There should be 16 buffers available in the channel",
             16, inputChannel.getNumberOfAvailableBuffers());
         assertEquals("There should be 17 buffers required in the channel",
             17, inputChannel.getNumberOfRequiredBuffers());
-        assertEquals("There should be 0 buffers available in local pool",
+        assertEquals("There should be 0 buffer available in local pool",
--- End diff --

`buffers` is correct here



[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373093#comment-16373093
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5558#discussion_r170027206
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -337,6 +337,11 @@ public int getSenderBacklog() {
         return numRequiredBuffers - initialCredit;
     }
 
+    @VisibleForTesting
+    public boolean isWaitingForFloatingBuffers() {
--- End diff --

I think this could be package-private.




[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372898#comment-16372898
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5558
  
@NicoK, as I described in the purpose of the change, there are two 
scenarios for this issue, and I only modified the existing 
`RemoteInputChannelTest#testAvailableBuffersLessThanRequiredBuffers` to cover 
one of them. Do you think it is necessary to cover the other scenario, in 
which recycling exclusive buffers makes the available buffers not less than 
the required buffers, in a new unit test?




[jira] [Commented] (FLINK-8747) The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372887#comment-16372887
 ] 

ASF GitHub Bot commented on FLINK-8747:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5558

[FLINK-8747][bugfix] The tag of waiting for floating buffers in 
RemoteInputChannel should be updated properly

## What is the purpose of the change

*In credit-based flow control mode, when the number of available buffers is 
less than the number of required buffers (backlog + initialCredit), the 
`RemoteInputChannel` requests floating buffers from the `BufferProvider`. If it 
cannot get enough floating buffers, the `RemoteInputChannel` registers itself 
as a listener in the `BufferProvider` and sets the tag 
`isWaitingForFloatingBuffers` to true to avoid registering repeatedly.*

*When a floating buffer is recycled to the `BufferProvider`, the provider 
notifies the listener of the available buffer. The listener may no longer need 
floating buffers if its available buffers are not less than its required 
buffers; in that case the floating buffer is returned to the `BufferProvider` 
directly. Most importantly, the tag `isWaitingForFloatingBuffers` should also 
be set to false; otherwise the `RemoteInputChannel` will not request floating 
buffers any more once the available buffers drop below the required buffers 
again.*

*There are two scenarios that cause the above issue:*

- Recycled exclusive buffers increase the total available buffers to a number 
equal to or greater than the required buffers.
- A decreased sender backlog leaves the available buffers equal to the 
required buffers.

## Brief change log

  - *Sets the tag `isWaitingForFloatingBuffers` to false in 
`notifyBufferAvailable` if the current number of available buffers is not less 
than the number of required buffers*
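
The effect of this change can be illustrated with a minimal sketch. It is a simplified, hypothetical model and not Flink's actual code: the classes `Pool` and `Channel`, the `applyFix` switch, and the `run` helper are all assumptions made for illustration, with `Pool` standing in for the `BufferProvider` and `Channel` for the `RemoteInputChannel`. It replays the decreased-backlog scenario with an initial credit of 2.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model; none of these classes are Flink's real ones.
class FloatingBufferFlagSketch {

    /** Stand-in for the BufferProvider: counts free floating buffers and keeps listeners. */
    static final class Pool {
        private int free;
        private final Deque<Channel> listeners = new ArrayDeque<>();

        Pool(int free) { this.free = free; }

        boolean request() {
            if (free == 0) { return false; }
            free--;
            return true;
        }

        void addListener(Channel channel) { listeners.add(channel); }

        /** A floating buffer is recycled: offer it to one listener, otherwise keep it. */
        void recycle() {
            Channel listener = listeners.poll();
            if (listener == null || !listener.notifyBufferAvailable()) {
                free++;
            }
        }
    }

    /** Stand-in for the RemoteInputChannel. */
    static final class Channel {
        private final Pool pool;
        private final int initialCredit;
        private final boolean applyFix;
        private int available;
        private int required;
        private boolean waiting; // plays the role of isWaitingForFloatingBuffers

        Channel(Pool pool, int initialCredit, boolean applyFix) {
            this.pool = pool;
            this.initialCredit = initialCredit;
            this.applyFix = applyFix;
            this.available = initialCredit; // the exclusive buffers
            this.required = initialCredit;
        }

        void onSenderBacklog(int backlog) {
            required = backlog + initialCredit;
            while (available < required && !waiting) {
                if (pool.request()) {
                    available++;
                } else {
                    pool.addListener(this);
                    waiting = true; // avoid registering repeatedly
                }
            }
        }

        /** Returns true if the channel kept the offered buffer. */
        boolean notifyBufferAvailable() {
            if (available >= required) {
                if (applyFix) { waiting = false; } // the change described above
                return false;
            }
            available++;
            if (available < required) {
                pool.addListener(this); // still short of buffers: keep listening
            } else {
                waiting = false;
            }
            return true;
        }
    }

    /** Replays the decreased-backlog scenario and returns the channel's available buffers. */
    static int run(boolean applyFix) {
        Pool pool = new Pool(0);
        Channel channel = new Channel(pool, 2, applyFix);
        channel.onSenderBacklog(1); // needs 3, has 2, pool empty: registers as listener
        channel.onSenderBacklog(0); // needs 2, has 2: no longer short of buffers
        pool.recycle();             // channel is notified but declines the buffer
        channel.onSenderBacklog(1); // needs 3 again
        return channel.available;
    }

    public static void main(String[] args) {
        System.out.println("with fix: " + run(true));
        System.out.println("without fix: " + run(false));
    }
}
```

In this model, resetting the flag lets the last `onSenderBacklog` call pick up the recycled buffer, so `run(true)` returns 3. Without the reset the channel stays marked as waiting although it is no longer registered as a listener, so it never requests again and `run(false)` returns 2.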

## Verifying this change

This change added tests and can be verified as follows:

  - *Modify 
`RemoteInputChannelTest#testAvailableBuffersLessThanRequiredBuffers` to cover 
this case*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8747

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5558.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5558


commit 10ad6db14d072a5208c45c4be08624ebd7e8ea13
Author: Zhijiang 
Date:   2018-02-22T14:41:38Z

[FLINK-8747][bugfix] The tag of waiting for floating buffers in 
RemoteInputChannel should be updated properly



