[jira] [Assigned] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
     [ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aegeaner reassigned FLINK-8139:
-------------------------------

    Assignee: Aegeaner

> Check for proper equals() and hashCode() when registering a table
> -----------------------------------------------------------------
>
>                 Key: FLINK-8139
>                 URL: https://issues.apache.org/jira/browse/FLINK-8139
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at
> different positions. E.g., for joining we test rows for equality or put them
> into state. A heap state backend requires proper hashCode() and equals() in
> order to work correctly. Thus, every type in the Table API needs to have these
> methods implemented.
> We need to check via reflection whether all fields of a row implement methods
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both for types
> coming from a TableSource and from a DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep
> variants.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
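The reflection check described in the issue can be sketched as follows. The class and method names here are illustrative, not the eventual Flink implementation: a field type is treated as safe for heap-state comparisons only if both methods are declared below `Object` in its hierarchy.

```java
import java.lang.reflect.Method;

// Hypothetical helper for the check described in FLINK-8139.
public class EqualsHashCodeChecker {

    // True if clazz declares equals(Object) somewhere below Object.
    public static boolean overridesEquals(Class<?> clazz) {
        try {
            Method m = clazz.getMethod("equals", Object.class);
            return m.getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            return false; // unreachable: equals(Object) always exists on Object
        }
    }

    // True if clazz declares hashCode() somewhere below Object.
    public static boolean overridesHashCode(Class<?> clazz) {
        try {
            Method m = clazz.getMethod("hashCode");
            return m.getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(overridesEquals(String.class));   // true
        System.out.println(overridesHashCode(Object.class)); // false
        // For array-valued fields the issue suggests the deep variants:
        System.out.println(java.util.Arrays.deepEquals(
            new int[][]{{1}}, new int[][]{{1}}));            // true
    }
}
```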
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264969#comment-16264969 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4509

    @NicoK, thanks for your reviews on Thanksgiving day. I have updated the code to address your comments. :)

> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
> This is a part of the work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It requests a buffer from {{BufferPool}}
> for holding the message. If none is available, the message is staged
> temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related works are:
> * Add the backlog of the producer to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for
> {{RemoteInputChannel}}, which may trigger requests of floating buffers from
> {{BufferPool}}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
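The floating-buffer accounting discussed in this thread (maintain `backlog + initialCredit` available buffers, requesting the difference from the pool) can be sketched as below. This is a minimal stand-in, not the actual `RemoteInputChannel` code; class and method names are assumptions.

```java
// Sketch of the credit accounting: on a sender backlog notification, keep
// (backlog + initialCredit) buffers available and request the missing amount.
public class CreditAccounting {
    private final int initialCredit;
    private int availableBuffers;

    public CreditAccounting(int initialCredit, int availableBuffers) {
        this.initialCredit = initialCredit;
        this.availableBuffers = availableBuffers;
    }

    // Returns how many floating buffers must be requested from the BufferPool.
    public int onSenderBacklog(int backlog) {
        int required = backlog + initialCredit;
        int missing = Math.max(0, required - availableBuffers);
        availableBuffers += missing; // assume the pool satisfied the request
        return missing;
    }

    public static void main(String[] args) {
        // 2 exclusive buffers already available, initial credit of 2, backlog of 8:
        CreditAccounting channel = new CreditAccounting(2, 2);
        System.out.println(channel.onSenderBacklog(8)); // 8 + 2 - 2 = 8 floating buffers
    }
}
```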
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264930#comment-16264930 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152899386

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception {
 	}

 	/**
-	 * Tests to verify that the input channel requests floating buffers from buffer pool
-	 * in order to maintain backlog + initialCredit buffers available once receiving the
-	 * sender's backlog, and registers as listener if no floating buffers available.
+	 * Tests to verify that the input channel requests floating buffers from buffer pool for
+	 * maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog.
+	 *
+	 * Verifies the logic of recycling floating buffer back into the input channel and the logic
+	 * of returning extra floating buffer into the buffer pool during recycling exclusive buffer.
 	 */
 	@Test
-	public void testRequestFloatingBufferOnSenderBacklog() throws Exception {
+	public void testRequestAndReturnFloatingBuffer() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP);
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP);
+		final int numExclusiveBuffers = 2;
+		final int numFloatingBuffers = 12;
+
 		final SingleInputGate inputGate = createSingleInputGate();
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		try {
-			final int numFloatingBuffers = 10;
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
 			inputGate.setBufferPool(bufferPool);
-
-			// Assign exclusive segments to the channel
-			final int numExclusiveBuffers = 2;
-			inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);

-			assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel",
-				numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers());
+			// Prepare the exclusive and floating buffers to verify recycle logic later
+			Buffer exclusiveBuffer = inputChannel.requestBuffer();
+			assertNotNull(exclusiveBuffer);
+			Buffer floatingBuffer1 = bufferPool.requestBuffer();
+			assertNotNull(floatingBuffer1);
+			Buffer floatingBuffer2 = bufferPool.requestBuffer();
+			assertNotNull(floatingBuffer2);

-			// Receive the producer's backlog
+			// Receive the producer's backlog less than the number of available floating buffers
			inputChannel.onSenderBacklog(8);

-			// Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers
-			verify(bufferPool, times(8)).requestBuffer();
+			// Request the floating buffers to maintain (backlog + initialCredit) available buffers
+			verify(bufferPool, times(11)).requestBuffer();
 			verify(bufferPool, times(0)).addBufferListener(inputChannel);
-			assertEquals("There should be 10 buffers available in the channel",
-				10, inputChannel.getNumberOfAvailableBuffers());
+			assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+			assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers());

-			inputChannel.onSenderBacklog(11);
+			// Increase the backlog to exceed the number of available floating buffers
+			inputChannel.onSenderBacklog(10);

-			// Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result
-			verify(bufferPool, times(11)).requestBuffer();
+			// The channel does not get enough floating buffer and register as buffer listener
+			verify(bufferPool, times(13)).r
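The pool-exhaustion path this test exercises (when no floating buffer is available, the channel registers as a buffer listener and is called back on recycle) can be sketched as follows. `PoolSketch` is illustrative only, not Flink's `BufferPool` API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of a pool that hands out buffers or queues listeners when exhausted.
public class PoolSketch {
    private int freeBuffers;
    private final Queue<Runnable> listeners = new ArrayDeque<>();

    public PoolSketch(int freeBuffers) {
        this.freeBuffers = freeBuffers;
    }

    // Try to hand out a buffer; on failure, register the caller as a listener.
    public boolean requestBuffer(Runnable onAvailable) {
        if (freeBuffers > 0) {
            freeBuffers--;
            return true;
        }
        listeners.add(onAvailable);
        return false;
    }

    // Recycling a buffer wakes the first waiting listener instead of growing the free count.
    public void recycle() {
        Runnable listener = listeners.poll();
        if (listener != null) {
            listener.run();
        } else {
            freeBuffers++;
        }
    }
}
```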
[jira] [Resolved] (FLINK-7877) Fix compilation against the Hadoop 3 beta1 release
     [ https://issues.apache.org/jira/browse/FLINK-7877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu resolved FLINK-7877.
---------------------------
    Resolution: Cannot Reproduce

The UtilsTest code has been refactored.

> Fix compilation against the Hadoop 3 beta1 release
> --------------------------------------------------
>
>                 Key: FLINK-7877
>                 URL: https://issues.apache.org/jira/browse/FLINK-7877
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System
>            Reporter: Ted Yu
>
> When compiling against hadoop 3.0.0-beta1, I got:
> {code}
> [ERROR] /mnt/disk2/a/flink/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java:[224,16]
> org.apache.flink.yarn.UtilsTest.TestingContainer is not abstract and does
> not override abstract method
> setExecutionType(org.apache.hadoop.yarn.api.records.ExecutionType) in
> org.apache.hadoop.yarn.api.records.Container
> {code}
> There may be other hadoop API(s) that need adjustment.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
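The quoted compile error is the usual symptom of a base class gaining a new abstract method: every existing subclass that does not override it stops compiling. A minimal stand-alone illustration (hypothetical classes with simplified signatures, not the real YARN API):

```java
// Stand-in for org.apache.hadoop.yarn.api.records.Container.
abstract class Container {
    abstract void setId(int id);
    // Method added in Hadoop 3; without the override below, TestingContainer
    // would fail with exactly the "is not abstract and does not override" error.
    abstract void setExecutionType(String type);
}

// Stand-in for the test subclass in UtilsTest.
class TestingContainer extends Container {
    @Override void setId(int id) {}
    @Override void setExecutionType(String type) {}
}
```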
[jira] [Updated] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1
     [ https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7642:
--------------------------

> Upgrade maven surefire plugin to 2.19.1
> ---------------------------------------
>
>                 Key: FLINK-7642
>                 URL: https://issues.apache.org/jira/browse/FLINK-7642
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: Ted Yu
>
> The Surefire 2.19 release introduced more useful test filters which would let
> us run a subset of the tests.
> This issue is for upgrading the maven surefire plugin to 2.19.1.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
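For illustration, the method-level filters available from Surefire 2.19 onward look like this (the test names are only examples):

```
# Run a single test method:
mvn test -Dtest=RemoteInputChannelTest#testProducerFailedException

# Glob patterns and multiple methods ('+' joins methods, '*' is a glob):
mvn test -Dtest='RemoteInputChannelTest#testRequest*+testProducerFailedException'

# Exclude tests with '!':
mvn test -Dtest='!*ITCase'
```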
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264911#comment-16264911 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152894450

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception {
 	}

 	/**
-	 * Tests to verify that the input channel requests floating buffers from buffer pool
-	 * in order to maintain backlog + initialCredit buffers available once receiving the
-	 * sender's backlog, and registers as listener if no floating buffers available.
+	 * Tests to verify that the input channel requests floating buffers from buffer pool for
+	 * maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog.
+	 *
+	 * Verifies the logic of recycling floating buffer back into the input channel and the logic
+	 * of returning extra floating buffer into the buffer pool during recycling exclusive buffer.
 	 */
 	@Test
-	public void testRequestFloatingBufferOnSenderBacklog() throws Exception {
+	public void testRequestAndReturnFloatingBuffer() throws Exception {
 		// Setup
-		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP);
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP);
+		final int numExclusiveBuffers = 2;
+		final int numFloatingBuffers = 12;
+
 		final SingleInputGate inputGate = createSingleInputGate();
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		try {
-			final int numFloatingBuffers = 10;
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
 			inputGate.setBufferPool(bufferPool);
-
-			// Assign exclusive segments to the channel
-			final int numExclusiveBuffers = 2;
-			inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);

-			assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel",
-				numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers());
+			// Prepare the exclusive and floating buffers to verify recycle logic later
+			Buffer exclusiveBuffer = inputChannel.requestBuffer();
+			assertNotNull(exclusiveBuffer);
+			Buffer floatingBuffer1 = bufferPool.requestBuffer();
+			assertNotNull(floatingBuffer1);
+			Buffer floatingBuffer2 = bufferPool.requestBuffer();
+			assertNotNull(floatingBuffer2);

-			// Receive the producer's backlog
+			// Receive the producer's backlog less than the number of available floating buffers
 			inputChannel.onSenderBacklog(8);

-			// Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers
-			verify(bufferPool, times(8)).requestBuffer();
+			// Request the floating buffers to maintain (backlog + initialCredit) available buffers
+			verify(bufferPool, times(11)).requestBuffer();
 			verify(bufferPool, times(0)).addBufferListener(inputChannel);
-			assertEquals("There should be 10 buffers available in the channel",
-				10, inputChannel.getNumberOfAvailableBuffers());
+			assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+			assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers());

-			inputChannel.onSenderBacklog(11);
+			// Increase the backlog to exceed the number of available floating buffers
+			inputChannel.onSenderBacklog(10);

-			// Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result
-			verify(bufferPool, times(11)).requestBuffer();
+			// The channel does not get enough floating buffer and register as buffer listener
+			verify(bufferPool, times(13)).r
[jira] [Updated] (FLINK-7795) Utilize error-prone to discover common coding mistakes
     [ https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7795:
--------------------------

> Utilize error-prone to discover common coding mistakes
> ------------------------------------------------------
>
>                 Key: FLINK-7795
>                 URL: https://issues.apache.org/jira/browse/FLINK-7795
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: Ted Yu
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
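As a concrete example of the mistakes such a tool flags at compile time (this snippet is illustrative, not taken from Flink): an `equals()` call across unrelated types can never return true, yet compiles without warning under plain javac.

```java
public class ErrorProneExample {
    public static void main(String[] args) {
        Long boxed = 1L;
        // Checks in the spirit of error-prone's incompatible-type equals detection
        // would flag this comparison: a Long is never equal to a String.
        boolean alwaysFalse = boxed.equals("1");
        System.out.println(alwaysFalse); // false
    }
}
```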
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264906#comment-16264906 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152893996

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
 		exclusiveBuffers.add(buffer);
 	}

-	Buffer takeExclusiveBuffer() {
-		return exclusiveBuffers.poll();
-	}
-
 	void addFloatingBuffer(Buffer buffer) {
 		floatingBuffers.add(buffer);
 	}

-	Buffer takeFloatingBuffer() {
-		return floatingBuffers.poll();
+	/**
+	 * Add the exclusive buffer into the queue, and recycle one floating buffer if the
+	 * number of available buffers in queue is more than required amount.
+	 *
+	 * @param buffer The exclusive buffer of this channel.
+	 * @return Whether to recycle one floating buffer.
+	 */
+	boolean maintainTargetSize(Buffer buffer) {

--- End diff --

good point

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
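The policy behind the new `maintainTargetSize` method (recycle one floating buffer when the queue holds more than the required amount) can be sketched as below. This is a simplified stand-in with plain `Object` buffers, not the real `RemoteInputChannel` internals.

```java
import java.util.ArrayDeque;

// Sketch of a buffer queue that sheds one floating buffer when it exceeds its target size.
public class BufferQueueSketch {
    private final ArrayDeque<Object> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<Object> floatingBuffers = new ArrayDeque<>();
    private final int requiredBuffers;

    public BufferQueueSketch(int requiredBuffers) {
        this.requiredBuffers = requiredBuffers;
    }

    public void addFloatingBuffer(Object buffer) {
        floatingBuffers.add(buffer);
    }

    // Add the recycled exclusive buffer; if the queue now exceeds the required
    // size, return one floating buffer to give back to the pool (else null).
    public Object addExclusiveBuffer(Object buffer) {
        exclusiveBuffers.add(buffer);
        if (exclusiveBuffers.size() + floatingBuffers.size() > requiredBuffers) {
            return floatingBuffers.poll(); // may be null if only exclusive buffers remain
        }
        return null;
    }
}
```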
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264905#comment-16264905 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152893854

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
@@ -475,16 +515,10 @@ public Void call() throws Exception {
 		};

 		// Submit tasks and wait to finish
-		final List<Future<Void>> results = Lists.newArrayListWithCapacity(2);
-		results.add(executor.submit(requestBufferTask));
-		results.add(executor.submit(releaseTask));
-		for (Future<Void> result : results) {
-			result.get();
-		}
+		submitTasksAndWaitResults(executor, new Callable[]{requestBufferTask, releaseTask});

 		assertEquals("There should be no buffers available in the channel.",
 			0, inputChannel.getNumberOfAvailableBuffers());

--- End diff --

yes

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
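A plausible implementation of the `submitTasksAndWaitResults` helper the diff introduces, assuming only the call shape visible in the test (an `ExecutorService` plus an array of `Callable`s); the actual Flink helper may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class TestUtil {
    // Submit all tasks, then block until every one has finished,
    // rethrowing any exception a task raised.
    public static void submitTasksAndWaitResults(ExecutorService executor, Callable<?>[] tasks)
            throws Exception {
        List<Future<?>> results = new ArrayList<>(tasks.length);
        for (Callable<?> task : tasks) {
            results.add(executor.submit(task));
        }
        for (Future<?> result : results) {
            result.get(); // surfaces task failures to the caller
        }
    }
}
```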
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
    [ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264832#comment-16264832 ]

ASF GitHub Bot commented on FLINK-7499:
---------------------------------------

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4581

    ok, fixed

> double buffer release in SpillableSubpartitionView
> --------------------------------------------------
>
>                 Key: FLINK-7499
>                 URL: https://issues.apache.org/jira/browse/FLINK-7499
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 1.3.2, 1.3.3
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice:
> once asynchronously after the write operation and once in
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained and to be used in parallel somewhere else, it may also
> not be available anymore or contain corrupt data.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
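One standard way to rule out a double release like the one described here is an atomic guard, so that of the two competing paths (the asynchronous write completion and `releaseMemory()`) only the first actually recycles. This is an illustrative sketch, not Flink's actual buffer reference counting.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a buffer whose recycle() is idempotent under concurrent callers.
public class GuardedBuffer {
    private final AtomicBoolean recycled = new AtomicBoolean(false);

    // Returns true only for the single caller that actually performed the recycle;
    // any later (duplicate) release becomes a harmless no-op.
    public boolean recycle() {
        return recycled.compareAndSet(false, true);
    }
}
```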
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
    [ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264825#comment-16264825 ]

ASF GitHub Bot commented on FLINK-7499:
---------------------------------------

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4581

    ok, now I'm using the buffer incorrectly in `SpillableSubpartition#add`...let me re-think it once more

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264718#comment-16264718 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152860980 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -506,36 +540,22 @@ public Void call() throws Exception { @Test public void testConcurrentOnSenderBacklogAndRecycle() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(256, 32, MemoryType.HEAP); - final ExecutorService executor = Executors.newFixedThreadPool(2); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32, MemoryType.HEAP); + final int numExclusiveSegments = 120; + final int numFloatingBuffers = 128; + final int backlog = 128; + + final ExecutorService executor = Executors.newFixedThreadPool(3); + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 128; - final int numExclusiveSegments = 2; final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers); inputGate.setBufferPool(bufferPool); inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveSegments); - // Exhaust all the floating buffers - final List floatingBuffers = new ArrayList<>(numFloatingBuffers); - for (int i = 0; i < numFloatingBuffers; i++) { - Buffer buffer = bufferPool.requestBuffer(); - assertNotNull(buffer); - floatingBuffers.add(buffer); - } - - // Exhaust all the exclusive buffers - final List exclusiveBuffers = new ArrayList<>(numExclusiveSegments); - for (int i = 0; i < numExclusiveSegments; i++) { - Buffer 
buffer = inputChannel.requestBuffer(); - assertNotNull(buffer); - exclusiveBuffers.add(buffer); - } - - final int backlog = 128; - final Callable requestBufferTask = new Callable() { + final Callable requestBufferTask = new Callable() { --- End diff -- please keep `Callable` (or replace by a lambda) > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
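The review comments above repeatedly exercise the credit-based top-up rule: on receiving the sender's backlog, the receiver requests enough floating buffers to keep `backlog + initialCredit` buffers available. A sketch of just that arithmetic, under assumed names (this is not the actual `RemoteInputChannel` API):

```java
class CreditMath {
    // The receiver maintains (backlog + initialCredit) available buffers and
    // requests only the shortfall, never a negative count.
    static int floatingBuffersToRequest(int backlog, int initialCredit, int availableBuffers) {
        return Math.max(0, backlog + initialCredit - availableBuffers);
    }
}
```

Plugging in the numbers from `testRequestFloatingBufferOnSenderBacklog` above: with 2 exclusive (initial-credit) buffers available, a backlog of 8 yields 8 requests; once 10 buffers are available, a backlog of 11 yields 3 more, which is why the test then expects the channel to register as a buffer listener when the pool has only 2 left.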
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264722#comment-16264722 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152860672 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception { } /** -* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is -* recycled to available buffers directly and it triggers notify of announced credit. +* Tests to verify that the input channel requests floating buffers from buffer pool +* in order to maintain backlog + initialCredit buffers available once receiving the +* sender's backlog, and registers as listener if no floating buffers available. */ @Test - public void testRecycleExclusiveBufferBeforeReleased() throws Exception { - final SingleInputGate inputGate = mock(SingleInputGate.class); - final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + // Setup + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + try { + final int numFloatingBuffers = 10; + final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); + inputGate.setBufferPool(bufferPool); + + // Assign exclusive segments to the channel + final int numExclusiveBuffers = 2; + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); + + 
assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); - // Recycle exclusive segment - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + // Receive the producer's backlog + inputChannel.onSenderBacklog(8); - assertEquals("There should be one buffer available after recycle.", - 1, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers + verify(bufferPool, times(8)).requestBuffer(); + verify(bufferPool, times(0)).addBufferListener(inputChannel); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + inputChannel.onSenderBacklog(11); - assertEquals("There should be two buffers available after recycle.", - 2, inputChannel.getNumberOfAvailableBuffers()); - // It should be called only once when increased from zero. - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + + inputChannel.onSenderBacklog(12); + + // Already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + + } finally { + // Release all the buffer resources +
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264721#comment-16264721 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152862469 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -475,16 +515,10 @@ public Void call() throws Exception { }; // Submit tasks and wait to finish - final List> results = Lists.newArrayListWithCapacity(2); - results.add(executor.submit(requestBufferTask)); - results.add(executor.submit(releaseTask)); - for (Future result : results) { - result.get(); - } + submitTasksAndWaitResults(executor, new Callable[]{requestBufferTask, releaseTask}); assertEquals("There should be no buffers available in the channel.", 0, inputChannel.getNumberOfAvailableBuffers()); --- End diff -- please add: ``` assertEquals("There should be 130 buffers available in local pool.", 130, bufferPool.getNumberOfAvailableMemorySegments() + networkBufferPool.getNumberOfAvailableMemorySegments()); ``` (buffers could be in either buffer pool depending on whether they were requested at least once from the `LocalBufferPool` or not) > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. 
> For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264719#comment-16264719 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152861029 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -440,21 +476,25 @@ public void testFairDistributionFloatingBuffers() throws Exception { @Test public void testConcurrentOnSenderBacklogAndRelease() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(256, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 128; + final ExecutorService executor = Executors.newFixedThreadPool(2); + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final BufferPool bufferPool = networkBufferPool.createBufferPool(128, 128); + final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers); inputGate.setBufferPool(bufferPool); - inputGate.assignExclusiveSegments(networkBufferPool, 2); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - final Callable requestBufferTask = new Callable() { + final Callable requestBufferTask = new Callable() { --- End diff -- please keep `Callable` (or replace by a lambda) > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > 
This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264720#comment-16264720 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152861040 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -465,7 +505,7 @@ public Void call() throws Exception { } }; - final Callable releaseTask = new Callable() { + final Callable releaseTask = new Callable() { --- End diff -- please keep `Callable` (or replace by a lambda) > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
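For context on the review note repeated in the diffs above ("please keep `Callable` (or replace by a lambda)"): the generic parameters appear stripped in this archived diff, but the comment suggests the change swapped an explicitly parameterized `Callable<Void>` for a raw `Callable`. A small illustrative comparison of the two forms the reviewer accepts, with hypothetical names:

```java
import java.util.concurrent.Callable;

class CallableForms {
    // Explicitly parameterized anonymous class, as the test originally had it.
    static final Callable<Void> anonymous = new Callable<Void>() {
        @Override
        public Void call() {
            return null;
        }
    };

    // Equivalent lambda; the target type Callable<Void> supplies the type
    // argument, so no raw type is involved.
    static final Callable<Void> lambda = () -> null;

    // Helper so callers need not declare `throws Exception`.
    static Void invoke(Callable<Void> task) {
        try {
            return task.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Either form keeps the compiler checking the task's return type, which the raw `new Callable()` would silently drop.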
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152860672 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception { } /** -* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is -* recycled to available buffers directly and it triggers notify of announced credit. +* Tests to verify that the input channel requests floating buffers from buffer pool +* in order to maintain backlog + initialCredit buffers available once receiving the +* sender's backlog, and registers as listener if no floating buffers available. */ @Test - public void testRecycleExclusiveBufferBeforeReleased() throws Exception { - final SingleInputGate inputGate = mock(SingleInputGate.class); - final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + // Setup + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + try { + final int numFloatingBuffers = 10; + final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); + inputGate.setBufferPool(bufferPool); + + // Assign exclusive segments to the channel + final int numExclusiveBuffers = 2; + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); + + assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); - // Recycle exclusive segment - 
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + // Receive the producer's backlog + inputChannel.onSenderBacklog(8); - assertEquals("There should be one buffer available after recycle.", - 1, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers + verify(bufferPool, times(8)).requestBuffer(); + verify(bufferPool, times(0)).addBufferListener(inputChannel); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + inputChannel.onSenderBacklog(11); - assertEquals("There should be two buffers available after recycle.", - 2, inputChannel.getNumberOfAvailableBuffers()); - // It should be called only once when increased from zero. - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + + inputChannel.onSenderBacklog(12); + + // Already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + + } finally { + // Release all the buffer resources + inputChannel.releaseAllResources(); + + networkBufferPool.destroyAllBufferPools(); + networkBufferPool.destroy(); + } } /** -* Tests {@link RemoteInputChannel#recy
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264709#comment-16264709 ] ASF GitHub Bot commented on FLINK-7694: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4757 Hi @GJL , you can take over this PR > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6505) Proactively cleanup local FS for RocksDBKeyedStateBackend on startup
[ https://issues.apache.org/jira/browse/FLINK-6505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264706#comment-16264706 ] ASF GitHub Bot commented on FLINK-6505: --- Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4798 > Proactively cleanup local FS for RocksDBKeyedStateBackend on startup > > > Key: FLINK-6505 > URL: https://issues.apache.org/jira/browse/FLINK-6505 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Bowen Li > Fix For: 1.4.0, 1.5.0 > > > In {{RocksDBKeyedStateBackend}}, the {{instanceBasePath}} is cleared on > {{dispose()}}. I think it might make sense to also clear this directory when > the backend is created, in case something crashed and the backend never > reached {{dispose()}}. At least for previous runs of the same job, we can > know what to delete on restart. > In general, it is very important for this backend to clean up the local FS, > because the local quota might be very limited compared to the DFS. And a node > that runs out of local disk space can bring down the whole job, with no way > to recover (it might always get rescheduled to that node). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264697#comment-16264697 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152854762 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. 
*/ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + 
verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as lis
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264696#comment-16264696 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152860104 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception { } /** -* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is -* recycled to available buffers directly and it triggers notify of announced credit. +* Tests to verify that the input channel requests floating buffers from buffer pool +* in order to maintain backlog + initialCredit buffers available once receiving the +* sender's backlog, and registers as listener if no floating buffers available. */ @Test - public void testRecycleExclusiveBufferBeforeReleased() throws Exception { - final SingleInputGate inputGate = mock(SingleInputGate.class); - final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + // Setup + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + try { + final int numFloatingBuffers = 10; + final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); + inputGate.setBufferPool(bufferPool); + + // Assign exclusive segments to the channel + final int numExclusiveBuffers = 2; + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); + + 
assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); - // Recycle exclusive segment - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + // Receive the producer's backlog + inputChannel.onSenderBacklog(8); - assertEquals("There should be one buffer available after recycle.", - 1, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers + verify(bufferPool, times(8)).requestBuffer(); + verify(bufferPool, times(0)).addBufferListener(inputChannel); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + inputChannel.onSenderBacklog(11); - assertEquals("There should be two buffers available after recycle.", - 2, inputChannel.getNumberOfAvailableBuffers()); - // It should be called only once when increased from zero. - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + + inputChannel.onSenderBacklog(12); + + // Already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + + } finally { + // Release all the buffer resources +
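The test above exercises the request formula stated in its comment: on receiving the sender's backlog, the channel requests floating buffers until (backlog + initialCredit) buffers are available. A minimal self-contained sketch of that arithmetic (not Flink's actual implementation; the method name is hypothetical), checked against the counts used in the test:

```java
public class FloatingBufferRequestSketch {

    /** Floating buffers to request so that (backlog + initialCredit) buffers are available. */
    static int buffersToRequest(int backlog, int initialCredit, int availableBuffers) {
        return Math.max(0, backlog + initialCredit - availableBuffers);
    }

    public static void main(String[] args) {
        // First scenario in the test: backlog 8, initial credit 2,
        // 2 exclusive buffers already available -> 8 floating buffers requested.
        if (buffersToRequest(8, 2, 2) != 8) throw new AssertionError();
        // Backlog rises to 11 with 10 buffers available -> "need extra three
        // floating buffers", matching the test comment.
        if (buffersToRequest(11, 2, 10) != 3) throw new AssertionError();
        System.out.println("request counts match the test expectations");
    }
}
```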
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264701#comment-16264701 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152855823 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. 
*/ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers --- 
End diff -- add note that one exclusive buffer is taken (and therefore 11 requests and not 10)
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152859769 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel( initialAndMaxRequestBackoff._2(), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } + + private Callable recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) { --- End diff -- please add a Javadoc ---
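The reviewer asks for a Javadoc on the recycle-task helper. A hedged sketch of the two-phase pattern that Javadoc would need to describe (exhaust buffers eagerly, recycle them only when the returned task runs), with a hypothetical SimpleBuffer stub in place of Flink's Buffer so the example compiles on its own:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class RecycleTaskSketch {

    /** Stub standing in for a network buffer with a recycle() method. */
    static class SimpleBuffer {
        boolean recycled;
        void recycle() { recycled = true; }
    }

    /**
     * Eagerly takes {@code count} buffers from {@code pool} (exhausting it),
     * and returns a task that recycles all of them once invoked.
     */
    static Callable<Void> recycleBuffersTask(List<SimpleBuffer> pool, int count) {
        final List<SimpleBuffer> taken = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            taken.add(pool.remove(pool.size() - 1)); // exhaust the pool up front
        }
        return () -> {
            for (SimpleBuffer b : taken) {
                b.recycle(); // buffers go back only when the task runs
            }
            return null;
        };
    }

    public static void main(String[] args) throws Exception {
        List<SimpleBuffer> pool = new ArrayList<>();
        pool.add(new SimpleBuffer());
        pool.add(new SimpleBuffer());
        Callable<Void> task = recycleBuffersTask(pool, 2);
        if (!pool.isEmpty()) throw new AssertionError("pool should be exhausted");
        task.call();
        System.out.println("all buffers recycled by the task");
    }
}
```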
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264692#comment-16264692 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152836741 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { + exclusiveBuffers.add(buffer); + + if (getAvailableBufferSize() > numRequiredBuffers) { + Buffer floatingBuffer = floatingBuffers.poll(); + floatingBuffer.recycle(); + return true; + } else { + return false; + } } - int getFloatingBufferSize() { - return floatingBuffers.size(); + /** +* Take the floating buffer first if possible. +*/ + @Nullable + Buffer takeBuffer() { + if (floatingBuffers.size() > 0) { + return floatingBuffers.poll(); + } else { + return exclusiveBuffers.poll(); + } + } + + /** +* The floating buffer is recycled to local buffer pool directly, and the +* exclusive buffer will be gathered to return to global buffer pool later. 
+*/ + void releaseAll(List exclusiveSegments) { --- End diff -- please document the `exclusiveSegments` parameter to make it absolutely clear for the user that we will add memory segments of exclusive buffers
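The review asks that the `exclusiveSegments` parameter be documented as an out-parameter. A hedged sketch of what such a documented releaseAll() could look like, with minimal stubs replacing Flink's Buffer and MemorySegment; only the parameter contract is the point:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class ReleaseAllSketch {

    static class MemorySegment {}

    static class Buffer {
        final MemorySegment segment = new MemorySegment();
        boolean recycled;
        void recycle() { recycled = true; } // floating buffers: back to local pool
    }

    final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();
    final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>();

    /**
     * Releases all buffers held by this queue. Floating buffers are recycled
     * to the local buffer pool directly; exclusive buffers are only gathered
     * for a later bulk return to the global pool.
     *
     * @param exclusiveSegments list into which the memory segments of all
     *        exclusive buffers are added; the caller must return them to the
     *        global buffer pool afterwards
     */
    void releaseAll(List<MemorySegment> exclusiveSegments) {
        Buffer b;
        while ((b = floatingBuffers.poll()) != null) {
            b.recycle();
        }
        while ((b = exclusiveBuffers.poll()) != null) {
            exclusiveSegments.add(b.segment);
        }
    }

    public static void main(String[] args) {
        ReleaseAllSketch q = new ReleaseAllSketch();
        q.floatingBuffers.add(new Buffer());
        q.exclusiveBuffers.add(new Buffer());
        List<MemorySegment> segs = new ArrayList<>();
        q.releaseAll(segs);
        if (segs.size() != 1) throw new AssertionError();
        System.out.println("exclusive segments gathered: " + segs.size());
    }
}
```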
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152858602 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. */ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); 
inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result - verify(bufferPool, times(11)).requestBuffer(); + // The channel does not get enough floating buffer and register as buffer listener + verify(bufferPool, times(13)).reque
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264699#comment-16264699 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152859784 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel( initialAndMaxRequestBackoff._2(), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } + + private Callable recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) { + final List exclusiveBuffers = new ArrayList<>(numExclusiveSegments); + // Exhaust all the exclusive buffers + for (int i = 0; i < numExclusiveSegments; i++) { + Buffer buffer = inputChannel.requestBuffer(); + assertNotNull(buffer); + exclusiveBuffers.add(buffer); + } + + return new Callable() { + @Override + public Void call() throws Exception { + for (Buffer buffer : exclusiveBuffers) { + buffer.recycle(); + } + + return null; + } + }; + } + + private Callable recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception { + final List floatingBuffers = new ArrayList<>(numFloatingBuffers); + // Exhaust all the floating buffers + for (int i = 0; i < numFloatingBuffers; i++) { + Buffer buffer = bufferPool.requestBuffer(); + assertNotNull(buffer); + floatingBuffers.add(buffer); + } + + return new Callable() { + @Override + public Void call() throws Exception { + for (Buffer buffer : floatingBuffers) { + buffer.recycle(); + } + + return null; + } + }; + } + + private void submitTasksAndWaitResults(ExecutorService executor, Callable[] tasks) throws Exception { --- End diff -- please add a Javadoc
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264695#comment-16264695 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152853208 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { --- End diff -- actually, this is now offering two functionalities: ``` addExclusiveBuffer(buffer); maintainTargetSize(numRequiredBuffers); ``` I'd suggest to either use the two separately or integrate the target size maintaining into `addExclusiveBuffers`. In any case, you should make `AvailableBufferQueue` a `static` inner class which could then also be tested separately.
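The reviewer suggests folding the target-size maintenance into the add method and making `AvailableBufferQueue` a static, separately testable class. A hedged sketch of that refactoring, with a stub Buffer type and a null check added where the reviewed code polls the floating queue unconditionally:

```java
import java.util.ArrayDeque;

public class AvailableBufferQueueSketch {

    static class Buffer {
        boolean recycled;
        void recycle() { recycled = true; }
    }

    /** Static nested class, so it can be unit-tested on its own as suggested. */
    static class AvailableBufferQueue {
        final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>();
        final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();

        /**
         * Adds an exclusive buffer and, if the queue now exceeds the required
         * size, recycles one floating buffer back to its pool.
         *
         * @return whether one floating buffer was recycled
         */
        boolean addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
            exclusiveBuffers.add(buffer);
            if (exclusiveBuffers.size() + floatingBuffers.size() > numRequiredBuffers) {
                Buffer floating = floatingBuffers.poll();
                if (floating != null) { // guard: the reviewed code polls unconditionally
                    floating.recycle();
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) {
        AvailableBufferQueue q = new AvailableBufferQueue();
        q.floatingBuffers.add(new Buffer());
        // Required size 1: adding an exclusive buffer pushes the queue to 2,
        // so the floating buffer should be recycled.
        if (!q.addExclusiveBuffer(new Buffer(), 1)) throw new AssertionError();
        System.out.println("floating buffer recycled to maintain target size");
    }
}
```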
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264691#comment-16264691 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152859775 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel( initialAndMaxRequestBackoff._2(), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } + + private Callable recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) { + final List exclusiveBuffers = new ArrayList<>(numExclusiveSegments); + // Exhaust all the exclusive buffers + for (int i = 0; i < numExclusiveSegments; i++) { + Buffer buffer = inputChannel.requestBuffer(); + assertNotNull(buffer); + exclusiveBuffers.add(buffer); + } + + return new Callable() { + @Override + public Void call() throws Exception { + for (Buffer buffer : exclusiveBuffers) { + buffer.recycle(); + } + + return null; + } + }; + } + + private Callable recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception { + final List floatingBuffers = new ArrayList<>(numFloatingBuffers); --- End diff -- please add a Javadoc
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152856498 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. */ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); 
inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result - verify(bufferPool, times(11)).requestBuffer(); + // The channel does not get enough floating buffer and register as buffer listener + verify(bufferPool, times(13)).reque
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264690#comment-16264690 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152852133 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { + exclusiveBuffers.add(buffer); + + if (getAvailableBufferSize() > numRequiredBuffers) { + Buffer floatingBuffer = floatingBuffers.poll(); + floatingBuffer.recycle(); + return true; + } else { + return false; + } } - int getFloatingBufferSize() { - return floatingBuffers.size(); + /** +* Take the floating buffer first if possible. +*/ + @Nullable + Buffer takeBuffer() { --- End diff -- please explain when the result may be `null` ---
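To illustrate the `null` case the reviewer asks about: a minimal, self-contained stand-in for the queue in the diff (my own sketch, with `Buffer` simplified to `String`; names follow the diff but the types are assumptions). `takeBuffer()` returns `null` exactly when both the floating and the exclusive queue are empty.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for the buffer queue in the diff above; "Buffer" is
// simplified to String so the sketch is self-contained and runnable.
public class TakeBufferSketch {
    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();

    void addExclusiveBuffer(String buffer) { exclusiveBuffers.add(buffer); }
    void addFloatingBuffer(String buffer) { floatingBuffers.add(buffer); }

    /**
     * Takes a floating buffer first if one is queued, otherwise an exclusive buffer.
     *
     * @return a buffer, or {@code null} if both queues are empty, i.e. the
     *         channel currently has no available buffers at all
     */
    String takeBuffer() {
        String floating = floatingBuffers.poll();
        return floating != null ? floating : exclusiveBuffers.poll();
    }
}
```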
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152853208 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { --- End diff -- actually, this is now offering two functionalities: ``` addExclusiveBuffer(buffer); maintainTargetSize(numRequiredBuffers); ``` I'd suggest to either use the two separately or integrate the target size maintaining into `addExclusiveBuffers`. In any case, you should make `AvailableBufferQueue` a `static` inner class which could then also be tested separately. ---
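A rough sketch of the reviewer's suggestion: a separately testable queue class where adding an exclusive buffer also maintains the target size. This is my own illustration under assumed names, with `Buffer` simplified to `String` and recycling modeled by returning the evicted buffer to the caller.

```java
import java.util.ArrayDeque;

// Sketch of an AvailableBufferQueue-style static inner class that folds the
// target-size maintenance into addExclusiveBuffer, as suggested above.
// "Buffer" is a simplified stand-in; the caller recycles the returned buffer.
public class AvailableBufferQueueSketch {
    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();

    void addFloatingBuffer(String buffer) { floatingBuffers.add(buffer); }

    int getAvailableBufferSize() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    /**
     * Adds an exclusive buffer and, if the queue now exceeds the required size,
     * evicts one floating buffer so it can be recycled back to the buffer pool.
     *
     * @return the floating buffer to recycle, or {@code null} if none is needed
     */
    String addExclusiveBuffer(String buffer, int numRequiredBuffers) {
        exclusiveBuffers.add(buffer);
        if (getAvailableBufferSize() > numRequiredBuffers) {
            return floatingBuffers.poll(); // caller recycles this to the pool
        }
        return null;
    }
}
```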
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264698#comment-16264698 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152859769 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel( initialAndMaxRequestBackoff._2(), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } + + private Callable recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) { --- End diff -- please add a Javadoc
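For illustration, the kind of Javadoc being requested for such a recycling helper might read roughly as follows. This is a self-contained sketch, not the PR's code: `Buffer` is modeled as a `String`, the `Callable` as a `Runnable`, and all names here are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of a documented test helper in the spirit of
// recycleExclusiveBufferTask/recycleFloatingBufferTask above.
public class RecycleTaskSketch {

    /**
     * Exhausts {@code count} buffers from the given pool up front and returns a
     * task that recycles all of them back when run, so a concurrent consumer
     * can be tested against delayed buffer recycling.
     */
    static Runnable recycleBufferTask(Deque<String> pool, int count) {
        final Deque<String> taken = new ArrayDeque<>();
        for (int i = 0; i < count; i++) {
            taken.add(pool.poll()); // take the buffers out of circulation first
        }
        return () -> {
            while (!taken.isEmpty()) {
                pool.add(taken.poll()); // "recycle" each buffer back to the pool
            }
        };
    }
}
```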
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152859069 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception { } /** -* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is -* recycled to available buffers directly and it triggers notify of announced credit. +* Tests to verify that the input channel requests floating buffers from buffer pool +* in order to maintain backlog + initialCredit buffers available once receiving the +* sender's backlog, and registers as listener if no floating buffers available. */ @Test - public void testRecycleExclusiveBufferBeforeReleased() throws Exception { - final SingleInputGate inputGate = mock(SingleInputGate.class); - final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + // Setup + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + try { + final int numFloatingBuffers = 10; + final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); + inputGate.setBufferPool(bufferPool); + + // Assign exclusive segments to the channel + final int numExclusiveBuffers = 2; + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); + + assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); - // Recycle exclusive segment - 
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + // Receive the producer's backlog + inputChannel.onSenderBacklog(8); - assertEquals("There should be one buffer available after recycle.", - 1, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers + verify(bufferPool, times(8)).requestBuffer(); + verify(bufferPool, times(0)).addBufferListener(inputChannel); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + inputChannel.onSenderBacklog(11); - assertEquals("There should be two buffers available after recycle.", - 2, inputChannel.getNumberOfAvailableBuffers()); - // It should be called only once when increased from zero. - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + + inputChannel.onSenderBacklog(12); + + // Already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + --- End diff -- Thanks, the extensions you implemented for `testRequestFloatingBufferOnSenderBacklog()` were exactly what I missed previously and was not covered by `testFairDistributionFloatingBuffers()`. Based on your additions, I came up with some more (corner) cases that should be covered (see above) ---
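The floating-buffer accounting asserted in the test above boils down to one formula: on a backlog announcement the channel tries to keep (backlog + initialCredit) buffers available and requests the difference from the pool. A small sketch of that arithmetic (my own illustration; numbers taken from the quoted test):

```java
// Sketch of the backlog/credit accounting verified in the quoted test.
public class BacklogCreditSketch {

    /** Buffers to request from the pool: max(0, backlog + initialCredit - available). */
    static int buffersToRequest(int backlog, int initialCredit, int available) {
        return Math.max(0, backlog + initialCredit - available);
    }

    public static void main(String[] args) {
        int initialCredit = 2;
        // 2 exclusive buffers available, backlog 8 -> request 8 floating buffers
        System.out.println(buffersToRequest(8, initialCredit, 2));
        // now 10 available, backlog 11 -> 3 more needed; if the pool holds fewer,
        // the channel registers as a buffer listener instead of busy-requesting
        System.out.println(buffersToRequest(11, initialCredit, 10));
    }
}
```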
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264689#comment-16264689 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152856498 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. 
*/ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + 
verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as lis
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152857580 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. */ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); 
inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result - verify(bufferPool, times(11)).requestBuffer(); + // The channel does not get enough floating buffer and register as buffer listener + verify(bufferPool, times(13)).reque
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264694#comment-16264694 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152858602 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. 
*/ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + 
verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as lis
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152854762 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. */ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); 
inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result - verify(bufferPool, times(11)).requestBuffer(); + // The channel does not get enough floating buffer and register as buffer listener + verify(bufferPool, times(13)).reque
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264693#comment-16264693 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152859069 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception { } /** -* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is -* recycled to available buffers directly and it triggers notify of announced credit. +* Tests to verify that the input channel requests floating buffers from buffer pool +* in order to maintain backlog + initialCredit buffers available once receiving the +* sender's backlog, and registers as listener if no floating buffers available. */ @Test - public void testRecycleExclusiveBufferBeforeReleased() throws Exception { - final SingleInputGate inputGate = mock(SingleInputGate.class); - final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + // Setup + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + try { + final int numFloatingBuffers = 10; + final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); + inputGate.setBufferPool(bufferPool); + + // Assign exclusive segments to the channel + final int numExclusiveBuffers = 2; + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); + + 
assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); - // Recycle exclusive segment - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + // Receive the producer's backlog + inputChannel.onSenderBacklog(8); - assertEquals("There should be one buffer available after recycle.", - 1, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers + verify(bufferPool, times(8)).requestBuffer(); + verify(bufferPool, times(0)).addBufferListener(inputChannel); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + inputChannel.onSenderBacklog(11); - assertEquals("There should be two buffers available after recycle.", - 2, inputChannel.getNumberOfAvailableBuffers()); - // It should be called only once when increased from zero. - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + + inputChannel.onSenderBacklog(12); + + // Already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + --- End diff -- Thanks, the extensions you implemented for `testRequestFloatingBufferOnSenderBack
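The arithmetic this test verifies (the channel keeps `backlog + initialCredit` buffers available, requesting the shortfall from the floating buffer pool) can be sketched standalone. The class and method names below are hypothetical illustrations, not Flink API:

```java
// Hypothetical sketch of the floating-buffer demand formula discussed above:
// backlog + initialCredit - availableBuffers, clamped at zero.
public class FloatingBufferDemand {

    /** Returns how many floating buffers to request (never negative). */
    public static int buffersToRequest(int senderBacklog, int initialCredit, int availableBuffers) {
        return Math.max(0, senderBacklog + initialCredit - availableBuffers);
    }

    public static void main(String[] args) {
        // Mirrors the test: backlog 8, initial credit 2, 2 exclusive buffers available -> 8 requests.
        System.out.println(buffersToRequest(8, 2, 2));   // 8
        // Backlog 11 with 10 available -> 3 more needed (the test's pool only has 2 left).
        System.out.println(buffersToRequest(11, 2, 10)); // 3
    }
}
```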
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152859784 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel( initialAndMaxRequestBackoff._2(), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } + + private Callable<Void> recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) { + final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments); + // Exhaust all the exclusive buffers + for (int i = 0; i < numExclusiveSegments; i++) { + Buffer buffer = inputChannel.requestBuffer(); + assertNotNull(buffer); + exclusiveBuffers.add(buffer); + } + + return new Callable<Void>() { + @Override + public Void call() throws Exception { + for (Buffer buffer : exclusiveBuffers) { + buffer.recycle(); + } + + return null; + } + }; + } + + private Callable<Void> recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception { + final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers); + // Exhaust all the floating buffers + for (int i = 0; i < numFloatingBuffers; i++) { + Buffer buffer = bufferPool.requestBuffer(); + assertNotNull(buffer); + floatingBuffers.add(buffer); + } + + return new Callable<Void>() { + @Override + public Void call() throws Exception { + for (Buffer buffer : floatingBuffers) { + buffer.recycle(); + } + + return null; + } + }; + } + + private void submitTasksAndWaitResults(ExecutorService executor, Callable<Void>[] tasks) throws Exception { --- End diff -- please add a Javadoc ---
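The submit-then-wait pattern behind the `recycle*Task` helpers and `submitTasksAndWaitResults` in this diff can be sketched standalone. `SubmitAndWait` and the counter task are hypothetical stand-ins, not the Flink test code itself:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitAndWait {

    /** Submits all tasks first so they can run concurrently, then blocks on every result. */
    public static void submitTasksAndWaitResults(ExecutorService executor, List<Callable<Void>> tasks) throws Exception {
        List<Future<Void>> futures = new ArrayList<>(tasks.size());
        for (Callable<Void> task : tasks) {
            futures.add(executor.submit(task));
        }
        for (Future<Void> future : futures) {
            future.get(); // rethrows a task failure as ExecutionException, surfacing it to the test
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger recycled = new AtomicInteger();
        // Stand-in for a recycle task: each call "recycles" one buffer.
        Callable<Void> task = () -> { recycled.incrementAndGet(); return null; };
        submitTasksAndWaitResults(executor, Arrays.asList(task, task, task));
        executor.shutdown();
        System.out.println(recycled.get()); // 3
    }
}
```

Submitting everything before waiting is what lets the exclusive- and floating-buffer recycle tasks race each other, which is the point of the concurrency test.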
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152860104 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception { } /** -* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is -* recycled to available buffers directly and it triggers notify of announced credit. +* Tests to verify that the input channel requests floating buffers from buffer pool +* in order to maintain backlog + initialCredit buffers available once receiving the +* sender's backlog, and registers as listener if no floating buffers available. */ @Test - public void testRecycleExclusiveBufferBeforeReleased() throws Exception { - final SingleInputGate inputGate = mock(SingleInputGate.class); - final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + // Setup + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + try { + final int numFloatingBuffers = 10; + final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); + inputGate.setBufferPool(bufferPool); + + // Assign exclusive segments to the channel + final int numExclusiveBuffers = 2; + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); + + assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); - // Recycle exclusive segment - 
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + // Receive the producer's backlog + inputChannel.onSenderBacklog(8); - assertEquals("There should be one buffer available after recycle.", - 1, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers + verify(bufferPool, times(8)).requestBuffer(); + verify(bufferPool, times(0)).addBufferListener(inputChannel); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + inputChannel.onSenderBacklog(11); - assertEquals("There should be two buffers available after recycle.", - 2, inputChannel.getNumberOfAvailableBuffers()); - // It should be called only once when increased from zero. - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + + inputChannel.onSenderBacklog(12); + + // Already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + + } finally { + // Release all the buffer resources + inputChannel.releaseAllResources(); + + networkBufferPool.destroyAllBufferPools(); + networkBufferPool.destroy(); + } } /** -* Tests {@link RemoteInputChannel#recy
[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264667#comment-16264667 ] ASF GitHub Bot commented on FLINK-8125: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5059 Addressed the comment. > Support limiting the number of open FileSystem connections > -- > > Key: FLINK-8125 > URL: https://issues.apache.org/jira/browse/FLINK-8125 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.5.0, 1.4.1 > > > We need a way to limit the number of streams that Flink FileSystems > concurrently open. > For example, for very small HDFS clusters with few RPC handlers, a large > Flink job trying to build up many connections during a checkpoint causes > failures due to rejected connections. > I propose to add a file system that can wrap another existing file system The > file system may track the progress of streams and close streams that have > been inactive for too long, to avoid locked streams of taking up the complete > pool. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264664#comment-16264664 ] ASF GitHub Bot commented on FLINK-8125: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152856201 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java --- @@ -0,0 +1,1097 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.util.function.SupplierWithException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A file system that limits the number of concurrently open input streams, + * output streams, and total streams for a target file system. + * + * This file system can wrap another existing file system in cases where + * the target file system cannot handle certain connection spikes and connections + * would fail in that case. This happens, for example, for very small HDFS clusters + * with few RPC handlers, when a large Flink job tries to build up many connections during + * a checkpoint. + * + * The file system may track the progress of streams and close streams that have been + * inactive for too long, to avoid locked streams from taking up the complete pool. + * Rather than having a dedicated reaper thread, the calls that try to open a new stream + * periodically check the currently open streams once the limit of open streams is reached. 
+ */ +@Internal +public class LimitedConnectionsFileSystem extends FileSystem { + + private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class); + + /** The original file system to which connections are limited. */ + private final FileSystem originalFs; + + /** The lock that synchronizes connection bookkeeping. */ + private final ReentrantLock lock; + + /** Condition for threads that are blocking on the availability of new connections. */ + private final Condition available; + + /** The maximum number of concurrently open output streams. */ + private final int maxNumOpenOutputStreams; + + /** The maximum number of concurrently open input streams. */ + private final int maxNumOpenInputStreams; + + /** The maximum number of concurrently open streams (input + output). */ + private final int maxNumOpenStreamsTotal; + + /** The nanoseconds that opening a stream may wait for availability. */ + private final long streamOpenTimeoutNanos; + + /** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */ + private final long streamInactivityTimeoutNanos; + + /** The set of currently open output streams. */ + @GuardedBy("lock") + private final HashSet openOutputStreams; + + /** The set of currently open input streams. */ + @GuardedBy("lock") + private fina
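The `lock`/`available` fields quoted above suggest a classic lock-and-condition slot limiter: openers block (with a timeout) until a stream slot frees up, and closing a stream signals a waiter. This is a simplified standalone sketch of that bookkeeping, not Flink's actual `LimitedConnectionsFileSystem`:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch (hypothetical class name) of limiting concurrently open streams
// with a ReentrantLock and a Condition, as the fields in the quoted diff suggest.
public class StreamSlotLimiter {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private final int maxOpenStreams;
    private int openStreams; // guarded by lock

    public StreamSlotLimiter(int maxOpenStreams) {
        this.maxOpenStreams = maxOpenStreams;
    }

    /** Blocks until a slot is free or the timeout elapses; returns whether a slot was acquired. */
    public boolean acquire(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (openStreams >= maxOpenStreams) {
                if (nanos <= 0) {
                    return false; // timed out waiting for availability
                }
                nanos = available.awaitNanos(nanos); // returns the remaining wait budget
            }
            openStreams++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Called when a stream closes: frees the slot and wakes one waiting opener. */
    public void release() {
        lock.lock();
        try {
            openStreams--;
            available.signal();
        } finally {
            lock.unlock();
        }
    }
}
```

The real class additionally distinguishes input/output/total limits and closes inactive streams from within the waiting openers instead of a reaper thread, per the Javadoc above.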
[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264659#comment-16264659 ] ASF GitHub Bot commented on FLINK-8125: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152854733 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java --- @@ -0,0 +1,1097 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.util.function.SupplierWithException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A file system that limits the number of concurrently open input streams, + * output streams, and total streams for a target file system. + * + * This file system can wrap another existing file system in cases where + * the target file system cannot handle certain connection spikes and connections + * would fail in that case. This happens, for example, for very small HDFS clusters + * with few RPC handlers, when a large Flink job tries to build up many connections during + * a checkpoint. + * + * The file system may track the progress of streams and close streams that have been + * inactive for too long, to avoid locked streams from taking up the complete pool. + * Rather than having a dedicated reaper thread, the calls that try to open a new stream + * periodically check the currently open streams once the limit of open streams is reached. 
+ */ +@Internal +public class LimitedConnectionsFileSystem extends FileSystem { + + private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class); + + /** The original file system to which connections are limited. */ + private final FileSystem originalFs; + + /** The lock that synchronizes connection bookkeeping. */ + private final ReentrantLock lock; + + /** Condition for threads that are blocking on the availability of new connections. */ + private final Condition available; + + /** The maximum number of concurrently open output streams. */ + private final int maxNumOpenOutputStreams; + + /** The maximum number of concurrently open input streams. */ + private final int maxNumOpenInputStreams; + + /** The maximum number of concurrently open streams (input + output). */ + private final int maxNumOpenStreamsTotal; + + /** The nanoseconds that opening a stream may wait for availability. */ + private final long streamOpenTimeoutNanos; + + /** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */ + private final long streamInactivityTimeoutNanos; + + /** The set of currently open output streams. */ + @GuardedBy("lock") + private final HashSet openOutputStreams; + + /** The set of currently open input streams. */ + @GuardedBy("lock") + private fina
[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264655#comment-16264655 ]

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152854163

--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
@@ -0,0 +1,1097 @@
[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264652#comment-16264652 ]

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152853859

--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
@@ -0,0 +1,1097 @@
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264638#comment-16264638 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

alright - using fixup commits now for you ;)
FYI: all `[FLINK-7499]` belong together and can be squashed

> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 1.3.2, 1.3.3
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice:
> once asynchronously after the write operation and once in
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained and to be used in parallel somewhere else it may also
> not be available anymore or contain corrupt data.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
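The race described in the issue - one recycle from the asynchronous write path and one from `releaseMemory()` - is easiest to see with reference counting: the second `recycle()` returns memory to the pool while a pending write may still be using it. Below is a small illustrative sketch (a hypothetical class, not Flink's actual `Buffer`) of how holding a reference for the pending write makes only the last release recycle the memory:

```java
// Hypothetical reference-counted buffer: retain() is called for each pending
// user (e.g. an in-flight write request), and only the last recycle() call
// actually returns the memory to the pool.
class RefCountedBuffer {
    private int refCount = 1;
    private boolean recycled;

    synchronized void retain() {
        if (recycled) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount++;
    }

    /** Decrements the reference count; only the final release recycles. */
    synchronized void recycle() {
        if (recycled) {
            throw new IllegalStateException("double recycle");
        }
        if (--refCount == 0) {
            recycled = true; // memory goes back to the pool exactly once
        }
    }

    synchronized boolean isRecycled() {
        return recycled;
    }
}
```

With this scheme, the `releaseMemory()` path and the asynchronous write-completion path can each call `recycle()` once without the memory being reused while the write is still in flight.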
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264633#comment-16264633 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152850290

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
 }
 }
 }
+
+ /**
+* An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+* write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+*/
+ private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+ @Override
+ public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+ throws IOException {
+ BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+ bufferFileWriter.close();
+ return bufferFileWriter;
+ }
+ }
+
+ /**
+* An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+* requests but never actually perform any write operation (be sure to clean up the buffers
+* manually!).
+*/
+ private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+ @Override
+ public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+ throws IOException {
+ BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));

--- End diff --

In theory, you are right - in practice though (and that's what I tried first), `AsynchronousBufferFileWriter` cannot be extended from anywhere outside its package due to `WriteRequest` being package-private. Since the writer implementations I needed are not too generic, I did not want to promote them to this package (in the tests folder, of course) nor did I want to make `WriteRequest` public...hence Mockito.
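The first helper class above injects failures by construction: the overridden factory method hands out a writer that is already closed, so the next write fails deterministically without any mocking framework. Stripped of the Flink specifics, the pattern looks like this (all names here are illustrative, not the actual `IOManager` API):

```java
import java.io.IOException;

// A minimal writer whose writes fail once it has been closed.
class Writer {
    private boolean closed;

    void close() {
        closed = true;
    }

    void writeBlock(byte[] data) throws IOException {
        if (closed) {
            throw new IOException("writer is closed");
        }
        // a real writer would enqueue an asynchronous write request here
    }
}

class WriterFactory {
    Writer createWriter() {
        return new Writer();
    }
}

// Test double: every writer it creates is closed before being handed out,
// so the first writeBlock() call deterministically throws.
class ClosedWriterFactory extends WriterFactory {
    @Override
    Writer createWriter() {
        Writer writer = super.createWriter();
        writer.close();
        return writer;
    }
}
```

This subclass-the-factory approach only works when the resource class itself is extensible or wrappable; as the comment above notes, Mockito's `spy` is the fallback when package-private types block subclassing.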
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264629#comment-16264629 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152849820

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 assertTrue(buffer.isRecycled());
 }
+
+ /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+ @Test
+ public void testAddOnFinishedSpillablePartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ partition.finish();
+
+ Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ // finish adds an EndOfPartitionEvent
+ assertEquals(1, partition.getTotalNumberOfBuffers());
+ assertEquals(4, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+*/
+ @Test
+ public void testAddOnFinishedSpilledPartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ assertEquals(0, partition.releaseMemory());
+ partition.finish();
+
+ Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ // finish adds an EndOfPartitionEvent
+ assertEquals(1, partition.getTotalNumberOfBuffers());
+ assertEquals(4, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+*/
+ @Test
+ public void testAddOnReleasedSpillablePartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ partition.release();
+
+ Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ assertEquals(0, partition.getTotalNumberOfBuffers());
+ assertEquals(0, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+*/
+ @Test
+ public void testAddOnReleasedSpilledPartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ partition.release();
+ assertEquals(0, partition.releaseMemory());
+
+ Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ assertEquals(0, partition.getTotalNumberOfBuffers());
+ assertEquals(0, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+* write request fails with an exception.
+*/
+ @Test
+ public void testAddOnSpilledPartitionW
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264620#comment-16264620 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152849113

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 assertTrue(buffer.isRecycled());
 }
+
+ /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+ @Test
+ public void testAddOnFinishedSpillablePartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ partition.finish();
+
+ Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {

--- End diff --

What do you mean? What failure / normal path? `partition.add()` should always succeed in this case, i.e. it does not throw.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264605#comment-16264605 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152847273

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 assertTrue(buffer.isRecycled());
 }
+
+ /**

--- End diff --

you're right - some tests actually share most of their code. I'll extract a common test method to reduce some code.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264593#comment-16264593 ] ASF GitHub Bot commented on FLINK-7499: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152845922 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java --- @@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined) } } + @Test + public void testAddOnPipelinedPartition() throws Exception { + testAddOnPartition(ResultPartitionType.PIPELINED); + } + + @Test + public void testAddOnBlockingPartition() throws Exception { + testAddOnPartition(ResultPartitionType.BLOCKING); + } + + /** +* Tests {@link ResultPartition#add} on a working partition. +* +* @param pipelined the result partition type to set up +*/ + protected void testAddOnPartition(final ResultPartitionType pipelined) + throws Exception { + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); --- End diff -- do you think it's better to have a real implementation of that interface, spy on it, and then verify the expected method calls? - this actually seems like some more overhead with little/no gain. I'd prefer to leave it as is for now.
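For reference, the "real implementation instead of a mock" alternative discussed above usually takes the form of a small hand-written recording test double. The interface below is a simplified stand-in (the actual `ResultPartitionConsumableNotifier` takes job/partition IDs and task actions); only the recording pattern is the point.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a hand-written test double that records calls, as an
// alternative to mock(ResultPartitionConsumableNotifier.class).
// The interface is simplified and hypothetical, not the real Flink API.
public class RecordingNotifierSketch {

    interface ConsumableNotifier {
        void notifyPartitionConsumable(String partitionId);
    }

    /** Records every notification so the test can verify them without Mockito. */
    static class RecordingNotifier implements ConsumableNotifier {
        final AtomicInteger notifications = new AtomicInteger();
        volatile String lastPartition;

        @Override
        public void notifyPartitionConsumable(String partitionId) {
            lastPartition = partitionId;
            notifications.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        RecordingNotifier notifier = new RecordingNotifier();
        // the code under test would invoke the notifier; simulate one notification
        notifier.notifyPartitionConsumable("partition-0");
        System.out.println("notified " + notifier.notifications.get() + " time(s)");
    }
}
```

The trade-off is exactly the one debated in the thread: the double survives refactoring and is easy to debug, at the cost of a few extra lines per interface.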
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264579#comment-16264579 ] ASF GitHub Bot commented on FLINK-7499: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152842686 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java --- @@ -95,20 +95,23 @@ public boolean add(Buffer buffer) throws IOException { return false; } - // The number of buffers are needed later when creating - // the read views. If you ever remove this line here, - // make sure to still count the number of buffers. - updateStatistics(buffer); - if (spillWriter == null) { buffers.add(buffer); + // The number of buffers are needed later when creating + // the read views. If you ever remove this line here, + // make sure to still count the number of buffers. --- End diff -- yes, all my new tests verify these numbers via ``` assertEquals(..., partition.getTotalNumberOfBuffers()); assertEquals(..., partition.getTotalNumberOfBytes()); ``` but I'll add some more of those checks to some existing tests
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264573#comment-16264573 ] ASF GitHub Bot commented on FLINK-7499: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152841030 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java --- @@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() { * first buffer has been added. */ public void add(Buffer buffer, int subpartitionIndex) throws IOException { - boolean success = false; + checkNotNull(buffer); try { checkInProduceState(); + } catch (Throwable t) { + buffer.recycle(); --- End diff -- Actually, a sanity check for double-recycle comes with #4613 for which I also needed this PR. It does, however, work differently and only checks that the reference counter does not go below 0 - I guess, this way we do not put too much pressure on the garbage collector compared to creating new Buffer instances for each `retain()`
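The diff quoted above moves `buffer.recycle()` into the failure path of `add()`. The ownership rule it establishes can be sketched stand-alone as follows; the types are simplified stand-ins (the real code guards `checkInProduceState()` with a try/catch), but the invariant is the same: once `add()` is entered, the partition owns the buffer and must recycle it on every early exit.

```java
// Minimal sketch (not the real ResultPartition code) of the
// "recycle on failed add" ownership rule from the diff above.
public class AddOwnershipSketch {

    /** Minimal stand-in for a network buffer. */
    static class Buffer {
        private boolean recycled;
        void recycle() { recycled = true; }
        boolean isRecycled() { return recycled; }
    }

    /** Minimal stand-in for the partition. */
    static class Partition {
        private boolean finished;
        void finish() { finished = true; }

        void add(Buffer buffer) {
            if (buffer == null) {
                throw new NullPointerException("buffer");
            }
            if (finished) {
                // caller handed ownership to us; release the buffer before failing
                buffer.recycle();
                throw new IllegalStateException("partition already finished");
            }
            // ... normal path would enqueue the buffer here ...
        }
    }

    public static void main(String[] args) {
        Partition p = new Partition();
        p.finish();
        Buffer b = new Buffer();
        boolean threw = false;
        try {
            p.add(b);
        } catch (IllegalStateException expected) {
            threw = true;
        }
        // the failed add must have recycled the buffer
        System.out.println("threw=" + threw + ", recycled=" + b.isRecycled());
    }
}
```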
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264559#comment-16264559 ] ASF GitHub Bot commented on FLINK-7499: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152829734 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro } } } + + /** +* An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its +* {@link #createBufferFileWriter(FileIOChannel.ID)} method. +* +* These {@link BufferFileWriter} objects will thus throw an exception when trying to add +* write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}. +*/ + private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync { + @Override + public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) + throws IOException { + BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID); + bufferFileWriter.close(); + return bufferFileWriter; + } + } + + /** +* An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its +* {@link #createBufferFileWriter(FileIOChannel.ID)} method. +* +* These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)} +* requests but never actually perform any write operation (be sure to clean up the buffers +* manually!). +*/ + private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync { + @Override + public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) + throws IOException { + BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID)); --- End diff -- again, why mockito?
It's terrible for debugging and completely breaks under refactoring (like adding a new overloaded method `BlockChannelWriterWithCallback::writeBlock` or changing signatures). Especially since returning an anonymous subclass of `AsynchronousBufferFileWriter` that replaces `writeBlock` with an empty method would be just as easy :(
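The anonymous-subclass alternative pnowojski suggests can be sketched like this. The types below are simplified, hypothetical stand-ins for `IOManagerAsync` and `BufferFileWriter`, not the real Flink classes; the shape of the override is the point.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the Mockito-free alternative: the factory hands out an
// anonymous subclass whose writeBlock is a no-op. Types are simplified
// stand-ins, not the real IOManagerAsync/BufferFileWriter.
public class StallingWriterSketch {

    /** Simplified stand-in for BufferFileWriter. */
    static class FileWriter {
        final List<Object> written = new ArrayList<>();
        void writeBlock(Object buffer) { written.add(buffer); }
    }

    /** Simplified stand-in for IOManagerAsync. */
    static class IOManager {
        FileWriter createWriter() { return new FileWriter(); }
    }

    /** Hands out writers that accept writeBlock requests but never write. */
    static class IOManagerWithStallingWriter extends IOManager {
        @Override
        FileWriter createWriter() {
            return new FileWriter() {
                @Override
                void writeBlock(Object buffer) {
                    // stall: accept the request but never perform the write
                }
            };
        }
    }

    public static void main(String[] args) {
        FileWriter writer = new IOManagerWithStallingWriter().createWriter();
        writer.writeBlock(new Object());
        // nothing was recorded because the override dropped the block
        System.out.println("writes recorded: " + writer.written.size());
    }
}
```

Unlike a `spy(...)` with stubbed methods, this version fails to compile (rather than silently misbehaving) when `writeBlock` is renamed or overloaded, which is exactly the refactoring-safety argument made above.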
[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152822778 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java --- @@ -95,20 +95,23 @@ public boolean add(Buffer buffer) throws IOException { return false; } - // The number of buffers are needed later when creating - // the read views. If you ever remove this line here, - // make sure to still count the number of buffers. - updateStatistics(buffer); - if (spillWriter == null) { buffers.add(buffer); + // The number of buffers are needed later when creating + // the read views. If you ever remove this line here, + // make sure to still count the number of buffers. --- End diff -- Is it tested somewhere? ---
[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152826013 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java --- @@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined) } } + @Test + public void testAddOnPipelinedPartition() throws Exception { + testAddOnPartition(ResultPartitionType.PIPELINED); + } + + @Test + public void testAddOnBlockingPartition() throws Exception { + testAddOnPartition(ResultPartitionType.BLOCKING); + } + + /** +* Tests {@link ResultPartition#add} on a working partition. +* +* @param pipelined the result partition type to set up +*/ + protected void testAddOnPartition(final ResultPartitionType pipelined) + throws Exception { + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); --- End diff -- https://imgflip.com/i/1zvsnt :( ---
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264557#comment-16264557 ] ASF GitHub Bot commented on FLINK-7499: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152826521 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertTrue(buffer.isRecycled()); } + /** +* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition. +*/ + @Test + public void testAddOnFinishedSpillablePartition() throws Exception { + SpillableSubpartition partition = createSubpartition(); + partition.finish(); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + try { + partition.add(buffer); + } finally { --- End diff -- ? Why does this test cover both the failure and the normal path? What if one of them never happens?
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264553#comment-16264553 ] ASF GitHub Bot commented on FLINK-7499: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152826839 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertTrue(buffer.isRecycled()); } + /** +* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition. +*/ + @Test + public void testAddOnFinishedSpillablePartition() throws Exception { + SpillableSubpartition partition = createSubpartition(); + partition.finish(); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + try { + partition.add(buffer); + } finally { + if (!buffer.isRecycled()) { + Assert.fail("buffer not recycled"); + buffer.recycle(); + } + // finish adds an EndOfPartitionEvent + assertEquals(1, partition.getTotalNumberOfBuffers()); + assertEquals(4, partition.getTotalNumberOfBytes()); + } + } + + /** +* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+*/ + @Test + public void testAddOnFinishedSpilledPartition() throws Exception { + SpillableSubpartition partition = createSubpartition(); + assertEquals(0, partition.releaseMemory()); + partition.finish(); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + try { + partition.add(buffer); + } finally { + if (!buffer.isRecycled()) { + Assert.fail("buffer not recycled"); + buffer.recycle(); + } + // finish adds an EndOfPartitionEvent + assertEquals(1, partition.getTotalNumberOfBuffers()); + assertEquals(4, partition.getTotalNumberOfBytes()); + } + } + + /** +* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition. +*/ + @Test + public void testAddOnReleasedSpillablePartition() throws Exception { + SpillableSubpartition partition = createSubpartition(); + partition.release(); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + try { + partition.add(buffer); + } finally { --- End diff -- ditto (and below)
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264558#comment-16264558 ] ASF GitHub Bot commented on FLINK-7499: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152837725 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertTrue(buffer.isRecycled()); } + /** --- End diff -- could some of those tests be squashed into fewer methods? Or you think that wouldn't be a good idea?
[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152825430 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java --- @@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() { * first buffer has been added. */ public void add(Buffer buffer, int subpartitionIndex) throws IOException { - boolean success = false; + checkNotNull(buffer); try { checkInProduceState(); + } catch (Throwable t) { + buffer.recycle(); --- End diff -- I wonder if we should have some sanity illegal state detection for double recycling the buffers. For example each buffer could only be recycled once (protected by a private field in the Buffer `boolean wasRecycled`). Whenever you call `retain()`, you would get a new instance of the `Buffer`, pointing to the same memory, but with new flag (so that both original and retained buffers could be recycled independently, but each one of them only once). ---
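The counter-based variant NicoK describes in his reply (recycle is rejected once the reference counter would drop below zero, and `retain()` bumps the counter on the same instance instead of allocating a new `Buffer`) can be sketched as follows. This is purely illustrative, not the actual #4613 implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hedged sketch of reference-counted buffer recycling with a
// double-recycle sanity check. Not the real Flink Buffer class.
public class RefCountedBufferSketch {

    static class Buffer {
        // one reference held by the creator
        private final AtomicInteger refCount = new AtomicInteger(1);

        /** Share the buffer: cheap counter bump, no new instance (GC-friendly). */
        Buffer retain() {
            int c = refCount.incrementAndGet();
            if (c <= 1) {
                refCount.decrementAndGet();
                throw new IllegalStateException("retain() on an already-recycled buffer");
            }
            return this;
        }

        /** Release one reference; detects double-recycle via counter underflow. */
        void recycle() {
            int c = refCount.decrementAndGet();
            if (c < 0) {
                throw new IllegalStateException("double recycle: refCount went below 0");
            }
            if (c == 0) {
                // last reference gone: the memory segment could be
                // returned to the buffer pool here
            }
        }

        boolean isRecycled() { return refCount.get() <= 0; }
    }

    public static void main(String[] args) {
        Buffer b = new Buffer();
        b.retain();           // two owners now
        b.recycle();          // first owner done
        b.recycle();          // second owner done, refCount == 0
        boolean caught = false;
        try {
            b.recycle();      // third recycle: sanity check trips
        } catch (IllegalStateException e) {
            caught = true;
        }
        System.out.println("double-recycle detected: " + caught);
    }
}
```

Compared to pnowojski's new-instance-per-`retain()` idea, this keeps allocation pressure low, which is the trade-off NicoK points out; the cost is that the two owners share one counter rather than each holding an independently recyclable handle.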
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264555#comment-16264555 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152827018

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 		assertTrue(buffer.isRecycled());
 	}

+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpillablePartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.finish();
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				Assert.fail("buffer not recycled");
+				buffer.recycle();
+			}
+			// finish adds an EndOfPartitionEvent
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpilledPartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		assertEquals(0, partition.releaseMemory());
+		partition.finish();
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				Assert.fail("buffer not recycled");
+				buffer.recycle();
+			}
+			// finish adds an EndOfPartitionEvent
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+	 */
+	@Test
+	public void testAddOnReleasedSpillablePartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.release();
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				Assert.fail("buffer not recycled");
+				buffer.recycle();
+			}
+			assertEquals(0, partition.getTotalNumberOfBuffers());
+			assertEquals(0, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+	 */
+	@Test
+	public void testAddOnReleasedSpilledPartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.release();
+		assertEquals(0, partition.releaseMemory());
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				Assert.fail("buffer not recycled");
+				buffer.recycle();
+			}
+			assertEquals(0, partition.getTotalNumberOfBuffers());
+			assertEquals(0, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+	 * write request fails with an exception.
+	 */
+	@Test
+	public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
+		IOManager ioManager = new IOManagerAsyncWithStallingBufferFileWriter();
+		SpillableSubpartition partition = createSubpartition(ioManager);
+		assertEquals(0, partition.relea
[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152826839

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 		assertTrue(buffer.isRecycled());
 	}

+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpillablePartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.finish();
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				Assert.fail("buffer not recycled");
+				buffer.recycle();
+			}
+			// finish adds an EndOfPartitionEvent
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpilledPartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		assertEquals(0, partition.releaseMemory());
+		partition.finish();
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				Assert.fail("buffer not recycled");
+				buffer.recycle();
+			}
+			// finish adds an EndOfPartitionEvent
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+	 */
+	@Test
+	public void testAddOnReleasedSpillablePartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.release();
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
--- End diff --

ditto (and below)

---
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264556#comment-16264556 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152822778

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
@@ -95,20 +95,23 @@ public boolean add(Buffer buffer) throws IOException {
 			return false;
 		}

-		// The number of buffers are needed later when creating
-		// the read views. If you ever remove this line here,
-		// make sure to still count the number of buffers.
-		updateStatistics(buffer);
-
 		if (spillWriter == null) {
 			buffers.add(buffer);
+			// The number of buffers are needed later when creating
+			// the read views. If you ever remove this line here,
+			// make sure to still count the number of buffers.
--- End diff --

Is it tested somewhere?

> double buffer release in SpillableSubpartitionView
> --
>
>                 Key: FLINK-7499
>                 URL: https://issues.apache.org/jira/browse/FLINK-7499
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 1.3.2, 1.3.3
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: once asynchronously after the write operation and once in {{SpillableSubpartitionView#releaseMemory()}} after adding the write operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?), the buffer is retained and used in parallel somewhere else, it may no longer be available or may contain corrupt data.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
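The double release described in the issue can be avoided by making ownership explicit: once a buffer is handed to the spill writer, it leaves the in-memory list, so `releaseMemory()` can no longer reach it and only the writer's completion path recycles it. The following is a toy sketch under that assumption; `SpillSketch` and `Buf` are invented names, not Flink's classes, and the asynchronous write is simulated synchronously.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy single-ownership model (hypothetical names): each buffer is owned by
// exactly one queue at a time, so it can only be recycled by its owner.
final class SpillSketch {
    static final class Buf {
        private boolean recycled;

        void recycle() {
            if (recycled) {
                throw new IllegalStateException("double recycle");
            }
            recycled = true;
        }

        boolean isRecycled() { return recycled; }
    }

    private final Deque<Buf> inMemory = new ArrayDeque<>();
    private final Deque<Buf> pendingWrites = new ArrayDeque<>();

    void add(Buf b) { inMemory.add(b); }

    /** Moves ownership of all in-memory buffers to the (simulated) spill writer. */
    void spill() {
        while (!inMemory.isEmpty()) {
            pendingWrites.add(inMemory.poll());
        }
    }

    /** Simulated async write completion: the writer recycles only what it owns. */
    void completeWrites() {
        while (!pendingWrites.isEmpty()) {
            pendingWrites.poll().recycle();
        }
    }

    /** Recycles only buffers the subpartition still owns; spilled ones are untouched. */
    void releaseMemory() {
        while (!inMemory.isEmpty()) {
            inMemory.poll().recycle();
        }
    }
}
```

With this ownership split, the race in the issue cannot occur: `releaseMemory()` after `spill()` is a no-op for the buffers already queued for writing, and each buffer is recycled exactly once.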
[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152837725

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 		assertTrue(buffer.isRecycled());
 	}

+	/**
--- End diff --

Could some of these tests be squashed into fewer methods? Or do you think that wouldn't be a good idea?

---
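To illustrate the squashing the reviewer asks about, here is a self-contained toy sketch: `PartitionSketch` and `BufSketch` are illustrative stand-ins (not the Flink test classes), and one flag-driven helper covers the finished and released variants that the four near-identical tests exercise separately.

```java
// Illustrative stand-ins only: a toy model showing how four copy-pasted
// test bodies can collapse into one parameterized helper.
final class BufSketch {
    private boolean recycled;
    void recycle() { recycled = true; }
    boolean isRecycled() { return recycled; }
}

final class PartitionSketch {
    private boolean closed;
    private int totalBuffers;

    void finish() { closed = true; totalBuffers++; } // models the EndOfPartitionEvent
    void release() { closed = true; }

    /** Rejects (and recycles) buffers added after finish() or release(). */
    boolean add(BufSketch b) {
        if (closed) {
            b.recycle();
            return false;
        }
        totalBuffers++;
        return true;
    }

    int getTotalNumberOfBuffers() { return totalBuffers; }
}

final class AddOnClosedPartitionHelper {
    /** One helper replacing the four near-identical test bodies. */
    static void runAddOnClosedPartition(boolean finished, int expectedBuffers) {
        PartitionSketch partition = new PartitionSketch();
        if (finished) {
            partition.finish();
        } else {
            partition.release();
        }

        BufSketch buffer = new BufSketch();
        partition.add(buffer);

        if (!buffer.isRecycled()) {
            throw new AssertionError("buffer not recycled");
        }
        if (partition.getTotalNumberOfBuffers() != expectedBuffers) {
            throw new AssertionError("unexpected buffer count");
        }
    }
}
```

Each original test then reduces to a one-line call such as `runAddOnClosedPartition(true, 1)`; in JUnit 4, the same effect could also be reached with a `@Parameterized` runner.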