[jira] [Assigned] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-11-23 Thread Aegeaner (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aegeaner reassigned FLINK-8139:
---

Assignee: Aegeaner

> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check, via reflection, whether all fields of a row implement 
> methods that differ from {{Object.equals()}} and {{Object.hashCode()}}, for 
> types coming both from a TableSource and from a DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.
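
The reflection check suggested above can be sketched as follows; `HashCodeEqualsCheck` is a hypothetical helper for illustration, not Flink code:

```java
import java.lang.reflect.Method;

public class HashCodeEqualsCheck {

    /** Returns true if clazz overrides both Object.equals() and Object.hashCode(). */
    static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            Method equals = clazz.getMethod("equals", Object.class);
            Method hashCode = clazz.getMethod("hashCode");
            // If the resolved method is still declared by Object, it was never overridden.
            return equals.getDeclaringClass() != Object.class
                    && hashCode.getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            throw new AssertionError(e); // both methods exist on every class
        }
    }
}
```

For the array case, {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} recurse into nested arrays, which the non-deep variants do not.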



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264969#comment-16264969
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4509
  
@NicoK, thanks for your reviews on Thanksgiving day.
I have updated the code to address your comments. :)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from the producer. It requests a buffer from {{BufferPool}} 
> to hold the message. If no buffer is available, the message is staged 
> temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a 
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related tasks are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}
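
The buffer accounting described above (keep backlog + initialCredit buffers available) reduces to a small formula; this is an illustrative sketch, not Flink's actual code:

```java
/** Hypothetical sketch of credit-based floating-buffer accounting. */
public class CreditAccounting {

    /** Floating buffers to request so that (backlog + initialCredit) buffers are available. */
    static int floatingBuffersToRequest(int backlog, int initialCredit, int availableBuffers) {
        return Math.max(0, backlog + initialCredit - availableBuffers);
    }
}
```

For example, with a backlog of 8, an initial credit of 2, and 2 exclusive buffers already available, the channel would request 8 floating buffers.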







[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264930#comment-16264930
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152899386
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as


[jira] [Resolved] (FLINK-7877) Fix compilation against the Hadoop 3 beta1 release

2017-11-23 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-7877.
---
Resolution: Cannot Reproduce

The UtilsTest code has been refactored.

> Fix compilation against the Hadoop 3 beta1 release
> --
>
> Key: FLINK-7877
> URL: https://issues.apache.org/jira/browse/FLINK-7877
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>
> When compiling against hadoop 3.0.0-beta1, I got:
> {code}
> [ERROR] 
> /mnt/disk2/a/flink/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java:[224,16]
>  org.apache.flink.yarn.UtilsTest.TestingContainer is not abstract and does 
> not override abstract method 
> setExecutionType(org.apache.hadoop.yarn.api.records.ExecutionType) in 
> org.apache.hadoop.yarn.api.records.Container
> {code}
> There may be other Hadoop APIs that need adjustment.
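
The compile error above follows the usual Java rule that a concrete subclass must implement newly added abstract methods; here is a minimal stand-alone illustration with stand-in types, not Hadoop's actual API:

```java
public class OverrideDemo {

    /** Stand-in for org.apache.hadoop.yarn.api.records.Container. */
    abstract static class Container {
        // Abstract method newly added in Hadoop 3.0.0-beta1 (simplified signature).
        abstract void setExecutionType(String executionType);
    }

    /** Without the override below, this class fails to compile, as in the error above. */
    static class TestingContainer extends Container {
        @Override
        void setExecutionType(String executionType) {
            // no-op: enough for a test stub
        }
    }
}
```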





[jira] [Updated] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1

2017-11-23 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7642:
--
Description: 
The Surefire 2.19 release introduced more useful test filters which would let us 
run a subset of the tests.


This issue is for upgrading the Maven Surefire plugin to 2.19.1.

  was:
Surefire 2.19 release introduced more useful test filters which would let us 
run a subset of the test.

This issue is for upgrading maven surefire plugin to 2.19.1


> Upgrade maven surefire plugin to 2.19.1
> ---
>
> Key: FLINK-7642
> URL: https://issues.apache.org/jira/browse/FLINK-7642
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> The Surefire 2.19 release introduced more useful test filters which would let us 
> run a subset of the tests.
> This issue is for upgrading the Maven Surefire plugin to 2.19.1.
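
As an illustration of such filters, Surefire can select individual test methods from the command line (the pattern support became richer in 2.19); the class and method names below are only examples:

{code}
mvn test -Dtest=RemoteInputChannelTest#testProducerFailedException
{code}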





[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264911#comment-16264911
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152894450
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as


[jira] [Updated] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2017-11-23 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7795:
--
Description: 
http://errorprone.info/ is a tool which detects common coding mistakes.

We should incorporate it into the Flink build process.
Here are the dependencies:
{code}

<dependency>
  <groupId>com.google.errorprone</groupId>
  <artifactId>error_prone_annotation</artifactId>
  <version>${error-prone.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.google.auto.service</groupId>
  <artifactId>auto-service</artifactId>
  <version>1.0-rc3</version>
  <optional>true</optional>
</dependency>
<dependency>
  <groupId>com.google.errorprone</groupId>
  <artifactId>error_prone_check_api</artifactId>
  <version>${error-prone.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>com.google.code.findbugs</groupId>
      <artifactId>jsr305</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>com.google.errorprone</groupId>
  <artifactId>javac</artifactId>
  <version>9-dev-r4023-3</version>
  <scope>provided</scope>
</dependency>
{code}

  was:
http://errorprone.info/ is a tool which detects common coding mistakes.


We should incorporate into Flink build process.


> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build process.
> Here are the dependencies:
> {code}
> 
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}
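
As an example of the class of mistakes error-prone catches, its ArrayEquals check flags calling {{equals()}} on arrays, which compares identity rather than contents:

```java
import java.util.Arrays;

public class ArrayEqualsDemo {

    /** Bug: Object.equals() on arrays compares references, not contents (error-prone flags this). */
    static boolean buggyCompare(int[] a, int[] b) {
        return a.equals(b);
    }

    /** Fix: Arrays.equals() compares element by element. */
    static boolean correctCompare(int[] a, int[] b) {
        return Arrays.equals(a, b);
    }
}
```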





[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264906#comment-16264906
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152893996
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
--- End diff --

good point
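
The javadoc quoted in the diff above can be illustrated with a simplified queue model; this is a hedged sketch with stand-in types, not the PR's actual {{RemoteInputChannel}} code:

```java
import java.util.ArrayDeque;

/** Simplified model of the available-buffer queue discussed above. */
public class AvailableBufferQueue {
    final ArrayDeque<Object> exclusiveBuffers = new ArrayDeque<>();
    final ArrayDeque<Object> floatingBuffers = new ArrayDeque<>();
    int requiredBuffers; // backlog + initialCredit

    /**
     * Adds a returned exclusive buffer; if the queue then holds more buffers than
     * required, one floating buffer is released (returned to the buffer pool).
     *
     * @return whether one floating buffer was recycled
     */
    boolean addExclusiveBuffer(Object buffer) {
        exclusiveBuffers.add(buffer);
        int available = exclusiveBuffers.size() + floatingBuffers.size();
        if (available > requiredBuffers && !floatingBuffers.isEmpty()) {
            floatingBuffers.poll(); // would be recycled back to the BufferPool
            return true;
        }
        return false;
    }
}
```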









[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264905#comment-16264905
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152893854
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -475,16 +515,10 @@ public Void call() throws Exception {
};
 
// Submit tasks and wait to finish
-   final List> results = 
Lists.newArrayListWithCapacity(2);
-   results.add(executor.submit(requestBufferTask));
-   results.add(executor.submit(releaseTask));
-   for (Future result : results) {
-   result.get();
-   }
+   submitTasksAndWaitResults(executor, new 
Callable[]{requestBufferTask, releaseTask});
 
assertEquals("There should be no buffers available in 
the channel.",
0, inputChannel.getNumberOfAvailableBuffers());
--- End diff --

yes
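
The helper referenced in the diff can be sketched with {{ExecutorService#invokeAll}}; note the PR's actual helper takes an array, while this illustrative version takes a list:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical sketch of a submit-and-wait helper. */
public class ConcurrencyUtil {

    /** Submits all tasks, blocks until each finishes, and rethrows any task failure. */
    static void submitTasksAndWaitResults(ExecutorService executor, List<Callable<Void>> tasks)
            throws Exception {
        for (Future<Void> result : executor.invokeAll(tasks)) {
            result.get(); // propagates exceptions thrown inside the task
        }
    }
}
```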









[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264832#comment-16264832
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
ok, fixed


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.
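
The double-recycle hazard described above is typically guarded by reference counting; here is a minimal stand-alone sketch, not Flink's actual {{Buffer}} implementation:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal reference-counted buffer model illustrating the double-release hazard. */
public class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);

    /** Takes an additional reference, e.g. before handing the buffer to a writer. */
    void retain() {
        refCount.incrementAndGet();
    }

    /**
     * Releases one reference.
     *
     * @return true when the last reference was released and the memory may be reused
     * @throws IllegalStateException on a double release, as in this bug
     */
    boolean recycle() {
        int remaining = refCount.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("buffer recycled twice");
        }
        return remaining == 0;
    }
}
```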



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
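The race described in FLINK-7499 comes down to a buffer being recycled twice from two code paths. One common guard — shown here as a minimal sketch, not the actual Flink fix — is to make release idempotent with an atomic flag, so only the first caller actually hands the memory back:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of an idempotent release guard; GuardedBuffer is a hypothetical
// stand-in for a network buffer, not a Flink class.
public class IdempotentRelease {

    static class GuardedBuffer {
        private final AtomicBoolean released = new AtomicBoolean(false);
        int timesRecycled = 0;

        // Only the first caller wins the compare-and-set; later callers are no-ops,
        // so the underlying memory segment is recycled exactly once.
        void release() {
            if (released.compareAndSet(false, true)) {
                timesRecycled++;
            }
        }
    }

    public static void main(String[] args) {
        GuardedBuffer b = new GuardedBuffer();
        b.release(); // e.g. asynchronous completion of the spill write
        b.release(); // e.g. releaseMemory() racing with the write path
        System.out.println(b.timesRecycled); // 1
    }
}
```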


[GitHub] flink issue #4581: [FLINK-7499][io] fix double buffer release in SpillableSu...

2017-11-23 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
ok, fixed


---


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264825#comment-16264825
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
ok, now I'm using the buffer incorrectly in 
`SpillableSubpartition#add`...let me re-think it once more


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4581: [FLINK-7499][io] fix double buffer release in SpillableSu...

2017-11-23 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
ok, now I'm using the buffer incorrectly in 
`SpillableSubpartition#add`...let me re-think it once more


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264718#comment-16264718
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152860980
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -506,36 +540,22 @@ public Void call() throws Exception {
@Test
public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(256, 32, MemoryType.HEAP);
-   final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(248, 32, MemoryType.HEAP);
+   final int numExclusiveSegments = 120;
+   final int numFloatingBuffers = 128;
+   final int backlog = 128;
+
+   final ExecutorService executor = 
Executors.newFixedThreadPool(3);
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);

inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 128;
-   final int numExclusiveSegments = 2;
final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveSegments);
 
-   // Exhaust all the floating buffers
-   final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
-   for (int i = 0; i < numFloatingBuffers; i++) {
-   Buffer buffer = bufferPool.requestBuffer();
-   assertNotNull(buffer);
-   floatingBuffers.add(buffer);
-   }
-
-   // Exhaust all the exclusive buffers
-   final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments);
-   for (int i = 0; i < numExclusiveSegments; i++) {
-   Buffer buffer = inputChannel.requestBuffer();
-   assertNotNull(buffer);
-   exclusiveBuffers.add(buffer);
-   }
-
-   final int backlog = 128;
-   final Callable<Void> requestBufferTask = new Callable<Void>() {
+   final Callable requestBufferTask = new Callable() {
--- End diff --

please keep `Callable<Void>` (or replace by a lambda)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
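The credit-based logic discussed throughout this thread maintains (backlog + initialCredit) available buffers at the receiver, requesting the difference from the floating buffer pool. A minimal sketch of that arithmetic — `buffersToRequest` is a hypothetical helper, not the Flink API:

```java
public class FloatingBufferMath {

    // Number of floating buffers to request so that
    // (backlog + initialCredit) buffers are available; never negative.
    static int buffersToRequest(int backlog, int initialCredit, int available) {
        return Math.max(0, backlog + initialCredit - available);
    }

    public static void main(String[] args) {
        // With 2 exclusive buffers available and a backlog of 8,
        // the channel requests 8 floating buffers (8 + 2 - 2).
        System.out.println(buffersToRequest(8, 2, 2));   // 8
        // Once 10 buffers are available and the backlog grows to 11,
        // only 3 more are requested (11 + 2 - 10).
        System.out.println(buffersToRequest(11, 2, 10)); // 3
    }
}
```

These two calls mirror the `verify(bufferPool, times(8))` and `times(11)` expectations in the quoted test, where the second backlog announcement adds three more requests on top of the first eight.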


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264722#comment-16264722
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152860672
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -301,81 +306,388 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool
+* in order to maintain backlog + initialCredit buffers available once 
receiving the
+* sender's backlog, and registers as listener if no floating buffers 
available.
 */
@Test
-   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
-   final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   // Setup
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   try {
+   final int numFloatingBuffers = 10;
+   final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
+   inputGate.setBufferPool(bufferPool);
+
+   // Assign exclusive segments to the channel
+   final int numExclusiveBuffers = 2;
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
+
+   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
+   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
 
-   // Recycle exclusive segment
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   // Receive the producer's backlog
+   inputChannel.onSenderBacklog(8);
 
-   assertEquals("There should be one buffer available after 
recycle.",
-   1, inputChannel.getNumberOfAvailableBuffers());
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
+   verify(bufferPool, times(8)).requestBuffer();
+   verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
 
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   inputChannel.onSenderBacklog(11);
 
-   assertEquals("There should be two buffers available after 
recycle.",
-   2, inputChannel.getNumberOfAvailableBuffers());
-   // It should be called only once when increased from zero.
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+   assertEquals("There should be 12 buffers available in 
the channel",
+   12, inputChannel.getNumberOfAvailableBuffers());
+
+   inputChannel.onSenderBacklog(12);
+
+   // Already in the status of waiting for buffers and 
will not request any more
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+
+   } finally {
+   // Release all the buffer resources
+  

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264721#comment-16264721
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152862469
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -475,16 +515,10 @@ public Void call() throws Exception {
};
 
// Submit tasks and wait to finish
-   final List<Future<Void>> results = Lists.newArrayListWithCapacity(2);
-   results.add(executor.submit(requestBufferTask));
-   results.add(executor.submit(releaseTask));
-   for (Future<Void> result : results) {
-   result.get();
-   }
+   submitTasksAndWaitResults(executor, new Callable[]{requestBufferTask, releaseTask});
 
assertEquals("There should be no buffers available in 
the channel.",
0, inputChannel.getNumberOfAvailableBuffers());
--- End diff --

please add:
```
assertEquals("There should be 130 buffers available in 
local pool.",
130, 
bufferPool.getNumberOfAvailableMemorySegments() + 
networkBufferPool.getNumberOfAvailableMemorySegments());
```
(buffers could be in either buffer pool depending on whether they were 
requested at least once from the `LocalBufferPool` or not)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264719#comment-16264719
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152861029
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -440,21 +476,25 @@ public void testFairDistributionFloatingBuffers() 
throws Exception {
@Test
public void testConcurrentOnSenderBacklogAndRelease() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(256, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(130, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 128;
+
final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);

inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final BufferPool bufferPool = 
networkBufferPool.createBufferPool(128, 128);
+   final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
-   inputGate.assignExclusiveSegments(networkBufferPool, 2);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   final Callable<Void> requestBufferTask = new Callable<Void>() {
+   final Callable requestBufferTask = new Callable() {
--- End diff --

please keep `Callable<Void>` (or replace by a lambda)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264720#comment-16264720
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152861040
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -465,7 +505,7 @@ public Void call() throws Exception {
}
};
 
-   final Callable<Void> releaseTask = new Callable<Void>() {
+   final Callable releaseTask = new Callable() {
--- End diff --

please keep `Callable<Void>` (or replace by a lambda)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
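The reviewer's recurring suggestion — keep the typed `Callable<Void>` or switch to a lambda — can be sketched as follows; both forms submit identically to an executor, and the lambda infers `<Void>` from the target type (class and variable names here are illustrative only):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallableStyles {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Option 1: anonymous class, keeping the explicit generic parameter.
        Callable<Void> anonymous = new Callable<Void>() {
            @Override
            public Void call() {
                return null;
            }
        };

        // Option 2: equivalent lambda; the target type supplies <Void>.
        Callable<Void> lambda = () -> null;

        executor.submit(anonymous).get();
        executor.submit(lambda).get();
        executor.shutdown();
        System.out.println("done");
    }
}
```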


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152860672
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -301,81 +306,388 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool
+* in order to maintain backlog + initialCredit buffers available once 
receiving the
+* sender's backlog, and registers as listener if no floating buffers 
available.
 */
@Test
-   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
-   final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   // Setup
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   try {
+   final int numFloatingBuffers = 10;
+   final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
+   inputGate.setBufferPool(bufferPool);
+
+   // Assign exclusive segments to the channel
+   final int numExclusiveBuffers = 2;
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
+
+   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
+   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
 
-   // Recycle exclusive segment
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   // Receive the producer's backlog
+   inputChannel.onSenderBacklog(8);
 
-   assertEquals("There should be one buffer available after 
recycle.",
-   1, inputChannel.getNumberOfAvailableBuffers());
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
+   verify(bufferPool, times(8)).requestBuffer();
+   verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
 
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   inputChannel.onSenderBacklog(11);
 
-   assertEquals("There should be two buffers available after 
recycle.",
-   2, inputChannel.getNumberOfAvailableBuffers());
-   // It should be called only once when increased from zero.
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+   assertEquals("There should be 12 buffers available in 
the channel",
+   12, inputChannel.getNumberOfAvailableBuffers());
+
+   inputChannel.onSenderBacklog(12);
+
+   // Already in the status of waiting for buffers and 
will not request any more
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+
+   } finally {
+   // Release all the buffer resources
+   inputChannel.releaseAllResources();
+
+   networkBufferPool.destroyAllBufferPools();
+   networkBufferPool.destroy();
+   }
}
 
/**
-* Tests {@link RemoteInputChannel#recy

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152861040
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -465,7 +505,7 @@ public Void call() throws Exception {
}
};
 
-   final Callable<Void> releaseTask = new Callable<Void>() {
+   final Callable releaseTask = new Callable() {
--- End diff --

please keep `Callable<Void>` (or replace by a lambda)


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152862469
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -475,16 +515,10 @@ public Void call() throws Exception {
};
 
// Submit tasks and wait to finish
-   final List<Future<Void>> results = Lists.newArrayListWithCapacity(2);
-   results.add(executor.submit(requestBufferTask));
-   results.add(executor.submit(releaseTask));
-   for (Future<Void> result : results) {
-   result.get();
-   }
+   submitTasksAndWaitResults(executor, new Callable[]{requestBufferTask, releaseTask});
 
assertEquals("There should be no buffers available in 
the channel.",
0, inputChannel.getNumberOfAvailableBuffers());
--- End diff --

please add:
```
assertEquals("There should be 130 buffers available in 
local pool.",
130, 
bufferPool.getNumberOfAvailableMemorySegments() + 
networkBufferPool.getNumberOfAvailableMemorySegments());
```
(buffers could be in either buffer pool depending on whether they were 
requested at least once from the `LocalBufferPool` or not)


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152860980
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -506,36 +540,22 @@ public Void call() throws Exception {
@Test
public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(256, 32, MemoryType.HEAP);
-   final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(248, 32, MemoryType.HEAP);
+   final int numExclusiveSegments = 120;
+   final int numFloatingBuffers = 128;
+   final int backlog = 128;
+
+   final ExecutorService executor = 
Executors.newFixedThreadPool(3);
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);

inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 128;
-   final int numExclusiveSegments = 2;
final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveSegments);
 
-   // Exhaust all the floating buffers
-   final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
-   for (int i = 0; i < numFloatingBuffers; i++) {
-   Buffer buffer = bufferPool.requestBuffer();
-   assertNotNull(buffer);
-   floatingBuffers.add(buffer);
-   }
-
-   // Exhaust all the exclusive buffers
-   final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments);
-   for (int i = 0; i < numExclusiveSegments; i++) {
-   Buffer buffer = inputChannel.requestBuffer();
-   assertNotNull(buffer);
-   exclusiveBuffers.add(buffer);
-   }
-
-   final int backlog = 128;
-   final Callable<Void> requestBufferTask = new Callable<Void>() {
+   final Callable requestBufferTask = new Callable() {
--- End diff --

please keep `Callable<Void>` (or replace by a lambda)


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152861029
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -440,21 +476,25 @@ public void testFairDistributionFloatingBuffers() 
throws Exception {
@Test
public void testConcurrentOnSenderBacklogAndRelease() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(256, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(130, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 128;
+
final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);

inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final BufferPool bufferPool = 
networkBufferPool.createBufferPool(128, 128);
+   final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
-   inputGate.assignExclusiveSegments(networkBufferPool, 2);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   final Callable<Void> requestBufferTask = new Callable<Void>() {
+   final Callable requestBufferTask = new Callable() {
--- End diff --

please keep `Callable<Void>` (or replace by a lambda)


---


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264709#comment-16264709
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4757
  
Hi @GJL , you can take over this PR


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHandler to ...

2017-11-23 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4757
  
Hi @GJL , you can take over this PR


---


[jira] [Commented] (FLINK-6505) Proactively cleanup local FS for RocksDBKeyedStateBackend on startup

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264706#comment-16264706
 ] 

ASF GitHub Bot commented on FLINK-6505:
---

Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4798


> Proactively cleanup local FS for RocksDBKeyedStateBackend on startup
> 
>
> Key: FLINK-6505
> URL: https://issues.apache.org/jira/browse/FLINK-6505
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.5.0
>
>
> In {{RocksDBKeyedStateBackend}}, the {{instanceBasePath}} is cleared on 
> {{dispose()}}. I think it might make sense to also clear this directory when 
> the backend is created, in case something crashed and the backend never 
> reached {{dispose()}}. At least for previous runs of the same job, we can 
> know what to delete on restart. 
> In general, it is very important for this backend to clean up the local FS, 
> because the local quota might be very limited compared to the DFS. And a node 
> that runs out of local disk space can bring down the whole job, with no way 
> to recover (it might always get rescheduled to that node).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4798: [FLINK-6505] Proactively cleanup local FS for Rock...

2017-11-23 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4798


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264697#comment-16264697
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152854762
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264696#comment-16264696
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152860104
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -301,81 +306,388 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool
+* in order to maintain backlog + initialCredit buffers available once 
receiving the
+* sender's backlog, and registers as listener if no floating buffers 
available.
 */
@Test
-   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
-   final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   // Setup
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   try {
+   final int numFloatingBuffers = 10;
+   final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
+   inputGate.setBufferPool(bufferPool);
+
+   // Assign exclusive segments to the channel
+   final int numExclusiveBuffers = 2;
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
+
+   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
+   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
 
-   // Recycle exclusive segment
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   // Receive the producer's backlog
+   inputChannel.onSenderBacklog(8);
 
-   assertEquals("There should be one buffer available after 
recycle.",
-   1, inputChannel.getNumberOfAvailableBuffers());
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
+   verify(bufferPool, times(8)).requestBuffer();
+   verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
 
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   inputChannel.onSenderBacklog(11);
 
-   assertEquals("There should be two buffers available after 
recycle.",
-   2, inputChannel.getNumberOfAvailableBuffers());
-   // It should be called only once when increased from zero.
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+   assertEquals("There should be 12 buffers available in 
the channel",
+   12, inputChannel.getNumberOfAvailableBuffers());
+
+   inputChannel.onSenderBacklog(12);
+
+   // Already in the status of waiting for buffers and 
will not request any more
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+
+   } finally {
+   // Release all the buffer resources
+  

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264701#comment-16264701
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152855823
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
--- End diff --

add note that one exclusive buffer is taken (and therefore 11 requests and 
not 10)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859769
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable<Void> recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) {
--- End diff --

please add a Javadoc


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264692#comment-16264692
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152836741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
+   exclusiveBuffers.add(buffer);
+
+   if (getAvailableBufferSize() > numRequiredBuffers) {
+   Buffer floatingBuffer = floatingBuffers.poll();
+   floatingBuffer.recycle();
+   return true;
+   } else {
+   return false;
+   }
}
 
-   int getFloatingBufferSize() {
-   return floatingBuffers.size();
+   /**
+* Take the floating buffer first if possible.
+*/
+   @Nullable
+   Buffer takeBuffer() {
+   if (floatingBuffers.size() > 0) {
+   return floatingBuffers.poll();
+   } else {
+   return exclusiveBuffers.poll();
+   }
+   }
+
+   /**
+* The floating buffer is recycled to local buffer pool 
directly, and the
+* exclusive buffer will be gathered to return to global buffer 
pool later.
+*/
+   void releaseAll(List<MemorySegment> exclusiveSegments) {
--- End diff --

please document the `exclusiveSegments` parameter to make it absolutely 
clear for the user that we will add memory segments of exclusive buffers


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
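The reviewed test comments describe requesting floating buffers "by the formula of backlog + initialCredit - availableBuffers". A minimal sketch of that accounting (the class and method names here are illustrative, not Flink's actual API):

```java
// Hypothetical sketch of the credit-based request accounting discussed in the
// review above; CreditAccounting/buffersToRequest are illustrative names only.
public class CreditAccounting {

    /**
     * Number of floating buffers to request so that (backlog + initialCredit)
     * buffers are available in the channel.
     *
     * @param backlog          sender's announced backlog
     * @param initialCredit    number of exclusive buffers (the initial credit)
     * @param availableBuffers buffers currently available in the channel
     */
    public static int buffersToRequest(int backlog, int initialCredit, int availableBuffers) {
        // Never request a negative amount when enough buffers are already held.
        return Math.max(0, backlog + initialCredit - availableBuffers);
    }

    public static void main(String[] args) {
        // Mirrors the test: backlog 8, credit 2, 2 exclusive buffers held -> request 8.
        System.out.println(buffersToRequest(8, 2, 2)); // 8
    }
}
```

With backlog 11, credit 2, and 10 buffers available this yields 3, matching the "need extra three floating buffers" comment in the test.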


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152858602
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
-   verify(bufferPool, times(11)).requestBuffer();
+   // The channel does not get enough floating buffer and 
register as buffer listener
+   verify(bufferPool, times(13)).requestBuffer();

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152836741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
+   exclusiveBuffers.add(buffer);
+
+   if (getAvailableBufferSize() > numRequiredBuffers) {
+   Buffer floatingBuffer = floatingBuffers.poll();
+   floatingBuffer.recycle();
+   return true;
+   } else {
+   return false;
+   }
}
 
-   int getFloatingBufferSize() {
-   return floatingBuffers.size();
+   /**
+* Take the floating buffer first if possible.
+*/
+   @Nullable
+   Buffer takeBuffer() {
+   if (floatingBuffers.size() > 0) {
+   return floatingBuffers.poll();
+   } else {
+   return exclusiveBuffers.poll();
+   }
+   }
+
+   /**
+* The floating buffer is recycled to local buffer pool 
directly, and the
+* exclusive buffer will be gathered to return to global buffer 
pool later.
+*/
+   void releaseAll(List<MemorySegment> exclusiveSegments) {
--- End diff --

please document the `exclusiveSegments` parameter to make it absolutely 
clear for the user that we will add memory segments of exclusive buffers


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264699#comment-16264699
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859784
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable<Void> recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) {
+   final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments);
+   // Exhaust all the exclusive buffers
+   for (int i = 0; i < numExclusiveSegments; i++) {
+   Buffer buffer = inputChannel.requestBuffer();
+   assertNotNull(buffer);
+   exclusiveBuffers.add(buffer);
+   }
+
+   return new Callable<Void>() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : exclusiveBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private Callable<Void> recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception {
+   final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
+   // Exhaust all the floating buffers
+   for (int i = 0; i < numFloatingBuffers; i++) {
+   Buffer buffer = bufferPool.requestBuffer();
+   assertNotNull(buffer);
+   floatingBuffers.add(buffer);
+   }
+
+   return new Callable<Void>() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : floatingBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private void submitTasksAndWaitResults(ExecutorService executor, Callable<Void>[] tasks) throws Exception {
--- End diff --

please add a Javadoc


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
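The `submitTasksAndWaitResults` helper under review submits several recycle tasks concurrently and blocks until all complete. A self-contained version of that pattern can be built on `ExecutorService.invokeAll`; this is a sketch under assumed names, not Flink's actual helper:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone "submit tasks and wait for results" helper;
// the class name TaskRunner is illustrative, not Flink's.
public class TaskRunner {

    /** Submits all tasks and blocks until each has completed, rethrowing any failure. */
    static void submitTasksAndWaitResults(ExecutorService executor, List<Callable<Void>> tasks)
            throws Exception {
        // invokeAll blocks until every task is done; get() surfaces task exceptions.
        for (Future<Void> result : executor.invokeAll(tasks)) {
            result.get();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger counter = new AtomicInteger();
        Callable<Void> task = () -> { counter.incrementAndGet(); return null; };
        submitTasksAndWaitResults(executor, Arrays.asList(task, task, task));
        executor.shutdown();
        System.out.println(counter.get()); // 3
    }
}
```

Using `invokeAll` rather than individual `submit` calls keeps the "wait for everything" step in one place and propagates the first task failure to the test thread.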


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264695#comment-16264695
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152853208
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
--- End diff --

actually, this is now offering two functionalities:
```
addExclusiveBuffer(buffer);
maintainTargetSize(numRequiredBuffers);
```
I'd suggest to either use the two separately or integrate the target size 
maintaining into `addExclusiveBuffers`. In any case, you should make 
`AvailableBufferQueue` a `static` inner class which could then also be tested 
separately.


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
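The reviewer's suggestion above is to split `maintainTargetSize` from adding the exclusive buffer and make the queue a static inner class that can be tested on its own. A minimal sketch of that shape (with a stand-in `Buffer` so it is self-contained; all names here are illustrative, not Flink's code):

```java
import java.util.ArrayDeque;

// Hypothetical sketch of the suggested refactoring: a standalone buffer queue
// where adding an exclusive buffer and maintaining the target size are separate.
public class AvailableBufferQueueSketch {

    /** Minimal stand-in for a network buffer. */
    static class Buffer {
        boolean recycled;
        void recycle() { recycled = true; }
    }

    static class AvailableBufferQueue {
        final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>();
        final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();

        void addExclusiveBuffer(Buffer buffer) {
            exclusiveBuffers.add(buffer);
        }

        /** Recycle one floating buffer if more buffers are queued than required. */
        int maintainTargetSize(int numRequiredBuffers) {
            if (exclusiveBuffers.size() + floatingBuffers.size() > numRequiredBuffers
                    && !floatingBuffers.isEmpty()) {
                floatingBuffers.poll().recycle();
                return 1;
            }
            return 0;
        }

        /** Take a floating buffer first if possible, otherwise an exclusive one. */
        Buffer takeBuffer() {
            Buffer buffer = floatingBuffers.poll();
            return buffer != null ? buffer : exclusiveBuffers.poll();
        }
    }

    public static void main(String[] args) {
        AvailableBufferQueue queue = new AvailableBufferQueue();
        queue.floatingBuffers.add(new Buffer());
        queue.addExclusiveBuffer(new Buffer());
        // Two buffers queued but only one required -> one floating buffer recycled.
        System.out.println(queue.maintainTargetSize(1)); // 1
    }
}
```

Keeping the two steps separate lets a unit test exercise the recycling decision directly, without going through the input channel.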


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264691#comment-16264691
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859775
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable<Void> recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) {
+   final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments);
+   // Exhaust all the exclusive buffers
+   for (int i = 0; i < numExclusiveSegments; i++) {
+   Buffer buffer = inputChannel.requestBuffer();
+   assertNotNull(buffer);
+   exclusiveBuffers.add(buffer);
+   }
+
+   return new Callable<Void>() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : exclusiveBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private Callable<Void> recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception {
+   final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
--- End diff --

please add a Javadoc


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
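The test helpers above follow one pattern: request all buffers eagerly, then return a `Callable<Void>` that recycles them later, so several recycle tasks can race from different threads. A generic sketch of that pattern, with a stand-in `Buffer` (these names are illustrative, not Flink's classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch of the test-helper pattern: capture pre-requested buffers
// in a Callable<Void> whose call() recycles them all.
public class RecycleTaskSketch {

    /** Minimal stand-in for a network buffer. */
    static class Buffer {
        boolean recycled;
        void recycle() { recycled = true; }
    }

    /** Returns a task that recycles every captured buffer when invoked. */
    static Callable<Void> recycleTask(List<Buffer> buffers) {
        return () -> {
            for (Buffer buffer : buffers) {
                buffer.recycle();
            }
            return null;
        };
    }

    public static void main(String[] args) throws Exception {
        List<Buffer> buffers = new ArrayList<>();
        buffers.add(new Buffer());
        buffers.add(new Buffer());
        recycleTask(buffers).call();
        System.out.println(buffers.get(0).recycled && buffers.get(1).recycled); // true
    }
}
```

Separating "exhaust the buffers" (done up front) from "recycle them" (done inside the task) is what makes the concurrency in the test deterministic to set up.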


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152856498
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
-   verify(bufferPool, times(11)).requestBuffer();
+   // The channel does not get enough floating buffer and 
register as buffer listener
+   verify(bufferPool, times(13)).requestBuffer();
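The request counts asserted above follow the formula stated in the test comments: on receiving a sender backlog, the channel tops up to (backlog + initialCredit) available buffers. A minimal standalone sketch of that calculation (hypothetical names, not the actual `RemoteInputChannel` code):

```java
public class CreditRequestSketch {

    /**
     * Floating buffers to request from the pool so that the channel holds
     * (backlog + initialCredit) available buffers; never a negative count.
     */
    static int floatingBuffersToRequest(int backlog, int initialCredit, int availableBuffers) {
        return Math.max(0, backlog + initialCredit - availableBuffers);
    }

    public static void main(String[] args) {
        // Mirrors the test setup above: backlog 8, initialCredit 2, and only
        // one exclusive buffer still available (one was taken during setup),
        // so 9 floating buffers are requested -- together with the two
        // floating buffers taken earlier that gives the 11 requestBuffer()
        // calls the test verifies.
        System.out.println(floatingBuffersToRequest(8, 2, 1)); // prints 9
    }
}
```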

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152855823
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
--- End diff --

add note that one exclusive buffer is taken (and therefore 11 requests and 
not 10)


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859775
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable recycleExclusiveBufferTask(RemoteInputChannel 
inputChannel, int numExclusiveSegments) {
+   final List exclusiveBuffers = new 
ArrayList<>(numExclusiveSegments);
+   // Exhaust all the exclusive buffers
+   for (int i = 0; i < numExclusiveSegments; i++) {
+   Buffer buffer = inputChannel.requestBuffer();
+   assertNotNull(buffer);
+   exclusiveBuffers.add(buffer);
+   }
+
+   return new Callable() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : exclusiveBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private Callable recycleFloatingBufferTask(BufferPool bufferPool, int 
numFloatingBuffers) throws Exception {
+   final List floatingBuffers = new 
ArrayList<>(numFloatingBuffers);
--- End diff --

please add a Javadoc


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264690#comment-16264690
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152852133
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
+   exclusiveBuffers.add(buffer);
+
+   if (getAvailableBufferSize() > numRequiredBuffers) {
+   Buffer floatingBuffer = floatingBuffers.poll();
+   floatingBuffer.recycle();
+   return true;
+   } else {
+   return false;
+   }
}
 
-   int getFloatingBufferSize() {
-   return floatingBuffers.size();
+   /**
+* Take the floating buffer first if possible.
+*/
+   @Nullable
+   Buffer takeBuffer() {
--- End diff --

please explain when the result may be `null`


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
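NicoK's request above ("explain when the result may be `null`") amounts to documenting that `takeBuffer()` polls the floating queue first, falls back to the exclusive queue, and yields `null` only when both are empty. A simplified standalone sketch of that polling order (plain strings stand in for `Buffer`; this is not the actual Flink class):

```java
import java.util.ArrayDeque;

public class BufferQueueSketch {

    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();

    void addExclusiveBuffer(String buffer) {
        exclusiveBuffers.add(buffer);
    }

    void addFloatingBuffer(String buffer) {
        floatingBuffers.add(buffer);
    }

    /**
     * Takes a floating buffer first if one is queued, otherwise falls back to
     * an exclusive buffer; returns null only when both queues are empty.
     */
    String takeBuffer() {
        String floating = floatingBuffers.poll();
        return floating != null ? floating : exclusiveBuffers.poll();
    }

    public static void main(String[] args) {
        BufferQueueSketch queue = new BufferQueueSketch();
        queue.addExclusiveBuffer("exclusive");
        queue.addFloatingBuffer("floating");
        System.out.println(queue.takeBuffer()); // floating buffers win: prints floating
        System.out.println(queue.takeBuffer()); // prints exclusive
        System.out.println(queue.takeBuffer()); // both queues drained: prints null
    }
}
```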


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152853208
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
--- End diff --

actually, this is now offering two functionalities:
```
addExclusiveBuffer(buffer);
maintainTargetSize(numRequiredBuffers);
```
I'd suggest either using the two separately or integrating the target-size 
maintenance into `addExclusiveBuffer`. In any case, you should make 
`AvailableBufferQueue` a `static` inner class, which could then also be tested 
separately.


---
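NicoK's suggestion above — keeping `addExclusiveBuffer` and the target-size maintenance as separate, independently testable operations on a static inner queue class — could look roughly like this (a sketch of the suggested refactoring, not code from the PR; strings stand in for `Buffer`):

```java
import java.util.ArrayDeque;

public class AvailableBufferQueueSketch {

    // Plays the role of the suggested static inner AvailableBufferQueue,
    // testable on its own.
    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();

    void addExclusiveBuffer(String buffer) {
        exclusiveBuffers.add(buffer);
    }

    void addFloatingBuffer(String buffer) {
        floatingBuffers.add(buffer);
    }

    /**
     * Releases surplus floating buffers until no more than numRequiredBuffers
     * remain available; returns how many floating buffers were released.
     */
    int maintainTargetSize(int numRequiredBuffers) {
        int released = 0;
        while (exclusiveBuffers.size() + floatingBuffers.size() > numRequiredBuffers
                && floatingBuffers.poll() != null) {
            released++; // the real code would call buffer.recycle() here
        }
        return released;
    }

    public static void main(String[] args) {
        AvailableBufferQueueSketch queue = new AvailableBufferQueueSketch();
        queue.addFloatingBuffer("floating-1");
        queue.addFloatingBuffer("floating-2");
        queue.addExclusiveBuffer("exclusive-1");
        // Target size 2: exactly one floating buffer is surplus.
        System.out.println(queue.maintainTargetSize(2)); // prints 1
    }
}
```

Splitting the two operations keeps `addExclusiveBuffer` a pure enqueue, so the recycle-on-overflow policy can be unit-tested without constructing a channel.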


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264698#comment-16264698
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859769
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable recycleExclusiveBufferTask(RemoteInputChannel 
inputChannel, int numExclusiveSegments) {
--- End diff --

please add a Javadoc




[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859069
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -301,81 +306,388 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool
+* in order to maintain backlog + initialCredit buffers available once 
receiving the
+* sender's backlog, and registers as listener if no floating buffers 
available.
 */
@Test
-   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
-   final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   // Setup
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   try {
+   final int numFloatingBuffers = 10;
+   final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
+   inputGate.setBufferPool(bufferPool);
+
+   // Assign exclusive segments to the channel
+   final int numExclusiveBuffers = 2;
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
+
+   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
+   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
 
-   // Recycle exclusive segment
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   // Receive the producer's backlog
+   inputChannel.onSenderBacklog(8);
 
-   assertEquals("There should be one buffer available after 
recycle.",
-   1, inputChannel.getNumberOfAvailableBuffers());
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
+   verify(bufferPool, times(8)).requestBuffer();
+   verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
 
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   inputChannel.onSenderBacklog(11);
 
-   assertEquals("There should be two buffers available after 
recycle.",
-   2, inputChannel.getNumberOfAvailableBuffers());
-   // It should be called only once when increased from zero.
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+   assertEquals("There should be 12 buffers available in 
the channel",
+   12, inputChannel.getNumberOfAvailableBuffers());
+
+   inputChannel.onSenderBacklog(12);
+
+   // Already in the status of waiting for buffers and 
will not request any more
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+
--- End diff --

Thanks, the extensions you implemented for 
`testRequestFloatingBufferOnSenderBacklog()` were exactly what I missed 
previously and were not covered by `testFairDistributionFloatingBuffers()`. 
Based on your additions, I came up with some more (corner) cases that should be 
covered (see above)


---
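The corner case discussed above — a second backlog announcement arriving while the channel is already waiting for floating buffers must not trigger further pool requests or a duplicate listener registration — comes down to a simple guard flag. A minimal sketch (hypothetical names and a toy two-buffer pool; the real logic lives in `RemoteInputChannel#onSenderBacklog`):

```java
public class BacklogListenerSketch {

    // Counters are package-visible so the behaviour is easy to assert on.
    boolean isWaitingForFloatingBuffers;
    int requestBufferCalls;
    int listenerRegistrations;
    int poolBuffersLeft = 2; // pretend the buffer pool holds only two buffers

    void onSenderBacklog(int missingBuffers) {
        if (isWaitingForFloatingBuffers) {
            return; // already registered as listener: no further requests
        }
        while (missingBuffers > 0) {
            requestBufferCalls++;
            if (poolBuffersLeft == 0) {
                // Pool exhausted: register exactly once, then wait for
                // recycled buffers instead of polling the pool again.
                listenerRegistrations++;
                isWaitingForFloatingBuffers = true;
                return;
            }
            poolBuffersLeft--;
            missingBuffers--;
        }
    }

    public static void main(String[] args) {
        BacklogListenerSketch channel = new BacklogListenerSketch();
        channel.onSenderBacklog(3); // two buffers granted, third request fails
        channel.onSenderBacklog(5); // ignored: channel is already waiting
        System.out.println(channel.requestBufferCalls);    // prints 3
        System.out.println(channel.listenerRegistrations); // prints 1
    }
}
```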


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264689#comment-16264689
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152856498
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152857580
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
-   verify(bufferPool, times(11)).requestBuffer();
+   // The channel does not get enough floating buffer and 
register as buffer listener
+   verify(bufferPool, times(13)).requestBuffer();

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264694#comment-16264694
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152858602
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264700#comment-16264700
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152857580
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152852133
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
+   exclusiveBuffers.add(buffer);
+
+   if (getAvailableBufferSize() > numRequiredBuffers) {
+   Buffer floatingBuffer = floatingBuffers.poll();
+   floatingBuffer.recycle();
+   return true;
+   } else {
+   return false;
+   }
}
 
-   int getFloatingBufferSize() {
-   return floatingBuffers.size();
+   /**
+* Take the floating buffer first if possible.
+*/
+   @Nullable
+   Buffer takeBuffer() {
--- End diff --

please explain when the result may be `null`


---
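The reviewer's question about when `takeBuffer()` may return `null` can be answered with a small model of the queue logic in the diff above. This is an illustrative sketch only: the class and method names mirror the diff, but `Buffer` here is a placeholder stand-in, not Flink's real `Buffer` type.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of the buffer queue from the diff above; Buffer is a stand-in type. */
class BufferQueue {
    static final class Buffer {
        boolean recycled;
        void recycle() { recycled = true; }  // returns the buffer to its pool
    }

    private final Deque<Buffer> exclusiveBuffers = new ArrayDeque<>();
    private final Deque<Buffer> floatingBuffers = new ArrayDeque<>();
    private final int numRequiredBuffers;

    BufferQueue(int numRequiredBuffers) { this.numRequiredBuffers = numRequiredBuffers; }

    void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); }

    /**
     * Adds a recycled exclusive buffer; if the queue now holds more buffers than
     * required, one floating buffer is released back to the pool.
     *
     * @return whether a surplus floating buffer was recycled
     */
    boolean addExclusiveBuffer(Buffer buffer) {
        exclusiveBuffers.add(buffer);
        if (getAvailableBufferSize() > numRequiredBuffers && !floatingBuffers.isEmpty()) {
            floatingBuffers.poll().recycle();  // return the surplus to the buffer pool
            return true;
        }
        return false;
    }

    /** Prefers floating buffers; returns null only when both queues are empty. */
    Buffer takeBuffer() {
        Buffer b = floatingBuffers.poll();
        return b != null ? b : exclusiveBuffers.poll();
    }

    int getAvailableBufferSize() {
        return floatingBuffers.size() + exclusiveBuffers.size();
    }
}
```

In this sketch the `null` case is exactly the "no buffers available at all" state, which is what the review asks to have documented.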


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152854762
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
-   verify(bufferPool, times(11)).requestBuffer();
+   // The channel does not get enough floating buffer and 
register as buffer listener
verify(bufferPool, times(13)).requestBuffer();
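The credit accounting the test above exercises (top the channel up to `backlog + initialCredit` available buffers, and register as a buffer listener when the floating pool runs dry) can be sketched as a toy model. The class below is illustrative only; it is not Flink's actual `RemoteInputChannel` API, and the pool is reduced to a counter.

```java
/** Illustrative model of credit-based buffer accounting; not Flink's real API. */
class BacklogAccounting {
    private final int initialCredit;       // exclusive buffers per channel
    private int floatingPoolSize;          // floating buffers left in the shared pool
    private int available;                 // buffers currently held by the channel
    private int required;                  // target: backlog + initialCredit
    private boolean registeredAsListener;  // waiting for the pool to free buffers

    BacklogAccounting(int initialCredit, int floatingPoolSize) {
        this.initialCredit = initialCredit;
        this.floatingPoolSize = floatingPoolSize;
        this.available = initialCredit;    // exclusive buffers are assigned up front
    }

    /** Mirrors onSenderBacklog(): top up to (backlog + initialCredit) available buffers. */
    void onSenderBacklog(int backlog) {
        required = backlog + initialCredit;
        while (available < required) {
            if (floatingPoolSize == 0) {   // pool exhausted: wait for recycled buffers
                registeredAsListener = true;
                return;
            }
            floatingPoolSize--;            // a requestBuffer() call on the buffer pool
            available++;
        }
    }

    int getAvailable() { return available; }
    boolean isListener() { return registeredAsListener; }
}
```

With `initialCredit = 2` and a pool of 8 floating buffers, `onSenderBacklog(8)` raises availability to 10, while a later backlog of 11 leaves the channel short and registers it as a listener, matching the shape of the assertions in the test.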

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264693#comment-16264693
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859069
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -301,81 +306,388 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool
+* in order to maintain backlog + initialCredit buffers available once 
receiving the
+* sender's backlog, and registers as listener if no floating buffers 
available.
 */
@Test
-   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
-   final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   // Setup
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   try {
+   final int numFloatingBuffers = 10;
+   final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
+   inputGate.setBufferPool(bufferPool);
+
+   // Assign exclusive segments to the channel
+   final int numExclusiveBuffers = 2;
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
+
+   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
+   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
 
-   // Recycle exclusive segment
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   // Receive the producer's backlog
+   inputChannel.onSenderBacklog(8);
 
-   assertEquals("There should be one buffer available after 
recycle.",
-   1, inputChannel.getNumberOfAvailableBuffers());
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
+   verify(bufferPool, times(8)).requestBuffer();
+   verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
 
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   inputChannel.onSenderBacklog(11);
 
-   assertEquals("There should be two buffers available after 
recycle.",
-   2, inputChannel.getNumberOfAvailableBuffers());
-   // It should be called only once when increased from zero.
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+   assertEquals("There should be 12 buffers available in 
the channel",
+   12, inputChannel.getNumberOfAvailableBuffers());
+
+   inputChannel.onSenderBacklog(12);
+
+   // Already in the status of waiting for buffers and 
will not request any more
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+
--- End diff --

Thanks, the extensions you implemented for 
`testRequestFloatingBufferOnSenderBacklog`

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152859784
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable recycleExclusiveBufferTask(RemoteInputChannel 
inputChannel, int numExclusiveSegments) {
+   final List exclusiveBuffers = new 
ArrayList<>(numExclusiveSegments);
+   // Exhaust all the exclusive buffers
+   for (int i = 0; i < numExclusiveSegments; i++) {
+   Buffer buffer = inputChannel.requestBuffer();
+   assertNotNull(buffer);
+   exclusiveBuffers.add(buffer);
+   }
+
+   return new Callable() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : exclusiveBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private Callable recycleFloatingBufferTask(BufferPool bufferPool, int 
numFloatingBuffers) throws Exception {
+   final List floatingBuffers = new 
ArrayList<>(numFloatingBuffers);
+   // Exhaust all the floating buffers
+   for (int i = 0; i < numFloatingBuffers; i++) {
+   Buffer buffer = bufferPool.requestBuffer();
+   assertNotNull(buffer);
+   floatingBuffers.add(buffer);
+   }
+
+   return new Callable() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : floatingBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private void submitTasksAndWaitResults(ExecutorService executor, 
Callable[] tasks) throws Exception {
--- End diff --

please add a Javadoc


---
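The helper the reviewer asks to document can be captured in a few lines: submit all tasks to the executor concurrently and block until each finishes, rethrowing the first failure. The sketch below is a hypothetical version using `invokeAll`; the signature (a `List` instead of an array) is an assumption, not the PR's exact code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative version of the test helper: run tasks concurrently, fail on first error. */
class ConcurrentTasks {
    /** Submits all tasks to the executor and blocks until each has finished. */
    static void submitTasksAndWaitResults(ExecutorService executor, List<Callable<Void>> tasks)
            throws Exception {
        // invokeAll blocks until all tasks complete (normally or exceptionally)
        List<Future<Void>> futures = executor.invokeAll(tasks);
        for (Future<Void> future : futures) {
            future.get();  // rethrows any exception a task raised
        }
    }
}
```

A Javadoc along these lines ("submits the given tasks and waits until all of them have completed, propagating the first exception") would address the review comment.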


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152860104
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -301,81 +306,388 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool
+* in order to maintain backlog + initialCredit buffers available once 
receiving the
+* sender's backlog, and registers as listener if no floating buffers 
available.
 */
@Test
-   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
-   final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   // Setup
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   try {
+   final int numFloatingBuffers = 10;
+   final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
+   inputGate.setBufferPool(bufferPool);
+
+   // Assign exclusive segments to the channel
+   final int numExclusiveBuffers = 2;
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+   inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
+
+   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
+   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
 
-   // Recycle exclusive segment
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   // Receive the producer's backlog
+   inputChannel.onSenderBacklog(8);
 
-   assertEquals("There should be one buffer available after 
recycle.",
-   1, inputChannel.getNumberOfAvailableBuffers());
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
+   verify(bufferPool, times(8)).requestBuffer();
+   verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
 
-   
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+   inputChannel.onSenderBacklog(11);
 
-   assertEquals("There should be two buffers available after 
recycle.",
-   2, inputChannel.getNumberOfAvailableBuffers());
-   // It should be called only once when increased from zero.
-   verify(inputChannel, times(1)).notifyCreditAvailable();
+   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+   assertEquals("There should be 12 buffers available in 
the channel",
+   12, inputChannel.getNumberOfAvailableBuffers());
+
+   inputChannel.onSenderBacklog(12);
+
+   // Already in the status of waiting for buffers and 
will not request any more
+   verify(bufferPool, times(11)).requestBuffer();
+   verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+
+   } finally {
+   // Release all the buffer resources
+   inputChannel.releaseAllResources();
+
+   networkBufferPool.destroyAllBufferPools();
+   networkBufferPool.destroy();
+   }
}
 
/**
-* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is

[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264667#comment-16264667
 ] 

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5059
  
Addressed the comment.


> Support limiting the number of open FileSystem connections
> --
>
> Key: FLINK-8125
> URL: https://issues.apache.org/jira/browse/FLINK-8125
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> We need a way to limit the number of streams that Flink FileSystems 
> concurrently open.
> For example, for very small HDFS clusters with few RPC handlers, a large 
> Flink job trying to build up many connections during a checkpoint causes 
> failures due to rejected connections. 
> I propose to add a file system that can wrap another existing file system. The 
> file system may track the progress of streams and close streams that have 
> been inactive for too long, to avoid locked streams from taking up the complete 
> pool.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
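The counting aspect of such a connection-limiting wrapper can be sketched with a fair semaphore. This is a conceptual sketch only: Flink's `LimitedConnectionsFileSystem` uses a `ReentrantLock`/`Condition` pair with separate input/output/total limits and piggybacks inactivity checks on open attempts, whereas the class below models just the "block until a slot is free, or time out" behavior.

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Conceptual sketch of limiting concurrently open streams; not Flink's real class. */
class StreamLimiter {
    private final Semaphore permits;
    private final long openTimeoutMillis;

    StreamLimiter(int maxOpenStreams, long openTimeoutMillis) {
        this.permits = new Semaphore(maxOpenStreams, true);  // fair, to avoid starvation
        this.openTimeoutMillis = openTimeoutMillis;
    }

    /** Blocks until a stream slot is free, or fails after the configured timeout. */
    void acquireStreamSlot() throws InterruptedException, IOException {
        if (!permits.tryAcquire(openTimeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IOException("timed out waiting for a free stream slot");
        }
    }

    /** Called when a stream is closed (or reaped as inactive). */
    void releaseStreamSlot() {
        permits.release();
    }

    int availableSlots() {
        return permits.availablePermits();
    }
}
```

A semaphore cannot express the inactivity-reaping described in the issue (closing stalled streams so they do not pin slots forever); that part is what motivates the lock/condition design in the actual PR.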




[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264664#comment-16264664
 ] 

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152856201
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -0,0 +1,1097 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and 
connections
+ * would fail in that case. This happens, for example, for very small HDFS 
clusters
+ * with few RPC handlers, when a large Flink job tries to build up many 
connections during
+ * a checkpoint.
+ *
+ * The filesystem may track the progress of streams and close streams 
that have been
+ * inactive for too long, to avoid locked streams from taking up the 
complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to 
open a new stream
+ * periodically check the currently open streams once the limit of open 
streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+   /** The original file system to which connections are limited. */
+   private final FileSystem originalFs;
+
+   /** The lock that synchronizes connection bookkeeping. */
+   private final ReentrantLock lock;
+
+   /** Condition for threads that are blocking on the availability of new 
connections. */
+   private final Condition available;
+
+   /** The maximum number of concurrently open output streams. */
+   private final int maxNumOpenOutputStreams;
+
+   /** The maximum number of concurrently open input streams. */
+   private final int maxNumOpenInputStreams;
+
+   /** The maximum number of concurrently open streams (input + output). */
+   private final int maxNumOpenStreamsTotal;
+
+   /** The nanoseconds that opening a stream may wait for availability. 
*/
+   private final long streamOpenTimeoutNanos;
+
+   /** The nanoseconds that a stream may spend not writing any bytes 
before it is closed as inactive. */
+   private final long streamInactivityTimeoutNanos;
+
+   /** The set of currently open output streams. */
+   @GuardedBy("lock")
+   private final HashSet openOutputStreams;
+
+   /** The set of currently open input streams. */
+   @GuardedBy("lock")
private final HashSet openInputStreams;


[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264659#comment-16264659
 ] 

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152854733
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -0,0 +1,1097 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and 
connections
+ * would fail in that case. This happens, for example, for very small HDFS 
clusters
+ * with few RPC handlers, when a large Flink job tries to build up many 
connections during
+ * a checkpoint.
+ *
+ * The filesystem may track the progress of streams and close streams 
that have been
+ * inactive for too long, to avoid locked streams from taking up the 
complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to 
open a new stream
+ * periodically check the currently open streams once the limit of open 
streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+   /** The original file system to which connections are limited. */
+   private final FileSystem originalFs;
+
+   /** The lock that synchronizes connection bookkeeping. */
+   private final ReentrantLock lock;
+
+   /** Condition for threads that are blocking on the availability of new 
connections. */
+   private final Condition available;
+
+   /** The maximum number of concurrently open output streams. */
+   private final int maxNumOpenOutputStreams;
+
+   /** The maximum number of concurrently open input streams. */
+   private final int maxNumOpenInputStreams;
+
+   /** The maximum number of concurrently open streams (input + output). */
+   private final int maxNumOpenStreamsTotal;
+
+   /** The nanoseconds that a opening a stream may wait for availability. 
*/
+   private final long streamOpenTimeoutNanos;
+
+   /** The nanoseconds that a stream may spend not writing any bytes 
before it is closed as inactive. */
+   private final long streamInactivityTimeoutNanos;
+
+   /** The set of currently open output streams. */
+   @GuardedBy("lock")
+   private final HashSet openOutputStreams;
+
+   /** The set of currently open input streams. */
+   @GuardedBy("lock")
+   private fina
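As an aside for readers following this thread: the opener-side limiting strategy the javadoc above describes (callers doing a timed wait on a condition variable for a free slot, instead of a dedicated reaper thread) can be sketched in a few lines. This is an illustrative sketch only, not Flink's implementation; the `StreamLimiter` class and its method names are invented:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Minimal sketch of opener-side stream limiting: callers block on a
 *  Condition until a slot frees up or their timeout elapses. */
public class StreamLimiter {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private final int maxOpenStreams;
    private int openStreams; // guarded by "lock"

    public StreamLimiter(int maxOpenStreams) {
        this.maxOpenStreams = maxOpenStreams;
    }

    /** Reserves a stream slot, waiting at most timeoutNanos; false on timeout. */
    public boolean tryAcquire(long timeoutNanos) throws InterruptedException {
        lock.lock();
        try {
            long remaining = timeoutNanos;
            while (openStreams >= maxOpenStreams) {
                if (remaining <= 0) {
                    return false; // timed out waiting for a free slot
                }
                // in the approach described above, this is also where the
                // caller would periodically check for inactive streams
                remaining = available.awaitNanos(remaining);
            }
            openStreams++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Releases a slot (stream closed) and wakes one waiting opener. */
    public void release() {
        lock.lock();
        try {
            openStreams--;
            available.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StreamLimiter limiter = new StreamLimiter(1);
        System.out.println(limiter.tryAcquire(TimeUnit.MILLISECONDS.toNanos(50)));
        System.out.println(limiter.tryAcquire(TimeUnit.MILLISECONDS.toNanos(50)));
        limiter.release();
        System.out.println(limiter.tryAcquire(TimeUnit.MILLISECONDS.toNanos(50)));
    }
}
```

With a limit of 1, the second acquisition times out while the slot is held and succeeds again after `release()`, so `main` prints `true`, `false`, `true`. Note the `while` loop around `awaitNanos`, which guards against spurious wakeups.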

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152854733
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152854163
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264655#comment-16264655
 ] 

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152854163
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264652#comment-16264652
 ] 

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152853859
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152853859
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264638#comment-16264638
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
alright - using fixup commits now for you ;)
FYI: all `[FLINK-7499]` belong together and can be squashed


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
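To make the direction of the fix concrete: the double release described above is only harmless if releasing a buffer is idempotent or reference-counted. Flink's actual `Buffer` tracks a reference count; the sketch below (a hypothetical `OnceRecyclableBuffer`, not Flink code) shows the simplest form of such a guard, where only the first release actually frees the memory:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical buffer whose recycle() is idempotent: a racing second
 *  release can no longer hand the memory segment back to the pool twice. */
public class OnceRecyclableBuffer {
    private final AtomicBoolean recycled = new AtomicBoolean(false);

    /** Returns true only for the caller that actually freed the buffer. */
    public boolean recycle() {
        // compareAndSet is atomic, so exactly one concurrent caller wins
        return recycled.compareAndSet(false, true);
    }

    public boolean isRecycled() {
        return recycled.get();
    }

    public static void main(String[] args) {
        OnceRecyclableBuffer buffer = new OnceRecyclableBuffer();
        System.out.println(buffer.recycle());    // first release frees the segment
        System.out.println(buffer.recycle());    // second release is a no-op
        System.out.println(buffer.isRecycled());
    }
}
```

Running `main` prints `true`, `false`, `true`: the second, erroneous release becomes a no-op instead of reusing or corrupting the memory region.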


[GitHub] flink issue #4581: [FLINK-7499][io] fix double buffer release in SpillableSu...

2017-11-23 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  


---


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264633#comment-16264633
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152850290
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
}
}
}
+
+   /**
+* An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+* write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+*/
+   private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+   bufferFileWriter.close();
+   return bufferFileWriter;
+   }
+   }
+
+   /**
+* An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+* requests but never actually perform any write operation (be sure to clean up the buffers
+* manually!).
+*/
+   private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --

In theory, you are right - in practice though (and that's what I tried first), `AsynchronousBufferFileWriter` cannot be extended from anywhere outside its package due to `WriteRequest` being package-private. Since the writer implementations I needed are not too generic, I did not want to promote them to this package (in the tests folder, of course) nor did I want to make `WriteRequest` public...hence Mockito.
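For readers unfamiliar with the spy pattern mentioned here: a Mockito spy wraps a real object and lets individual methods be stubbed while the rest keep their real behavior, which sidesteps the subclassing restriction. Below is a minimal sketch of the idea, using a hypothetical `Writer` class as a stand-in for `AsynchronousBufferFileWriter` (requires `mockito-core` on the classpath; `doAnswer(...).when(spy)` is the spy-safe stubbing form that does not invoke the real method):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

public class SpyStallExample {
    /** Hypothetical stand-in for AsynchronousBufferFileWriter. */
    static class Writer {
        private int realWrites;

        public void writeBlock(Object buffer) {
            realWrites++; // the "real" write we want to suppress in tests
        }

        public int getRealWrites() {
            return realWrites;
        }
    }

    public static void main(String[] args) {
        Writer stalling = spy(new Writer());
        // accept writeBlock requests but never forward them to the real method
        doAnswer(invocation -> null).when(stalling).writeBlock(any());

        stalling.writeBlock(new Object());
        System.out.println(stalling.getRealWrites()); // 0: the write "stalled"
    }
}
```

All other methods (here `getRealWrites`) still run their real implementations, which is exactly why a spy, rather than a full mock, fits this test.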




[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152850290
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -320,4 +559,40 @@ void awaitNotifications(long 
awaitedNumNotifiedBuffers, long timeoutMillis) thro
}
}
}
+
+   /**
+* An {@link IOManagerAsync} that creates closed {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will thus throw an 
exception when trying to add
+* write requests, e.g. by calling {@link 
BufferFileWriter#writeBlock(Object)}.
+*/
+   private static class IOManagerAsyncWithClosedBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
super.createBufferFileWriter(channelID);
+   bufferFileWriter.close();
+   return bufferFileWriter;
+   }
+   }
+
+   /**
+* An {@link IOManagerAsync} that creates stalling {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will accept {@link 
BufferFileWriter#writeBlock(Object)}
+* requests but never actually perform any write operation (be sure to 
clean up the buffers
+* manually!).
+*/
+   private static class IOManagerAsyncWithStallingBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
spy(super.createBufferFileWriter(channelID));
--- End diff --

In theory, you are right - in practice though (and that's what I tried 
first), `AsynchronousBufferFileWriter` cannot be extended from anywhere outside 
its package due to `WriteRequest` being package-private. Since the writer 
implementations I needed are not too generic, I did not want to promote them to 
this package (in the tests folder, of course) nor did I want to make 
`WriteRequest` public...hence Mockito.
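
For illustration, a hedged sketch of the subclass-based alternative being weighed here: if the writer type were freely extensible from test code (which `AsynchronousBufferFileWriter` is not, since its write path depends on the package-private `WriteRequest`), a stalling writer could be a plain hand-written subclass instead of a Mockito spy. All types below are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer type; the real AsynchronousBufferFileWriter cannot
// be subclassed outside its package.
class SimpleWriter {
    final List<String> written = new ArrayList<>();

    void writeBlock(String block) {
        // pretend this queues an asynchronous write request
        written.add(block);
    }
}

// "Stalling" writer: accepts writeBlock() calls but never performs them,
// mirroring what the spy-based writer in the test achieves. The caller
// stays responsible for cleaning up the dropped block.
class StallingWriter extends SimpleWriter {
    @Override
    void writeBlock(String block) {
        // intentionally drop the request
    }
}

public class StallingWriterExample {
    public static void main(String[] args) {
        SimpleWriter writer = new StallingWriter();
        writer.writeBlock("buffer-0");
        System.out.println(writer.written.size()); // 0: nothing was written
    }
}
```

This is only a shape for the argument; in the actual test the spy is the pragmatic choice precisely because this subclass cannot be written outside the `iomanager` package.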


---


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264629#comment-16264629
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152849820
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -300,6 +315,230 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable 
finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   assertEquals(0, partition.releaseMemory());
+   partition.finish();
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable 
released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+   assertEquals(0, partition.releaseMemory());
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
partition where adding the
+* write request fails with an exception.
+*/
+   @Test
+   public void testAddOnSpilledPartitionW

[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2017-11-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152849820
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -300,6 +315,230 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable 
finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   assertEquals(0, partition.releaseMemory());
+   partition.finish();
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable 
released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+   assertEquals(0, partition.releaseMemory());
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
partition where adding the
+* write request fails with an exception.
+*/
+   @Test
+   public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
+   IOManager ioManager = new 
IOManagerAsyncWithStallingBufferFileWriter();
+   SpillableSubpartition partition = createSubpartition(ioManager);
+   assertEquals(0, partition.releaseMe

[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264620#comment-16264620
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152849113
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -300,6 +315,230 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable 
finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
--- End diff --

What do you mean? What failure / normal path? `partition.add()` should 
always succeed in this case, i.e. it does not throw.
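
The try/finally pattern under discussion can be sketched in isolation as follows: whether `add()` throws or returns normally, the `finally` block still checks that the buffer ended up recycled. `FakeBuffer` and `FakePartition` are illustrative stand-ins, not Flink classes.

```java
// Minimal stand-in for a recyclable network buffer.
class FakeBuffer {
    private boolean recycled;

    void recycle() { recycled = true; }

    boolean isRecycled() { return recycled; }
}

// Minimal stand-in mirroring the contract discussed above: on a released
// partition, add() recycles the buffer immediately instead of queueing it.
class FakePartition {
    private final boolean released;

    FakePartition(boolean released) { this.released = released; }

    boolean add(FakeBuffer buffer) {
        if (released) {
            buffer.recycle();
            return false;
        }
        return true;
    }
}

public class AddOnReleasedPartitionExample {
    public static void main(String[] args) {
        FakeBuffer buffer = new FakeBuffer();
        FakePartition partition = new FakePartition(true);
        try {
            partition.add(buffer); // does not throw, but rejects the buffer
        } finally {
            // runs on both the normal and any exceptional path
            System.out.println(buffer.isRecycled()); // true
        }
    }
}
```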


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264605#comment-16264605
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152847273
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -300,6 +315,230 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
--- End diff --

you're right - some tests actually share most of their code. I'll extract a 
common test method to reduce some code.


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264593#comment-16264593
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152845922
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ---
@@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testAddOnPipelinedPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.PIPELINED);
+   }
+
+   @Test
+   public void testAddOnBlockingPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.BLOCKING);
+   }
+
+   /**
+* Tests {@link ResultPartition#add} on a working partition.
+*
+* @param pipelined the result partition type to set up
+*/
+   protected void testAddOnPartition(final ResultPartitionType pipelined)
+   throws Exception {
+   ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
--- End diff --

do you think it's better to have a real implementation of that interface, 
spy on it, and then verify the expected method calls? - this actually seems 
like some more overhead with little/no gain. I'd prefer to leave it as is for 
now.
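
A hand-rolled test double for such a notifier interface could look like the sketch below; the interface and names are simplified stand-ins, not Flink's actual `ResultPartitionConsumableNotifier` signature.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the notifier interface.
interface ConsumableNotifier {
    void notifyPartitionConsumable(int partitionIndex);
}

// Hand-written double: records invocations so a test can assert on them
// without Mockito's mock()/verify().
class CountingNotifier implements ConsumableNotifier {
    final AtomicInteger notifications = new AtomicInteger();

    @Override
    public void notifyPartitionConsumable(int partitionIndex) {
        notifications.incrementAndGet();
    }
}

public class NotifierDoubleExample {
    public static void main(String[] args) {
        CountingNotifier notifier = new CountingNotifier();
        notifier.notifyPartitionConsumable(0); // simulate first buffer added
        System.out.println(notifier.notifications.get()); // prints 1
    }
}
```

Either approach verifies the same thing; the trade-off discussed above is boilerplate versus refactoring safety.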


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264579#comment-16264579
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152842686
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -95,20 +95,23 @@ public boolean add(Buffer buffer) throws IOException {
return false;
}
 
-   // The number of buffers are needed later when creating
-   // the read views. If you ever remove this line here,
-   // make sure to still count the number of buffers.
-   updateStatistics(buffer);
-
if (spillWriter == null) {
buffers.add(buffer);
+   // The number of buffers are needed later when 
creating
+   // the read views. If you ever remove this line 
here,
+   // make sure to still count the number of 
buffers.
--- End diff --

yes, all my new tests verify these numbers via
```
assertEquals(..., partition.getTotalNumberOfBuffers());
assertEquals(..., partition.getTotalNumberOfBytes());
```
but I'll add some more of those checks to some existing tests


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264573#comment-16264573
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152841030
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ---
@@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() {
 * first buffer has been added.
 */
public void add(Buffer buffer, int subpartitionIndex) throws 
IOException {
-   boolean success = false;
+   checkNotNull(buffer);
 
try {
checkInProduceState();
+   } catch (Throwable t) {
+   buffer.recycle();
--- End diff --

Actually, a sanity check for double-recycle comes with #4613 for which I 
also needed this PR. It does, however, work differently and only checks that 
the reference counter does not go below 0 - I guess, this way we do not put too 
much pressure on the garbage collector compared to creating new Buffer 
instances for each `retain()`
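
The reference-counting scheme described here can be sketched as follows: `retain()`/`recycle()` move a counter, recycling past zero is treated as a bug, and no new wrapper object is allocated per `retain()`. The class and method names are illustrative, not Flink's actual `Buffer` API.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private boolean freed;

    RefCountedBuffer retain() {
        refCount.incrementAndGet();
        return this; // same instance: no extra garbage per retain()
    }

    void recycle() {
        int refs = refCount.decrementAndGet();
        if (refs < 0) {
            // the sanity check: a double recycle fails loudly
            throw new IllegalStateException("buffer recycled twice");
        }
        if (refs == 0) {
            freed = true; // hand the memory segment back to the pool
        }
    }

    boolean isRecycled() {
        return freed;
    }
}

public class RefCountExample {
    public static void main(String[] args) {
        RefCountedBuffer buffer = new RefCountedBuffer();
        buffer.retain();                         // two logical owners
        buffer.recycle();                        // first owner done
        System.out.println(buffer.isRecycled()); // false: still referenced
        buffer.recycle();                        // last owner done
        System.out.println(buffer.isRecycled()); // true: returned to pool
    }
}
```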


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264559#comment-16264559
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152829734
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -320,4 +559,40 @@ void awaitNotifications(long 
awaitedNumNotifiedBuffers, long timeoutMillis) thro
}
}
}
+
+   /**
+* An {@link IOManagerAsync} that creates closed {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will thus throw an 
exception when trying to add
+* write requests, e.g. by calling {@link 
BufferFileWriter#writeBlock(Object)}.
+*/
+   private static class IOManagerAsyncWithClosedBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
super.createBufferFileWriter(channelID);
+   bufferFileWriter.close();
+   return bufferFileWriter;
+   }
+   }
+
+   /**
+* An {@link IOManagerAsync} that creates stalling {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will accept {@link 
BufferFileWriter#writeBlock(Object)}
+* requests but never actually perform any write operation (be sure to 
clean up the buffers
+* manually!).
+*/
+   private static class IOManagerAsyncWithStallingBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
spy(super.createBufferFileWriter(channelID));
--- End diff --

again, why Mockito? It's terrible for debugging and completely breaks under 
refactoring (like adding a new overloaded method 
`BlockChannelWriterWithCallback::writeBlock` or changing signatures). 
Especially since returning an anonymous class here that extends 
`AsynchronousBufferFileWriter` and replaces `writeBlock` with an empty method 
would be just as easy :(


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2017-11-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152822778
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -95,20 +95,23 @@ public boolean add(Buffer buffer) throws IOException {
return false;
}
 
-   // The number of buffers are needed later when creating
-   // the read views. If you ever remove this line here,
-   // make sure to still count the number of buffers.
-   updateStatistics(buffer);
-
if (spillWriter == null) {
buffers.add(buffer);
+   // The number of buffers are needed later when 
creating
+   // the read views. If you ever remove this line 
here,
+   // make sure to still count the number of 
buffers.
--- End diff --

Is it tested somewhere? 


---


[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2017-11-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152826013
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ---
@@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testAddOnPipelinedPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.PIPELINED);
+   }
+
+   @Test
+   public void testAddOnBlockingPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.BLOCKING);
+   }
+
+   /**
+* Tests {@link ResultPartition#add} on a working partition.
+*
+* @param pipelined the result partition type to set up
+*/
+   protected void testAddOnPartition(final ResultPartitionType pipelined)
+   throws Exception {
+   ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
--- End diff --

https://imgflip.com/i/1zvsnt :(


---


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264557#comment-16264557
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152826521
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
--- End diff --

? Why does this test cover both the failure and the normal path? What if one 
of them never happens?
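To make the concern concrete: one way to assert the failure path explicitly, instead of mixing it with the normal path in a try/finally block, is sketched below (the `Partition`/`Buf` classes are simplified stand-ins for illustration, not Flink's real `SpillableSubpartition` and `Buffer`):

```java
// Simplified stand-ins (hypothetical), just to illustrate the test shape.
class Buf {
    boolean recycled = false;
    void recycle() { recycled = true; }
}

class Partition {
    private boolean finished = false;
    void finish() { finished = true; }

    void add(Buf b) {
        if (finished) {
            b.recycle(); // a rejected buffer must still be released
            throw new IllegalStateException("partition already finished");
        }
    }
}

public class ExplicitFailurePathTest {
    public static void main(String[] args) {
        Partition partition = new Partition();
        partition.finish();
        Buf buffer = new Buf();

        // Assert the expected exception explicitly rather than relying on
        // a finally block that also runs when add() unexpectedly succeeds.
        boolean threw = false;
        try {
            partition.add(buffer);
        } catch (IllegalStateException expected) {
            threw = true;
        }
        if (!threw) {
            throw new AssertionError("add() on a finished partition must throw");
        }
        if (!buffer.recycled) {
            throw new AssertionError("rejected buffer must be recycled");
        }
        System.out.println("failure path verified");
    }
}
```

This way the test fails loudly if the exception never happens, instead of silently passing through the finally assertions.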


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264553#comment-16264553
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152826839
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   assertEquals(0, partition.releaseMemory());
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
--- End diff --

ditto (and below)






[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264558#comment-16264558
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152837725
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
--- End diff --

could some of those tests be squashed into fewer methods? Or do you think 
that wouldn't be a good idea?
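For illustration, a table-driven variant that squashes the four finished/released cases into one shared body might look like this (plain Java rather than JUnit's `Parameterized` runner, to keep the sketch self-contained; the flags and expected counts mirror the tests quoted above and are illustrative, not the actual test matrix):

```java
import java.util.Arrays;
import java.util.List;

public class TableDrivenAddTest {
    /** One shared test body; the flags select which partition state to set up. */
    static void checkAddRejected(boolean finished, boolean released,
                                 int expectedBuffers) {
        // In the real test this would create a SpillableSubpartition, call
        // finish()/release()/releaseMemory() according to the flags, add a
        // buffer, and assert it was recycled plus the expected statistics.
        if (expectedBuffers < 0) {
            throw new AssertionError("invalid expectation");
        }
    }

    public static void main(String[] args) {
        // Each row: {finished, released, expected buffer count} -- finished
        // partitions keep 1 buffer (the EndOfPartitionEvent), released keep 0.
        List<Object[]> cases = Arrays.asList(
            new Object[]{true,  false, 1},   // spillable, finished
            new Object[]{true,  false, 1},   // spilled, finished
            new Object[]{false, true,  0},   // spillable, released
            new Object[]{false, true,  0});  // spilled, released
        for (Object[] c : cases) {
            checkAddRejected((Boolean) c[0], (Boolean) c[1], (Integer) c[2]);
        }
        System.out.println("ran " + cases.size() + " cases");
    }
}
```

The trade-off is the usual one: fewer near-identical methods versus losing one test name per failing case in the report.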




[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2017-11-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152825430
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
@@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() {
 * first buffer has been added.
 */
public void add(Buffer buffer, int subpartitionIndex) throws IOException {
-   boolean success = false;
+   checkNotNull(buffer);
 
try {
checkInProduceState();
+   } catch (Throwable t) {
+   buffer.recycle();
--- End diff --

I wonder if we should have some sanity/illegal-state detection for 
double-recycling the buffers. For example, each buffer could only be recycled 
once (protected by a private `boolean wasRecycled` field in the Buffer). 
Whenever you call `retain()`, you would get a new instance of the `Buffer`, 
pointing to the same memory, but with a new flag (so that both the original 
and the retained buffer could be recycled independently, but each one of them only once).
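A minimal sketch of this recycle-at-most-once idea, with `retain()` handing out a fresh handle over the same shared reference count (a simplified model for illustration, not Flink's actual `Buffer` implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model: each handle may be recycled exactly once; retain()
// returns a NEW handle sharing the same memory and reference count.
class BufferHandle {
    private final AtomicInteger refCount;   // shared across all handles
    private boolean recycled = false;       // per-handle: detects double recycle

    BufferHandle() { this(new AtomicInteger(1)); }
    private BufferHandle(AtomicInteger rc) { this.refCount = rc; }

    BufferHandle retain() {
        refCount.incrementAndGet();
        return new BufferHandle(refCount);  // independent handle, fresh flag
    }

    /** @return true if this was the last handle and the memory may be freed */
    boolean recycle() {
        if (recycled) {
            throw new IllegalStateException("handle recycled twice");
        }
        recycled = true;
        return refCount.decrementAndGet() == 0;
    }
}

public class RecycleOnceExample {
    public static void main(String[] args) {
        BufferHandle original = new BufferHandle();
        BufferHandle retained = original.retain();
        System.out.println(original.recycle());  // false: retained handle still live
        System.out.println(retained.recycle());  // true: last handle, memory freeable
        try {
            retained.recycle();                  // second recycle of the same handle
        } catch (IllegalStateException e) {
            System.out.println("double recycle detected");
        }
    }
}
```

With this scheme the double-release bug described in FLINK-7499 would fail fast with an exception instead of silently corrupting a reused memory region.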


---


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264555#comment-16264555
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152827018
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   assertEquals(0, partition.releaseMemory());
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+   assertEquals(0, partition.releaseMemory());
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+* write request fails with an exception.
+*/
+   @Test
+   public void testAddOnSpilledPartit

[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2017-11-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152827018
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   assertEquals(0, partition.releaseMemory());
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+   assertEquals(0, partition.releaseMemory());
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+* write request fails with an exception.
+*/
+   @Test
+   public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
+   IOManager ioManager = new IOManagerAsyncWithStallingBufferFileWriter();
+   SpillableSubpartition partition = createSubpartition(ioManager);
+   assertEquals(0, partition.relea








