[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671355#comment-16671355 ] ASF GitHub Bot commented on FLINK-8581: --- pnowojski edited a comment on issue #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425 @zhijiangW Unfortunately no, I have to rethink this whole thing, and it may need a much bigger refactor to avoid adding more locks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve performance for low latency network > --- > > Key: FLINK-8581 > URL: https://issues.apache.org/jira/browse/FLINK-8581 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671351#comment-16671351 ] ASF GitHub Bot commented on FLINK-8581: --- pnowojski commented on issue #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425 No, I have to rethink this whole thing, and it may need a much bigger refactor to avoid adding more locks.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651390#comment-16651390 ] ASF GitHub Bot commented on FLINK-8581: --- pnowojski commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r225464137 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -268,6 +291,25 @@ public void flushAll() { } } + @Override + public void flushAllLocal() { + for (ResultSubpartition localSubpartition : localSubpartitions) { + localSubpartition.flush(); + } + } + + @Override + public void setFlushTimeout(long flushTimeout) { + checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice"); + for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) { + checkState(subpartition.isLocal().isPresent()); + checkState(!subpartition.isLocal().get()); + subpartition.registerPeriodicFlush(flushTimeout); + } + remoteSubpartitionsMissingPeriodicFlushes.clear(); Review comment: In that case I would have to re-evaluate this. Adding extra synchronisation would further complicate the code and could cause regressions.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649831#comment-16649831 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r225059097 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -268,6 +291,25 @@ public void flushAll() { } } + @Override + public void flushAllLocal() { + for (ResultSubpartition localSubpartition : localSubpartitions) { + localSubpartition.flush(); + } + } + + @Override + public void setFlushTimeout(long flushTimeout) { + checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice"); + for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) { + checkState(subpartition.isLocal().isPresent()); + checkState(!subpartition.isLocal().get()); + subpartition.registerPeriodicFlush(flushTimeout); + } + remoteSubpartitionsMissingPeriodicFlushes.clear(); Review comment: Yes, we may need to add synchronization to the three methods above to protect `localSubpartitions`, `remoteSubpartitionsMissingPeriodicFlushes` and `flushTimeout`, which are accessed by the task thread, the Netty thread and the flusher thread.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647850#comment-16647850 ] ASF GitHub Bot commented on FLINK-8581: --- pnowojski commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r224765094 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -268,6 +291,25 @@ public void flushAll() { } } + @Override + public void flushAllLocal() { + for (ResultSubpartition localSubpartition : localSubpartitions) { + localSubpartition.flush(); + } + } + + @Override + public void setFlushTimeout(long flushTimeout) { + checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice"); + for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) { + checkState(subpartition.isLocal().isPresent()); + checkState(!subpartition.isLocal().get()); + subpartition.registerPeriodicFlush(flushTimeout); + } + remoteSubpartitionsMissingPeriodicFlushes.clear(); Review comment: You might be right. But a fix for that requires synchronizing: - `setFlushTimeout` - `createSubpartitionView` - `flushAllLocal`, right :/ ?
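A minimal sketch of the synchronization being discussed, assuming one lock object guards `localSubpartitions`, `remoteSubpartitionsMissingPeriodicFlushes` and `flushTimeout`, and is taken by `setFlushTimeout`, by the path that runs when a subpartition view is created (`createSubpartitionView` in the list above) and by `flushAllLocal`. The classes `SubpartitionStub` and `PartitionFlushState` and the method `onViewCreated` are hypothetical simplifications, not code from the pull request; the sketch only shows why all three methods would end up sharing one lock, which is the extra locking the author wanted to avoid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for ResultSubpartition: records flushes and periodic-flush registration.
class SubpartitionStub {
    final boolean local;
    int flushCount = 0;
    long periodicFlushTimeout = -1;

    SubpartitionStub(boolean local) { this.local = local; }

    void flush() { flushCount++; }

    void registerPeriodicFlush(long timeout) { periodicFlushTimeout = timeout; }
}

// Hypothetical simplification of the ResultPartition state touched by several threads.
class PartitionFlushState {
    private final Object lock = new Object();
    private final List<SubpartitionStub> localSubpartitions = new ArrayList<>();
    private final List<SubpartitionStub> remoteSubpartitionsMissingPeriodicFlushes = new ArrayList<>();
    private OptionalLong flushTimeout = OptionalLong.empty();

    // Runs once a read view exists and locality is known (e.g. on the Netty thread).
    void onViewCreated(SubpartitionStub subpartition) {
        synchronized (lock) {
            if (subpartition.local) {
                localSubpartitions.add(subpartition);
            } else if (flushTimeout.isPresent()) {
                subpartition.registerPeriodicFlush(flushTimeout.getAsLong());
            } else {
                remoteSubpartitionsMissingPeriodicFlushes.add(subpartition);
            }
        }
    }

    void setFlushTimeout(long timeout) {
        synchronized (lock) {
            if (flushTimeout.isPresent()) {
                throw new IllegalStateException("Flush timeout can not be set twice");
            }
            flushTimeout = OptionalLong.of(timeout);
            for (SubpartitionStub s : remoteSubpartitionsMissingPeriodicFlushes) {
                s.registerPeriodicFlush(timeout);
            }
            remoteSubpartitionsMissingPeriodicFlushes.clear();
        }
    }

    // Runs on the flusher thread; iterating without the lock could race with onViewCreated.
    void flushAllLocal() {
        synchronized (lock) {
            for (SubpartitionStub s : localSubpartitions) {
                s.flush();
            }
        }
    }
}
```

With the lock, a remote subpartition whose view is created after `setFlushTimeout` is registered immediately instead of being left in the "missing" list forever.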
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642875#comment-16642875 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223575788 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ## @@ -115,8 +115,12 @@ public boolean isRegisteredAsAvailable() { @Override public boolean isAvailable() { // BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)! - return hasBuffersAvailable() && - (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); + return hasBuffersAvailable() && !isBlocked(); + } + + @Override + public boolean isBlocked() { Review comment: I think the name `isBlocked` does not convey the specific semantics. How about renaming it to `isCreditsAvailable()` directly? Then `isAvailable()` splits into `buffersAvailable()` and `creditsAvailable()`, and the private method `isAvailable(BufferAndBacklog bufferAndBacklog)` below could also reuse `isCreditsAvailable()`.
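The renaming suggested above could look roughly like this. `ReaderAvailability` and its plain fields are hypothetical stand-ins for `CreditBasedSequenceNumberingViewReader` and its subpartition view; the credit rule itself (credits available, or the next buffer is an event) is copied from the removed lines of the diff.

```java
// Hypothetical simplification of the reader's availability logic, split into two named checks.
class ReaderAvailability {
    final int numCreditsAvailable;
    final boolean buffersAvailable;
    final boolean nextBufferIsEvent;

    ReaderAvailability(int numCreditsAvailable, boolean buffersAvailable, boolean nextBufferIsEvent) {
        this.numCreditsAvailable = numCreditsAvailable;
        this.buffersAvailable = buffersAvailable;
        this.nextBufferIsEvent = nextBufferIsEvent;
    }

    boolean hasBuffersAvailable() {
        return buffersAvailable;
    }

    // Mirrors the removed condition: an event may be sent even when no credits are left.
    boolean isCreditsAvailable() {
        return numCreditsAvailable > 0 || nextBufferIsEvent;
    }

    // Any other availability check (e.g. one taking a BufferAndBacklog) could reuse isCreditsAvailable().
    boolean isAvailable() {
        return hasBuffersAvailable() && isCreditsAvailable();
    }
}
```

A positive name such as `isCreditsAvailable()` also avoids the double negation of `!isBlocked()` at call sites.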
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642866#comment-16642866 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223574052 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -119,6 +141,22 @@ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) thro } } + private void flushReaders(long flushTimeout) throws Exception { + List readersToFlush = periodicFlushes.getReaders(flushTimeout); + + boolean wasEmpty = availableReaders.isEmpty(); + + for (NetworkSequenceViewReader reader : readersToFlush) { + if (!reader.isRegisteredAsAvailable() && !reader.isBlocked()) { Review comment: I think the flush operation means that if this reader has both an unfinished `BufferConsumer` and credits, we still want to transfer this buffer to reduce latency. So the condition for an available reader should cover both available buffers and available credits. But the current condition only checks that the reader has available credits before inserting it into the queue. If we then poll this reader from the queue and its next buffer turns out to be null, registering it as available here makes no sense. So would it be reasonable to adjust the condition to check that this reader has both credits and unfinished buffers?
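The adjusted condition proposed above can be sketched as follows: a reader is enqueued only if it is not yet registered and has both credits and an unfinished buffer. `FlushableReader` and `FlushScheduler` are hypothetical stand-ins for `NetworkSequenceViewReader` and `PartitionRequestQueue`. The diff does not show what happens with `wasEmpty` after the loop; here it is assumed to decide whether the caller must trigger a write.

```java
import java.util.ArrayDeque;
import java.util.List;

// Hypothetical stand-in for NetworkSequenceViewReader with the two facts the condition needs.
class FlushableReader {
    final String name;
    final boolean hasCredits;
    final boolean hasUnfinishedBuffer;
    boolean registeredAsAvailable = false;

    FlushableReader(String name, boolean hasCredits, boolean hasUnfinishedBuffer) {
        this.name = name;
        this.hasCredits = hasCredits;
        this.hasUnfinishedBuffer = hasUnfinishedBuffer;
    }
}

class FlushScheduler {
    final ArrayDeque<FlushableReader> availableReaders = new ArrayDeque<>();

    // Assumption: returns true when the queue went from empty to non-empty,
    // i.e. when the caller should start writing to the channel.
    boolean flushReaders(List<FlushableReader> readersToFlush) {
        boolean wasEmpty = availableReaders.isEmpty();
        for (FlushableReader reader : readersToFlush) {
            // Enqueue only readers that can actually send something: credits AND data.
            if (!reader.registeredAsAvailable && reader.hasCredits && reader.hasUnfinishedBuffer) {
                reader.registeredAsAvailable = true;
                availableReaders.add(reader);
            }
        }
        return wasEmpty && !availableReaders.isEmpty();
    }
}
```

A reader with credits but no data is never queued, so polling it later cannot yield a null buffer.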
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642790#comment-16642790 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223557953 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception { } } } + + private static class RegisterPeriodicFlushEvent { + private final NetworkSequenceViewReader reader; + private final long flushTimeout; + + public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) { + this.reader = checkNotNull(reader); + this.flushTimeout = flushTimeout; + } + + public NetworkSequenceViewReader getReader() { + return reader; + } + + public long getFlushTimeout() { + return flushTimeout; + } + } + + private static class PeriodicFlushes { + private final Map> periodicFlushes = new HashMap<>(); + private final Map flushTimeouts = new HashMap<>(); + private final Map> scheduledEvents = new HashMap<>(); + + public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) { Review comment: Remove `public` from all the methods in this inner class?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642779#comment-16642779 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223556150 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception { } } } + + private static class RegisterPeriodicFlushEvent { + private final NetworkSequenceViewReader reader; + private final long flushTimeout; + + public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) { + this.reader = checkNotNull(reader); + this.flushTimeout = flushTimeout; + } + + public NetworkSequenceViewReader getReader() { + return reader; + } + + public long getFlushTimeout() { + return flushTimeout; + } + } + + private static class PeriodicFlushes { Review comment: Should we clear the related maps or cancel the futures in `releaseAllResources`, which is triggered by `exceptionCaught` or `channelInactive`?
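The cleanup asked about above might look like this sketch: one scheduled task per distinct flush timeout, and a `releaseAll()` that cancels every future and clears both maps. `PeriodicFlushRegistry`, `register` and `releaseAll` are hypothetical names; a plain `ScheduledExecutorService` stands in for the Netty event loop that the pull request presumably schedules on, and the `LongConsumer` plays the role of `flushReaders(long)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical per-timeout registry of periodic flushes with explicit cleanup.
class PeriodicFlushRegistry {
    private final ScheduledExecutorService executor;
    private final LongConsumer flushReaders;
    private final Map<Long, List<String>> readersByTimeout = new HashMap<>();
    private final Map<Long, ScheduledFuture<?>> scheduledEvents = new HashMap<>();

    PeriodicFlushRegistry(ScheduledExecutorService executor, LongConsumer flushReaders) {
        this.executor = executor;
        this.flushReaders = flushReaders;
    }

    // One scheduled task per distinct timeout; it flushes every reader registered under it.
    ScheduledFuture<?> register(String readerId, long flushTimeout) {
        readersByTimeout.computeIfAbsent(flushTimeout, t -> new ArrayList<>()).add(readerId);
        return scheduledEvents.computeIfAbsent(flushTimeout, t ->
            executor.scheduleWithFixedDelay(() -> flushReaders.accept(t), t, t, TimeUnit.MILLISECONDS));
    }

    int scheduledCount() {
        return scheduledEvents.size();
    }

    int registeredReaders() {
        return readersByTimeout.values().stream().mapToInt(List::size).sum();
    }

    // Would be called from releaseAllResources() (after exceptionCaught or channelInactive)
    // so that no flush task keeps firing for a dead channel.
    void releaseAll() {
        for (ScheduledFuture<?> future : scheduledEvents.values()) {
            future.cancel(false);
        }
        scheduledEvents.clear();
        readersByTimeout.clear();
    }
}
```

Without the cancellation, a periodic task would keep a reference to the released readers and keep running until the executor itself shuts down.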
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642757#comment-16642757 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223553034 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception { } } } + + private static class RegisterPeriodicFlushEvent { + private final NetworkSequenceViewReader reader; + private final long flushTimeout; + + public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) { + this.reader = checkNotNull(reader); + this.flushTimeout = flushTimeout; + } + + public NetworkSequenceViewReader getReader() { + return reader; + } + + public long getFlushTimeout() { + return flushTimeout; + } + } + + private static class PeriodicFlushes { + private final Map> periodicFlushes = new HashMap<>(); + private final Map flushTimeouts = new HashMap<>(); + private final Map> scheduledEvents = new HashMap<>(); + + public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) { + checkState(!flushTimeouts.containsKey(reader)); + checkState(flushTimeout > 0); Review comment: If we check `flushTimeout` early in the `ResultPartition` stack as mentioned above, we do not need to care about it in the later steps.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642749#comment-16642749 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223552250 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -97,6 +107,18 @@ public void run() { }); } + void registerPeriodicFlush(NetworkSequenceViewReader reader, long flushTimeout) { + if (flushTimeout == 0) { Review comment: Can this condition check be done earlier, in the `ResultPartition` or `ResultSubpartition` stack?
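Validating the timeout once at the top of the stack, as suggested here and in the follow-up about `checkState(flushTimeout > 0)`, could be sketched like this. `PartitionEntryPoint` and `FlushRegistry` are hypothetical classes. The early return for non-positive values is an assumption generalizing the `flushTimeout == 0` branch in the diff, whose body is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical downstream registry: relies on the caller having filtered out non-positive timeouts.
class FlushRegistry {
    final List<Long> registered = new ArrayList<>();

    void register(long flushTimeout) {
        // A precondition rather than a branch: validation already happened at the entry point.
        if (flushTimeout <= 0) {
            throw new IllegalStateException("flushTimeout must be positive");
        }
        registered.add(flushTimeout);
    }
}

// Hypothetical partition-level entry point that performs the check exactly once.
class PartitionEntryPoint {
    private final List<FlushRegistry> remoteSubpartitions;

    PartitionEntryPoint(List<FlushRegistry> remoteSubpartitions) {
        this.remoteSubpartitions = remoteSubpartitions;
    }

    void setFlushTimeout(long flushTimeout) {
        // Assumption: a non-positive timeout means no periodic flush is needed,
        // so nothing below this point ever sees such a value.
        if (flushTimeout <= 0) {
            return;
        }
        for (FlushRegistry registry : remoteSubpartitions) {
            registry.register(flushTimeout);
        }
    }
}
```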
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642745#comment-16642745 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223550777 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -77,6 +77,14 @@ public void flush() { } } + @Override + public void registerPeriodicFlush(long flushTimeout) { Review comment: 1. Could this method be as simple as `notifyDataAvailable`? Then we could shrink the `synchronized` part. ``` if (readView != null) { readView.registerPeriodicFlush(flushTimeout); } ``` 2. This implementation is the same as in `SpillableSubpartition#registerPeriodicFlush`; how about implementing this method in the parent `ResultSubpartition#registerPeriodicFlush`? That also requires defining a protected `readView` in `ResultSubpartition`.
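Hoisting the method into the parent class, as suggested in point 2, might look like this sketch. `PeriodicFlushTarget`, `BaseSubpartition` and the two `...LikeSubpartition` classes are hypothetical stand-ins for the read view, `ResultSubpartition`, `PipelinedSubpartition` and `SpillableSubpartition`. Making `readView` volatile and reading it once into a local are assumptions added so that the unsynchronized null check from point 1 does not race with a concurrent write.

```java
// Hypothetical stand-in for the read view / listener that actually schedules the flush.
interface PeriodicFlushTarget {
    void registerPeriodicFlush(long flushTimeout);
}

// Hypothetical parent class holding the shared implementation and a protected readView.
abstract class BaseSubpartition {
    // Assumption: written once when the read view is created; volatile so the
    // forwarding path below can read it without taking the subpartition lock.
    protected volatile PeriodicFlushTarget readView;

    public void registerPeriodicFlush(long flushTimeout) {
        PeriodicFlushTarget view = readView; // single read avoids a check-then-act race on the field
        if (view != null) {
            view.registerPeriodicFlush(flushTimeout);
        }
    }
}

class PipelinedLikeSubpartition extends BaseSubpartition {
    void createReadView(PeriodicFlushTarget view) {
        this.readView = view;
    }
}

class SpillableLikeSubpartition extends BaseSubpartition {
    void createReadView(PeriodicFlushTarget view) {
        this.readView = view;
    }
}
```

Both subclasses then inherit one implementation instead of duplicating it.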
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642742#comment-16642742 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223551011 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ## @@ -106,11 +109,32 @@ protected Throwable getFailureCause() { abstract public void flush(); + /** +* Remote subpartitions support automatic periodic flush. This is the method to register it. +* Can only by used after {@link #isLocal()} state is known. Review comment: by used -> be used?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642741#comment-16642741 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223550777 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -77,6 +77,14 @@ public void flush() { } } + @Override + public void registerPeriodicFlush(long flushTimeout) { Review comment: 1. Could this method be as simple as `notifyDataAvailable`? Then we could shrink the `synchronized` part. ``` if (readView != null) { readView.registerPeriodicFlush(flushTimeout); } ``` 2. This implementation is the same as in `SpillableSubpartition#registerPeriodicFlush`; maybe we can put this method in the parent `ResultSubpartition#registerPeriodicFlush`? That also requires defining a protected `readView` in `ResultSubpartition`.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642714#comment-16642714 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223546433 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -94,6 +96,19 @@ /** The subpartitions of this partition. At least one. */ private final ResultSubpartition[] subpartitions; + /** +* Subset of {@code subpartitions} that are definitely local. We can only determine whether a +* subpartition is local or not once it's read view was created. Review comment: it's ->its
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641774#comment-16641774 ] ASF GitHub Bot commented on FLINK-8581: --- NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r221894938 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -225,6 +274,7 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO } next = reader.getNextBuffer(); + Review comment: revert?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641773#comment-16641773 ] ASF GitHub Bot commented on FLINK-8581: --- NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r222015562 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java ## @@ -24,8 +24,14 @@ */ public interface BufferAvailabilityListener { + default boolean isLocal() { + return false; + } + /** * Called whenever there might be new data available. */ void notifyDataAvailable(); + + void registerPeriodicFlush(long flushTimeout); Review comment: Somehow the interface's name no longer really matches its extensions... maybe you have a better idea?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641765#comment-16641765 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r222012019

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ##
@@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
             }
         }
     }
+
+    private static class RegisterPeriodicFlushEvent {
+        private final NetworkSequenceViewReader reader;
+        private final long flushTimeout;
+
+        public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+            this.reader = checkNotNull(reader);
+            this.flushTimeout = flushTimeout;
+        }
+
+        public NetworkSequenceViewReader getReader() {
+            return reader;
+        }
+
+        public long getFlushTimeout() {
+            return flushTimeout;
+        }
+    }
+
+    private static class PeriodicFlushes {
+        private final Map> periodicFlushes = new HashMap<>();
+        private final Map flushTimeouts = new HashMap<>();

Review comment:
Is it worth keeping `flushTimeouts` in memory only to optimise cancellation (and one debug check in `register()`)? Currently, it is used in two places:
- in `register()`, to check whether we have already added the reader; we could instead go through the list in `periodicFlushes` if we really wanted to
- in `cancel()`, to retrieve the right flush timeout so that we can easily remove the reader from `periodicFlushes`; we could iterate over it instead

-> both alternatives are a bit slower, but they only happen during registration (if at all) and cancellation and are therefore not performance critical.
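To make the trade-off concrete, here is a minimal sketch of `PeriodicFlushes` without the `flushTimeouts` reverse index. It assumes readers are grouped in a `Map<Long, List<R>>` keyed by flush timeout (the actual generic types are not visible in the quoted diff), with the type parameter `R` standing in for `NetworkSequenceViewReader`. Both the duplicate check in `register()` and the lookup in `cancel()` become linear scans over all groups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: readers grouped by flush timeout, no reader -> timeout index. */
class PeriodicFlushesSketch<R> {
    // flush timeout (ms) -> readers to be flushed at that interval
    private final Map<Long, List<R>> periodicFlushes = new HashMap<>();

    void register(R reader, long flushTimeout) {
        // Debug check without a second map: scan every group.
        if (contains(reader)) {
            throw new IllegalStateException("Reader already registered: " + reader);
        }
        periodicFlushes.computeIfAbsent(flushTimeout, t -> new ArrayList<>()).add(reader);
    }

    /** Removes the reader from whichever group holds it; false if it was never registered. */
    boolean cancel(R reader) {
        Iterator<Map.Entry<Long, List<R>>> it = periodicFlushes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<R>> entry = it.next();
            if (entry.getValue().remove(reader)) {
                if (entry.getValue().isEmpty()) {
                    // Assumption: a real implementation would also stop the timer for this timeout.
                    it.remove();
                }
                return true;
            }
        }
        return false;
    }

    boolean contains(R reader) {
        for (List<R> readers : periodicFlushes.values()) {
            if (readers.contains(reader)) {
                return true;
            }
        }
        return false;
    }

    int groupCount() {
        return periodicFlushes.size();
    }
}
```

Since registration and cancellation happen once per reader, the O(n) scans stay off the per-record path, which is the point of the review comment.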
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641783#comment-16641783 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223342148

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ##
@@ -106,11 +109,32 @@ protected Throwable getFailureCause() {
     abstract public void flush();
+    /**
+     * Remote subpartitions support automatic periodic flush. This is the method to register it.
+     * Can only by used after {@link #isLocal()} state is known.
+     */
+    public abstract void registerPeriodicFlush(long flushTimeout);
+
+    /**
+     * @return empty if {@link #createReadView(BufferAvailabilityListener)} has not been yet called.
+     * Afterwards returns {@code Optional.of(true)} or {@code Optional.of(false)}
+     */
+    public Optional<Boolean> isLocal() {
+        return isLocal;
+    }
+
     abstract public void finish() throws IOException;
     abstract public void release() throws IOException;
-    abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+    public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
+        isLocal = Optional.of(availabilityListener.isLocal());
+        return createReadViewInternal(availabilityListener);
+    }
+
+

Review comment:
nit: remove additional empty line (checkstyle)
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641772#comment-16641772 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223287311

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ##
@@ -47,6 +48,8 @@
     @GuardedBy("buffers")
     private int buffersInBacklog;
+    private Optional<Boolean> isLocal = Optional.empty();

Review comment:
How about using `TernaryBoolean`?
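Flink's `TernaryBoolean` (in `org.apache.flink.util`) has the three states `TRUE`, `FALSE` and `UNDEFINED`. So that the sketch compiles on its own, it uses a hypothetical local enum with the same three states; with the real class, `UNDEFINED` would take the place of `Optional.empty()`. The subpartition class and its `onReadViewCreated` hook are likewise hypothetical stand-ins.

```java
/** Hypothetical stand-in with the same three states as Flink's TernaryBoolean. */
enum LocalTernaryBoolean {
    TRUE, FALSE, UNDEFINED;

    static LocalTernaryBoolean fromBoolean(boolean value) {
        return value ? TRUE : FALSE;
    }
}

/** Sketch: a subpartition whose locality is unknown until a read view is created. */
class SubpartitionLocalitySketch {
    // UNDEFINED replaces Optional.empty(): no Optional field, no boxed Boolean.
    private LocalTernaryBoolean isLocal = LocalTernaryBoolean.UNDEFINED;

    void onReadViewCreated(boolean listenerIsLocal) {
        isLocal = LocalTernaryBoolean.fromBoolean(listenerIsLocal);
    }

    LocalTernaryBoolean isLocal() {
        return isLocal;
    }
}
```

Besides reading more directly, this avoids storing an `Optional` in a field, a pattern that is generally discouraged in Java code.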
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641768#comment-16641768 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221851168

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ##
@@ -131,10 +169,17 @@ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) thro
         return availableReaders;
     }
-    public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
+    public void notifyReaderCreated(final NetworkSequenceViewReader reader) throws Exception {

Review comment:
revert
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641778#comment-16641778 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223341962

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ##
@@ -106,11 +109,32 @@ protected Throwable getFailureCause() {
     abstract public void flush();
+    /**
+     * Remote subpartitions support automatic periodic flush. This is the method to register it.
+     * Can only by used after {@link #isLocal()} state is known.
+     */
+    public abstract void registerPeriodicFlush(long flushTimeout);
+
+    /**
+     * @return empty if {@link #createReadView(BufferAvailabilityListener)} has not been yet called.
+     * Afterwards returns {@code Optional.of(true)} or {@code Optional.of(false)}
+     */
+    public Optional<Boolean> isLocal() {
+        return isLocal;
+    }

Review comment:
`isLocal` may not be required after the proposed change above.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641776#comment-16641776 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223342607

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ##
@@ -106,11 +109,32 @@ protected Throwable getFailureCause() {
     abstract public void flush();
+    /**
+     * Remote subpartitions support automatic periodic flush. This is the method to register it.
+     * Can only by used after {@link #isLocal()} state is known.
+     */
+    public abstract void registerPeriodicFlush(long flushTimeout);
+
+    /**
+     * @return empty if {@link #createReadView(BufferAvailabilityListener)} has not been yet called.
+     * Afterwards returns {@code Optional.of(true)} or {@code Optional.of(false)}
+     */
+    public Optional<Boolean> isLocal() {
+        return isLocal;
+    }
+
     abstract public void finish() throws IOException;
     abstract public void release() throws IOException;
-    abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+    public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {

Review comment:
`public final`?
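A sketch of the template-method shape that `public final` would give, using hypothetical stand-in types: `String` replaces `ResultSubpartitionView`, `AvailabilityListenerSketch` replaces `BufferAvailabilityListener`, and the `IOException` is dropped to keep it short. Marking the public method `final` guarantees that no subclass can override it and forget to record `isLocal` before delegating to `createReadViewInternal`.

```java
/** Hypothetical listener: only the locality flag matters for this sketch. */
interface AvailabilityListenerSketch {
    boolean isLocal();
}

abstract class SubpartitionSketch {
    private Boolean isLocal; // null until a read view has been created

    // final: subclasses cannot override this and skip recording isLocal.
    public final String createReadView(AvailabilityListenerSketch listener) {
        isLocal = listener.isLocal();
        return createReadViewInternal(listener);
    }

    // Subclass hook; String stands in for ResultSubpartitionView.
    protected abstract String createReadViewInternal(AvailabilityListenerSketch listener);

    Boolean isLocal() {
        return isLocal;
    }
}

class PipelinedSubpartitionSketch extends SubpartitionSketch {
    @Override
    protected String createReadViewInternal(AvailabilityListenerSketch listener) {
        return "view";
    }
}
```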
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641766#comment-16641766 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221851878

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ##
@@ -97,6 +107,18 @@ public void run() {
         });
     }
+    void registerPeriodicFlush(NetworkSequenceViewReader reader, long flushTimeout) {
+        if (flushTimeout == 0) {
+            return;
+        }
+        ctx.executor().execute(new Runnable() {
+            @Override
+            public void run() {
+                ctx.pipeline().fireUserEventTriggered(new RegisterPeriodicFlushEvent(reader, flushTimeout));
+            }
+        });

Review comment:
Use a lambda function instead?
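With a lambda, the body of `registerPeriodicFlush` would shrink to `ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(new RegisterPeriodicFlushEvent(reader, flushTimeout)));`. To keep the sketch runnable without Netty, it assumes a plain `java.util.concurrent.Executor` in place of `ctx.executor()` and a `Consumer<Object>` in place of `ctx.pipeline()::fireUserEventTriggered`; the class name is hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class LambdaRegistrationSketch {
    /** Stand-in for the event posted through the Netty pipeline. */
    static final class RegisterPeriodicFlushEvent {
        final Object reader;
        final long flushTimeout;

        RegisterPeriodicFlushEvent(Object reader, long flushTimeout) {
            this.reader = reader;
            this.flushTimeout = flushTimeout;
        }
    }

    private final Executor executor;              // stands in for ctx.executor()
    private final Consumer<Object> userEventSink; // stands in for ctx.pipeline()::fireUserEventTriggered

    LambdaRegistrationSketch(Executor executor, Consumer<Object> userEventSink) {
        this.executor = executor;
        this.userEventSink = userEventSink;
    }

    void registerPeriodicFlush(Object reader, long flushTimeout) {
        if (flushTimeout == 0) {
            return; // as in the diff, a zero timeout registers nothing
        }
        // The anonymous Runnable collapses into a single lambda.
        executor.execute(() -> userEventSink.accept(new RegisterPeriodicFlushEvent(reader, flushTimeout)));
    }
}
```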
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641782#comment-16641782 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223292943

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -268,6 +291,25 @@ public void flushAll() {
             }
         }
+    @Override
+    public void flushAllLocal() {
+        for (ResultSubpartition localSubpartition : localSubpartitions) {
+            localSubpartition.flush();
+        }
+    }
+
+    @Override
+    public void setFlushTimeout(long flushTimeout) {
+        checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+        for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+            checkState(subpartition.isLocal().isPresent());
+            checkState(!subpartition.isLocal().get());
+            subpartition.registerPeriodicFlush(flushTimeout);
+        }
+        remoteSubpartitionsMissingPeriodicFlushes.clear();

Review comment:
Either also call `trimToSize()` here, or use an `Optional` as described above.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641777#comment-16641777 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223290601

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -94,6 +96,19 @@
     /** The subpartitions of this partition. At least one. */
     private final ResultSubpartition[] subpartitions;
+    /**
+     * Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+     * subpartition is local or not once it's read view was created.
+     */
+    private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();

Review comment:
It would be nice if you could also `trimToSize()` this once you know.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641770#comment-16641770 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223289891

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -94,6 +96,19 @@
     /** The subpartitions of this partition. At least one. */
     private final ResultSubpartition[] subpartitions;
+    /**
+     * Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+     * subpartition is local or not once it's read view was created.
+     */
+    private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();
+
+    /**
+     * Subset of {@code subpartitions} that are definitely remote, however once we determined that,
+     * we haven't yet known about {@link #flushTimeout}. This has to be handled during

Review comment:
typo: `we may not yet know about the...`
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641780#comment-16641780 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223292589

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -268,6 +291,25 @@ public void flushAll() {
             }
         }
+    @Override
+    public void flushAllLocal() {
+        for (ResultSubpartition localSubpartition : localSubpartitions) {
+            localSubpartition.flush();
+        }
+    }
+
+    @Override
+    public void setFlushTimeout(long flushTimeout) {
+        checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+        for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+            checkState(subpartition.isLocal().isPresent());
+            checkState(!subpartition.isLocal().get());

Review comment:
While this may not hurt, are these two `checkState` calls really necessary? They kind of undermine `remoteSubpartitionsMissingPeriodicFlushes`'s authority ;) If anything, shouldn't this be checked upon insertion into `remoteSubpartitionsMissingPeriodicFlushes`?
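A sketch of moving the check to the insertion point, using a hypothetical minimal `Sub` type whose locality is already known when it is added. Once `addRemoteMissingFlush` rejects local subpartitions, `setFlushTimeout` can rely on the list's invariant and drop the per-element `checkState` calls.

```java
import java.util.ArrayList;
import java.util.List;

class InsertionCheckSketch {
    /** Hypothetical minimal subpartition: locality is known once it is added. */
    static final class Sub {
        final boolean local;
        long registeredTimeout = -1; // -1: no periodic flush registered yet

        Sub(boolean local) {
            this.local = local;
        }
    }

    private final List<Sub> remoteSubpartitionsMissingPeriodicFlushes = new ArrayList<>();

    void addRemoteMissingFlush(Sub sub) {
        // Validate once, on the way in ...
        if (sub.local) {
            throw new IllegalStateException("Only remote subpartitions may be added");
        }
        remoteSubpartitionsMissingPeriodicFlushes.add(sub);
    }

    void setFlushTimeout(long flushTimeout) {
        // ... so this loop can trust the invariant without re-checking each element.
        for (Sub sub : remoteSubpartitionsMissingPeriodicFlushes) {
            sub.registeredTimeout = flushTimeout;
        }
        remoteSubpartitionsMissingPeriodicFlushes.clear();
    }
}
```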
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641769#comment-16641769 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221655152

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java ##
@@ -52,6 +52,8 @@ void requestSubpartitionView(
      */
     boolean isAvailable();
+    boolean isBlocked();

Review comment:
Please add a comment explaining what `isBlocked` should mean, especially since we already have `isAvailable`.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641771#comment-16641771 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223340950

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -94,6 +96,19 @@
     /** The subpartitions of this partition. At least one. */
     private final ResultSubpartition[] subpartitions;
+    /**
+     * Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+     * subpartition is local or not once it's read view was created.
+     */
+    private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();
+
+    /**
+     * Subset of {@code subpartitions} that are definitely remote, however once we determined that,
+     * we haven't yet known about {@link #flushTimeout}. This has to be handled during
+     * {@link #setFlushTimeout(long)}.
+     */
+    private final ArrayList<ResultSubpartition> remoteSubpartitionsMissingPeriodicFlushes = new ArrayList<>();
+

Review comment:
Actually, how about the following idea, which should make things a bit simpler and improve the abstraction:
- only differentiate between self-flushing channels (flushed periodically after registration) and channels that require manual flushing (instead of remote vs. local)
- have `PipelinedSubpartition#registerPeriodicFlush()` return whether registering for "self-flush" worked or not
- have `PipelinedSubpartition#registerPeriodicFlush()` deal with the local vs. non-local nature of the channel
- only have `ResultSubpartition[] subpartitions` and `ArrayList<ResultSubpartition> manuallyFlushedSubpartitions` members
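A minimal sketch of the proposed abstraction, using hypothetical stand-in types. It assumes locality is already known when `setFlushTimeout` runs, which sidesteps the ordering problem described in the PR; the point is only the shape: `registerPeriodicFlush` returns whether the channel now flushes itself, and the partition keeps just the channels that still need manual flushing.

```java
import java.util.ArrayList;

class SelfFlushSketch {
    /** Hypothetical subpartition; the real one would consult its read view's locality. */
    static final class Subpartition {
        private final boolean remote;
        private long flushTimeout = -1; // -1: not self-flushing
        int manualFlushes = 0;

        Subpartition(boolean remote) {
            this.remote = remote;
        }

        /** True if this channel now flushes itself; false means the caller keeps flushing it. */
        boolean registerPeriodicFlush(long flushTimeout) {
            if (!remote) {
                return false; // the local vs. remote decision stays inside the subpartition
            }
            this.flushTimeout = flushTimeout;
            return true;
        }

        void flush() {
            manualFlushes++;
        }
    }

    private final Subpartition[] subpartitions;
    private final ArrayList<Subpartition> manuallyFlushedSubpartitions = new ArrayList<>();

    SelfFlushSketch(Subpartition... subpartitions) {
        this.subpartitions = subpartitions;
    }

    void setFlushTimeout(long flushTimeout) {
        for (Subpartition sp : subpartitions) {
            if (!sp.registerPeriodicFlush(flushTimeout)) {
                manuallyFlushedSubpartitions.add(sp);
            }
        }
        manuallyFlushedSubpartitions.trimToSize();
    }

    /** What an OutputFlusher-style thread would call periodically. */
    void flushManuallyFlushed() {
        manuallyFlushedSubpartitions.forEach(Subpartition::flush);
    }

    int manualCount() {
        return manuallyFlushedSubpartitions.size();
    }
}
```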
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641781#comment-16641781 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223290657

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -94,6 +96,19 @@
     /** The subpartitions of this partition. At least one. */
     private final ResultSubpartition[] subpartitions;
+    /**
+     * Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+     * subpartition is local or not once it's read view was created.
+     */
+    private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();
+
+    /**
+     * Subset of {@code subpartitions} that are definitely remote, however once we determined that,
+     * we haven't yet known about {@link #flushTimeout}. This has to be handled during
+     * {@link #setFlushTimeout(long)}.
+     */
+    private final ArrayList<ResultSubpartition> remoteSubpartitionsMissingPeriodicFlushes = new ArrayList<>();

Review comment:
Since this only exists temporarily, could we make sure it no longer occupies memory in the backing array once it is not needed? The easiest way would be to use `trimToSize()` in `setFlushTimeout()` (see below), or to make this an `Optional`. I think I'd prefer option 1.
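Option 1 in a few lines, with `String` as a hypothetical stand-in for `ResultSubpartition` and a hypothetical `drain()` in place of `setFlushTimeout()`: `clear()` alone drops the element references but keeps the backing array's capacity, while the following `trimToSize()` shrinks that array to the list's current size, which is zero.

```java
import java.util.ArrayList;

class TrimAfterUseSketch {
    // Temporary backlog: only needed until the flush timeout is known.
    private final ArrayList<String> pending = new ArrayList<>();

    void add(String subpartition) {
        pending.add(subpartition);
    }

    int drain() {
        int handled = pending.size();
        // ... register a periodic flush for every pending element here ...
        pending.clear();      // drops the references, but keeps the backing array's capacity
        pending.trimToSize(); // shrinks the backing array to the current size (0)
        return handled;
    }

    boolean isEmpty() {
        return pending.isEmpty();
    }
}
```

Option 2 (an `Optional` around the list, emptied afterwards) would release the list object itself, but it needs a non-final field and an extra indirection on every access.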
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641775#comment-16641775 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223291788

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -268,6 +291,25 @@ public void flushAll() {
             }
         }
+    @Override
+    public void flushAllLocal() {
+        for (ResultSubpartition localSubpartition : localSubpartitions) {
+            localSubpartition.flush();
+        }

Review comment:
nit: how about `localSubpartitions.forEach(ResultSubpartition::flush);`?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641767#comment-16641767 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221894761

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ##
@@ -193,7 +242,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exc
             }
         }
-        allReaders.remove(toCancel);
+        periodicFlushes.cancel(checkNotNull(allReaders.remove(toCancel)));

Review comment:
This seems a bit unreadable; how about putting the `checkNotNull` into the `cancel` method?
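A sketch of that suggestion with hypothetical `String` ids and readers. Flink code would use `Preconditions.checkNotNull`; the sketch uses the JDK's `Objects.requireNonNull`, which likewise throws a `NullPointerException` on `null`. The call site becomes a plain two-step sequence and the null check sits next to the code that relies on it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class CancelNullCheckSketch {
    // receiver id -> reader (hypothetical types)
    private final Map<String, String> allReaders = new HashMap<>();
    int cancelled = 0;

    void add(String id, String reader) {
        allReaders.put(id, reader);
    }

    /** Call site: remove, then cancel, with no nested precondition call. */
    void onCancel(String toCancel) {
        String reader = allReaders.remove(toCancel);
        cancel(reader);
    }

    /** The null check lives inside cancel(). */
    void cancel(String reader) {
        Objects.requireNonNull(reader, "reader");
        cancelled++;
    }
}
```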
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641779#comment-16641779 ]

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r222017678

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ##
@@ -77,6 +77,14 @@ public void flush() {
         }
     }
+    @Override
+    public void registerPeriodicFlush(long flushTimeout) {
+        synchronized (buffers) {
+            checkState(readView != null);

Review comment:
What's the gain of this `checkState`? It would throw an NPE in the next line anyway.
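A small sketch of the two behaviours, with a hypothetical `readView` field and `hashCode()` standing in for whatever the next line does with it. A bare `checkState(condition)` fails with an `IllegalStateException` instead of a `NullPointerException`; beyond the exception type, the gain is mainly a clearer failure if a message is supplied, as the explicit check below does.

```java
class PreconditionSketch {
    private Object readView; // null until a read view is created (hypothetical)

    void registerWithCheck(long flushTimeout) {
        // Roughly what checkState(readView != null, "...") amounts to.
        if (readView == null) {
            throw new IllegalStateException("registerPeriodicFlush called before createReadView");
        }
        readView.hashCode(); // stands in for the first dereference of readView
    }

    void registerWithoutCheck(long flushTimeout) {
        readView.hashCode(); // fails with a NullPointerException instead
    }
}
```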
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641606#comment-16641606 ]

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223309406

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ##
@@ -268,6 +291,25 @@ public void flushAll() {
             }
         }
+    @Override
+    public void flushAllLocal() {
+        for (ResultSubpartition localSubpartition : localSubpartitions) {
+            localSubpartition.flush();
+        }
+    }
+
+    @Override
+    public void setFlushTimeout(long flushTimeout) {
+        checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+        for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+            checkState(subpartition.isLocal().isPresent());
+            checkState(!subpartition.isLocal().get());
+            subpartition.registerPeriodicFlush(flushTimeout);
+        }
+        remoteSubpartitionsMissingPeriodicFlushes.clear();

Review comment:
Could there be a race condition between `setFlushTimeout()` and `createSubpartitionView()`? The task thread may invoke `setFlushTimeout()` and clear `remoteSubpartitionsMissingPeriodicFlushes` while, at the same time, the netty thread creates a subpartition view and adds this subpartition to `remoteSubpartitionsMissingPeriodicFlushes` before `flushTimeout` has been set.
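One way to close that window, sketched under the assumption that both paths may share a single lock: the netty-side "view created" path and the task-side `setFlushTimeout()` each perform their check-then-act under the same monitor, so a remote subpartition is either registered immediately or queued before the queue is drained, never lost in between. The class name, the `-1` sentinel and the `LongConsumer` callback (a stand-in for `registerPeriodicFlush`) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class FlushTimeoutRaceSketch {
    private final Object lock = new Object();
    private long flushTimeout = -1; // -1: not set yet
    private final List<LongConsumer> missingPeriodicFlushes = new ArrayList<>();

    /** Netty thread: a remote read view was just created. */
    void onRemoteViewCreated(LongConsumer registerPeriodicFlush) {
        synchronized (lock) {
            if (flushTimeout >= 0) {
                registerPeriodicFlush.accept(flushTimeout); // timeout already known
            } else {
                missingPeriodicFlushes.add(registerPeriodicFlush); // defer until it is known
            }
        }
    }

    /** Task thread: the flush timeout becomes known. */
    void setFlushTimeout(long timeout) {
        synchronized (lock) {
            if (flushTimeout >= 0) {
                throw new IllegalStateException("Flush timeout can not be set twice");
            }
            flushTimeout = timeout;
            for (LongConsumer pending : missingPeriodicFlushes) {
                pending.accept(timeout);
            }
            missingPeriodicFlushes.clear();
        }
    }
}
```

This trades the race for an extra lock shared by the task and netty threads, and invoking the callbacks while holding it could deadlock if they acquire other locks such as `buffers`.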
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614942#comment-16614942 ] ASF GitHub Bot commented on FLINK-8581: --- pnowojski opened a new pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698 first commit comes from https://github.com/apache/flink/pull/6697 This solves GC issues for cases with low latency (small flushTimeout) and many output channels and generally significantly improves low latency performance. OutputFlusher remains as for now to trigger flushes for local subpartitions. Registering periodic flushes in netty is unfortunately not the most beautiful thing in the world at the moment. It is complicated by two things: 1. we do know about flushTimeout only in flink-streaming-java and StreamTask, which is long after the point when we are actually creating subpartitions 2. we do not know before hand which subpartitions will be local and which will be remote ![Benchmark results](https://docs.google.com/spreadsheets/d/e/2PACX-1vQ4ImkIhEVyd0JuC0_KBzSiZk1ugqRYYJ29fftj8f7bvQHsyNTrS9PBS2g7YaI6q7kfyHXpWWsnb5lq/pubchart?oid=1194867281=image) Average throughput is significantly higher only for extreme cases, however the very important improvement here is solving (mitigating?) current GC issues, which is visible on the "min" graph. Without this change 1ms latency with 1000+ output channels suffers from frequent very long GC pauses. ## Verifying this change This change is cover by existing network stack tests, stress tests and almost all it cases. 
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
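A minimal sketch of the idea described in this PR, under stated assumptions: `FlushableSubpartition`, `RemoteFlushScheduler` and `CountingSubpartition` are hypothetical names, not Flink classes, and a plain `ScheduledExecutorService` stands in for Netty's event loop. One periodic task per flush timeout flushes only the subpartitions registered as remote, and registration happens late, once both the timeout and the local/remote split are known, which mirrors the two complications listed above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical abstraction of a subpartition that can be flushed. */
interface FlushableSubpartition {
    boolean isRemote();

    void flush();
}

/** Hypothetical: periodically flushes only remote subpartitions on a shared scheduler. */
final class RemoteFlushScheduler implements AutoCloseable {
    // Copy-on-write: registration is rare, iteration happens on every tick.
    private final List<FlushableSubpartition> remoteSubpartitions = new CopyOnWriteArrayList<>();
    private final ScheduledFuture<?> periodicFlush;

    RemoteFlushScheduler(ScheduledExecutorService scheduler, long flushTimeoutMillis) {
        this.periodicFlush = scheduler.scheduleAtFixedRate(
            this::flushAll, flushTimeoutMillis, flushTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called late, once it is known whether the subpartition is local or remote. */
    void register(FlushableSubpartition subpartition) {
        if (subpartition.isRemote()) {
            remoteSubpartitions.add(subpartition);
        }
        // Local subpartitions are left to a separate flusher (OutputFlusher in the PR text).
    }

    void flushAll() {
        for (FlushableSubpartition subpartition : remoteSubpartitions) {
            subpartition.flush();
        }
    }

    @Override
    public void close() {
        periodicFlush.cancel(false);
    }
}

/** Test double that counts flushes. */
final class CountingSubpartition implements FlushableSubpartition {
    final AtomicInteger flushes = new AtomicInteger();
    private final boolean remote;

    CountingSubpartition(boolean remote) {
        this.remote = remote;
    }

    @Override
    public boolean isRemote() {
        return remote;
    }

    @Override
    public void flush() {
        flushes.incrementAndGet();
    }
}
```

The design choice sketched here is one scheduled task per timeout on a shared scheduler instead of a dedicated flusher thread per task; how #6698 actually wires this into Netty is not shown in this excerpt.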
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369149#comment-16369149 ] ASF GitHub Bot commented on FLINK-8581: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5423
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369142#comment-16369142 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r169082736

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException;
-    BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+    /**
+     * Blocking call waiting for next {@link BufferOrEvent}.
+     *
+     * @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+     */
+    Optional getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369143#comment-16369143 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5423 Thanks for those very good improvements, I will merge this.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369010#comment-16369010 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5423 I have rebased the PR and squashed the fixup commits.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369008#comment-16369008 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r169048737

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException;
-    BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+    /**
+     * Blocking call waiting for next {@link BufferOrEvent}.
+     *
+     * @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+     */
+    Optional getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

I would prefer to leave it as it is for now. I completely agree it's a bad design; however, in this PR I'm only documenting this behaviour and changing the `null` to `Optional.empty()`, and because this change is already extensive in scope, I would prefer to fix this issue later on.
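To illustrate the contract being documented in this hunk (return `Optional.empty()` instead of `null` once `isFinished()` is true), here is a small sketch. `QueueGate` and `GateConsumer` are hypothetical; a `String` stands in for `BufferOrEvent`, and unlike the real `InputGate` nothing here blocks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical, non-blocking stand-in for an input gate; Strings stand in for BufferOrEvent. */
final class QueueGate {
    private final ArrayDeque<String> pending = new ArrayDeque<>();

    QueueGate(String... buffersOrEvents) {
        for (String item : buffersOrEvents) {
            pending.add(item);
        }
    }

    boolean isFinished() {
        return pending.isEmpty();
    }

    /** Contract from the diff: Optional.empty() iff isFinished() returns true. */
    Optional<String> getNextBufferOrEvent() {
        return Optional.ofNullable(pending.poll());
    }
}

final class GateConsumer {
    private GateConsumer() {}

    /** Consumes until the gate reports end of data via an empty Optional. */
    static List<String> drain(QueueGate gate) {
        List<String> consumed = new ArrayList<>();
        Optional<String> next;
        while ((next = gate.getNextBufferOrEvent()).isPresent()) {
            consumed.add(next.get());
        }
        return consumed;
    }
}
```

Compared with a `null` sentinel, the end-of-data case becomes visible in the return type; the thread leaves any further redesign of this contract for later.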
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366841#comment-16366841 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168725762

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException;
-    BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+    /**
+     * Blocking call waiting for next {@link BufferOrEvent}.
+     *
+     * @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+     */
+    Optional getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

What do you think about this?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366840#comment-16366840 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168725569

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
         BufferAndAvailability next = null;
         try {
-            if (channel.isWritable()) {
-                while (true) {
-                    SequenceNumberingViewReader reader = nonEmptyReader.poll();
-
-                    // No queue with available data. We allow this here, because
-                    // of the write callbacks that are executed after each write.
-                    if (reader == null) {
-                        return;
+            while (true) {
+                NetworkSequenceViewReader reader = poolAvailableReader();
+
+                // No queue with available data. We allow this here, because
+                // of the write callbacks that are executed after each write.
+                if (reader == null) {
+                    return;
+                }
+
+                next = reader.getNextBuffer();
+                if (next == null) {
+                    if (!reader.isReleased()) {
+                        continue;
                     }
+                    markAsReleased(reader.getReceiverId());
+
+                    Throwable cause = reader.getFailureCause();
+                    if (cause != null) {
+                        ErrorResponse msg = new ErrorResponse(
+                            new ProducerFailedException(cause),
+                            reader.getReceiverId());
-                    next = reader.getNextBuffer();
-
-                    if (next == null) {
-                        if (reader.isReleased()) {
-                            markAsReleased(reader.getReceiverId());
-                            Throwable cause = reader.getFailureCause();
-
-                            if (cause != null) {
-                                ErrorResponse msg = new ErrorResponse(
-                                    new ProducerFailedException(cause),
-                                    reader.getReceiverId());
-
-                                ctx.writeAndFlush(msg);
-                            }
-                        } else {
-                            IllegalStateException err = new IllegalStateException(
-                                "Bug in Netty consumer logic: reader queue got notified by partition " +
-                                "about available data, but none was available.");
-                            handleException(ctx.channel(), err);
-                            return;
-                        }
-                    } else {
-                        // this channel was now removed from the non-empty reader queue
-                        // we re-add it in case it has more data, because in that case no
-                        // "non-empty" notification will come for that reader from the queue.
-                        if (next.moreAvailable()) {
-                            nonEmptyReader.add(reader);
-                        }
-
-                        BufferResponse msg = new BufferResponse(
-                            next.buffer(),
-                            reader.getSequenceNumber(),
-                            reader.getReceiverId(),
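The control flow of the new loop in this hunk can be modelled in isolation. In this hypothetical sketch (`SketchReader` and `AvailableReaders` are not Flink classes; Strings stand in for buffers), a reader that has no data and is not released is simply skipped instead of raising the old "Bug in Netty consumer logic" error, a released reader is recorded, and a reader that still has data after a poll is re-queued because no further availability notification will arrive for it. For simplicity the sketch keeps looping after each write, whereas the comment in the diff suggests the real code relies on write callbacks executed after each write.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader over a queue of buffers. */
final class SketchReader {
    final String id;
    private final boolean released;
    private final ArrayDeque<String> buffers = new ArrayDeque<>();

    SketchReader(String id, boolean released, String... buffers) {
        this.id = id;
        this.released = released;
        for (String buffer : buffers) {
            this.buffers.add(buffer);
        }
    }

    /** Returns null when nothing is available, like getNextBuffer() in the diff. */
    String getNextBuffer() {
        return buffers.poll();
    }

    boolean moreAvailable() {
        return !buffers.isEmpty();
    }

    boolean isReleased() {
        return released;
    }
}

/** Hypothetical queue of readers that were announced as having data. */
final class AvailableReaders {
    private final ArrayDeque<SketchReader> available = new ArrayDeque<>();
    final List<String> written = new ArrayList<>();
    final List<String> releasedReaders = new ArrayList<>();

    void enqueue(SketchReader reader) {
        available.add(reader);
    }

    void writeAllAvailable() {
        while (true) {
            SketchReader reader = available.poll();
            if (reader == null) {
                return; // no reader with data left
            }
            String next = reader.getNextBuffer();
            if (next == null) {
                if (!reader.isReleased()) {
                    continue; // inaccurate notification: skip instead of failing
                }
                releasedReaders.add(reader.id); // real code: markAsReleased + optional ErrorResponse
                continue;
            }
            if (reader.moreAvailable()) {
                available.add(reader); // no new "non-empty" notification will come for it
            }
            written.add(reader.id + ":" + next);
        }
    }
}
```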
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366839#comment-16366839 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168725339

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
                 && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
                 && inputGate.isFinished()) {
+            checkState(!bufferOrEvent.moreAvailable());
             if (!inputGatesWithRemainingData.remove(inputGate)) {
                 throw new IllegalStateException("Couldn't find input gate in set of remaining " +
                     "input gates.");
             }
         }
+        if (bufferOrEvent.moreAvailable()) {
+            // this buffer or event was now removed from the non-empty gates queue
+            // we re-add it in case it has more data, because in that case no "non-empty" notification
+            // will come for that gate
+            queueInputGate(inputGate);
+        }
+
         // Set the channel index to identify the input channel (across all unioned input gates)
         final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
         bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
-        return bufferOrEvent;
+        return Optional.ofNullable(bufferOrEvent);
+    }
+
+    @Override
+    public Optional pollNextBufferOrEvent() throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+        while (true) {
+            InputGate inputGate;
+            synchronized (inputGatesWithData) {
+                while (inputGatesWithData.size() == 0) {
+                    inputGatesWithData.wait();
+                }
+                inputGate = inputGatesWithData.remove();
+                enqueuedInputGatesWithData.remove(inputGate);
+            }
+
+            // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

That sounds OK, maybe with a short pointer here to the high-level doc; otherwise there is an increased chance that somebody misses it.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366837#comment-16366837 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724949

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
     /** Flag indicating whether the subpartition has been released. */
     private volatile boolean isReleased;
-    /** The number of non-event buffers currently in this subpartition. */
-    @GuardedBy("buffers")
-    private int buffersInBacklog;
-
     //
     PipelinedSubpartition(int index, ResultPartition parent) {
         super(index, parent);
     }
     @Override
-    public boolean add(Buffer buffer) throws IOException {
-        checkNotNull(buffer);
-
-        // view reference accessible outside the lock, but assigned inside the locked scope
-        final PipelinedSubpartitionView reader;
+    public boolean add(BufferConsumer bufferConsumer) throws IOException {
+        return add(bufferConsumer, false);
+    }
+    @Override
+    public void flush() {
         synchronized (buffers) {
-            if (isFinished || isReleased) {
-                buffer.recycleBuffer();
-                return false;
+            if (readView != null) {
+                readView.notifyDataAvailable();
             }
-
-            // Add the buffer and update the stats
-            buffers.add(buffer);
-            reader = readView;
-            updateStatistics(buffer);
-            increaseBuffersInBacklog(buffer);
-        }
-
-        // Notify the listener outside of the synchronized block
-        if (reader != null) {
-            reader.notifyBuffersAvailable(1);
         }
-
-        return true;
     }
     @Override
     public void finish() throws IOException {
-        final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+        add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+        LOG.debug("Finished {}.", this);
+    }
-        // view reference accessible outside the lock, but assigned inside the locked scope
-        final PipelinedSubpartitionView reader;
+    private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
+        checkNotNull(bufferConsumer);
         synchronized (buffers) {
             if (isFinished || isReleased) {
-                return;
+                bufferConsumer.close();
+                return false;
             }
-            buffers.add(buffer);
-            reader = readView;
-            updateStatistics(buffer);
+            // Add the bufferConsumer and update the stats
+            buffers.add(bufferConsumer);
+            updateStatistics(bufferConsumer);
+            increaseBuffersInBacklog(bufferConsumer);
-            isFinished = true;
+            if (finish) {
+                isFinished = true;
+                notifyDataAvailable();
--- End diff --
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366835#comment-16366835 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724867

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -131,9 +114,9 @@ public void release() {
             }
             // Release all available buffers
-            Buffer buffer;
-            while ((buffer = buffers.poll()) != null) {
-                buffer.recycleBuffer();
+            BufferConsumer bufferConsumer;
+            while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

I think doing this as future work is OK.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366834#comment-16366834 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724735

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
     private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
         RecordSerializer serializer = serializers[targetChannel];
-        synchronized (serializer) {
-            SerializationResult result = serializer.addRecord(record);
-
-            while (result.isFullBuffer()) {
-                Buffer buffer = serializer.getCurrentBuffer();
-
-                if (buffer != null) {
-                    numBytesOut.inc(buffer.getSizeUnsafe());
-                    writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                    // If this was a full record, we are done. Not breaking
-                    // out of the loop at this point will lead to another
-                    // buffer request before breaking out (that would not be
-                    // a problem per se, but it can lead to stalls in the
-                    // pipeline).
-                    if (result.isFullRecord()) {
-                        break;
-                    }
-                } else {
-                    BufferBuilder bufferBuilder =
-                        targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                    result = serializer.setNextBufferBuilder(bufferBuilder);
+        SerializationResult result = serializer.addRecord(record);
+
+        while (result.isFullBuffer()) {
+            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+                // If this was a full record, we are done. Not breaking
+                // out of the loop at this point will lead to another
+                // buffer request before breaking out (that would not be
+                // a problem per se, but it can lead to stalls in the
+                // pipeline).
+                if (result.isFullRecord()) {
+                    break;
                 }
             }
+            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+            result = serializer.setNextBufferBuilder(bufferBuilder);
         }
+        checkState(!serializer.hasSerializedData(), "All data should be written at once");
     }
-    public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-        final Buffer eventBuffer = EventSerializer.toBuffer(event);
-        try {
+    public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

Good question. There could be a package-private method that returns the buffer, and the public method would use it without returning the buffer. But it is questionable whether that is really better, because we would also need to ensure that the public method goes through the private one, etc.
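The alternative floated in this comment, a package-private method that returns the buffer plus a public wrapper that does not, might look like the following hypothetical sketch. `EventBroadcaster` is illustrative only and a `String` stands in for `BufferConsumer`. As the comment points out, nothing in the language forces future public methods to go through the internal one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical writer with one outbox per output channel. */
final class EventBroadcaster {
    private final List<List<String>> channels = new ArrayList<>();

    EventBroadcaster(int numberOfChannels) {
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Public API: callers cannot get hold of the serialized event. */
    public void broadcastEvent(String event) {
        broadcastEventInternal(event);
    }

    /** Package-private: same-package tests may inspect what was written. */
    String broadcastEventInternal(String event) {
        String serialized = "EVENT[" + event + "]";
        for (List<String> channel : channels) {
            channel.add(serialized);
        }
        return serialized;
    }

    List<String> channel(int index) {
        return Collections.unmodifiableList(channels.get(index));
    }
}
```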
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366830#comment-16366830 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724296

--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {
         return future.get();
     }
+
+    public static void waitForAll(long timeoutMillis, Future...futures) throws Exception {
+        waitForAll(timeoutMillis, Arrays.asList(futures));
+    }
+
+    public static void waitForAll(long timeoutMillis, Collectionfutures) throws Exception {
+        long startMillis = System.currentTimeMillis();
+        Set futuresSet = new HashSet<>();
--- End diff --
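The two `waitForAll` overloads in this hunk wait for a group of futures under one timeout. The body is cut off here, so what follows is only a sketch of one way to write such a helper, not Flink's `FutureUtil` code: `FutureWaits` is a hypothetical name, and it uses a single overall deadline measured with `System.nanoTime()` rather than the `System.currentTimeMillis()` and `HashSet` visible in the diff.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not the actual org.apache.flink.util.FutureUtil implementation. */
final class FutureWaits {
    private FutureWaits() {}

    static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
        waitForAll(timeoutMillis, Arrays.asList(futures));
    }

    /**
     * Waits for every future under one shared deadline. Throws TimeoutException if the
     * deadline passes first, or ExecutionException if a future completed exceptionally.
     */
    static void waitForAll(long timeoutMillis, Collection<? extends Future<?>> futures) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (Future<?> future : futures) {
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            // With zero time left, get() still returns at once for an already-completed future.
            future.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }
}
```

Sharing one deadline keeps the total wait bounded by `timeoutMillis` regardless of how many futures are passed, instead of allowing the full timeout per future.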
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366828#comment-16366828 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724127

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
     private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
         RecordSerializer serializer = serializers[targetChannel];
-        synchronized (serializer) {
-            SerializationResult result = serializer.addRecord(record);
-
-            while (result.isFullBuffer()) {
-                Buffer buffer = serializer.getCurrentBuffer();
-
-                if (buffer != null) {
-                    numBytesOut.inc(buffer.getSizeUnsafe());
-                    writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                    // If this was a full record, we are done. Not breaking
-                    // out of the loop at this point will lead to another
-                    // buffer request before breaking out (that would not be
-                    // a problem per se, but it can lead to stalls in the
-                    // pipeline).
-                    if (result.isFullRecord()) {
-                        break;
-                    }
-                } else {
-                    BufferBuilder bufferBuilder =
-                        targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                    result = serializer.setNextBufferBuilder(bufferBuilder);
+        SerializationResult result = serializer.addRecord(record);
+
+        while (result.isFullBuffer()) {
--- End diff --

We can introduce this change later, after some more extensive tests.
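The loop shape in this hunk, where a record may span several buffers and the writer stops as soon as the record is complete instead of requesting another buffer up front, can be modelled with plain byte arrays. `SpanningWriter` is a hypothetical sketch: allocating a `byte[]` stands in for `requestNewBufferBuilder`, and appending to `finishedBuffers` stands in for finishing the current buffer builder.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer that spreads records over fixed-size buffers. */
final class SpanningWriter {
    private final int bufferSize;
    final List<byte[]> finishedBuffers = new ArrayList<>();
    int requestedBuffers;
    private byte[] current;
    private int position;

    SpanningWriter(int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        this.bufferSize = bufferSize;
    }

    void sendToTarget(byte[] record) {
        int offset = 0;
        while (true) {
            if (current == null) {
                current = new byte[bufferSize]; // stands in for requesting a new buffer builder
                position = 0;
                requestedBuffers++;
            }
            int n = Math.min(record.length - offset, bufferSize - position);
            System.arraycopy(record, offset, current, position, n);
            position += n;
            offset += n;

            if (position == bufferSize) { // full buffer: finish it and hand it downstream
                finishedBuffers.add(current);
                current = null;
            }
            if (offset == record.length) {
                // Full record: stop here rather than requesting a buffer we may not need yet.
                return;
            }
        }
    }

    boolean hasOpenBuffer() {
        return current != null;
    }
}
```

With a buffer size of 4, a 10-byte record fills two buffers and leaves 2 bytes in a third; a following 2-byte record completes that third buffer without a fourth being requested.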
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365619#comment-16365619 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168490355

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
         BufferAndAvailability next = null;
         try {
-            if (channel.isWritable()) {
-                while (true) {
-                    SequenceNumberingViewReader reader = nonEmptyReader.poll();
-
-                    // No queue with available data. We allow this here, because
-                    // of the write callbacks that are executed after each write.
-                    if (reader == null) {
-                        return;
+            while (true) {
+                NetworkSequenceViewReader reader = poolAvailableReader();
+
+                // No queue with available data. We allow this here, because
+                // of the write callbacks that are executed after each write.
+                if (reader == null) {
+                    return;
+                }
+
+                next = reader.getNextBuffer();
+                if (next == null) {
+                    if (!reader.isReleased()) {
+                        continue;
                     }
+                    markAsReleased(reader.getReceiverId());
+
+                    Throwable cause = reader.getFailureCause();
+                    if (cause != null) {
+                        ErrorResponse msg = new ErrorResponse(
+                            new ProducerFailedException(cause),
+                            reader.getReceiverId());
-                    next = reader.getNextBuffer();
-
-                    if (next == null) {
-                        if (reader.isReleased()) {
-                            markAsReleased(reader.getReceiverId());
-                            Throwable cause = reader.getFailureCause();
-
-                            if (cause != null) {
-                                ErrorResponse msg = new ErrorResponse(
-                                    new ProducerFailedException(cause),
-                                    reader.getReceiverId());
-
-                                ctx.writeAndFlush(msg);
-                            }
-                        } else {
-                            IllegalStateException err = new IllegalStateException(
-                                "Bug in Netty consumer logic: reader queue got notified by partition " +
-                                "about available data, but none was available.");
-                            handleException(ctx.channel(), err);
-                            return;
-                        }
-                    } else {
-                        // this channel was now removed from the non-empty reader queue
-                        // we re-add it in case it has more data, because in that case no
-                        // "non-empty" notification will come for that reader from the queue.
-                        if (next.moreAvailable()) {
-                            nonEmptyReader.add(reader);
-                        }
-
-                        BufferResponse msg = new BufferResponse(
-                            next.buffer(),
-                            reader.getSequenceNumber(),
-                            reader.getReceiverId(),
-
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365570#comment-16365570 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168486202

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
                 && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
                 && inputGate.isFinished()) {
+            checkState(!bufferOrEvent.moreAvailable());
             if (!inputGatesWithRemainingData.remove(inputGate)) {
                 throw new IllegalStateException("Couldn't find input gate in set of remaining " +
                     "input gates.");
             }
         }
+        if (bufferOrEvent.moreAvailable()) {
+            // this buffer or event was now removed from the non-empty gates queue
+            // we re-add it in case it has more data, because in that case no "non-empty" notification
+            // will come for that gate
+            queueInputGate(inputGate);
+        }
+
         // Set the channel index to identify the input channel (across all unioned input gates)
         final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
         bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
-        return bufferOrEvent;
+        return Optional.ofNullable(bufferOrEvent);
+    }
+
+    @Override
+    public Optional pollNextBufferOrEvent() throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+        while (true) {
+            InputGate inputGate;
+            synchronized (inputGatesWithData) {
+                while (inputGatesWithData.size() == 0) {
+                    inputGatesWithData.wait();
+                }
+                inputGate = inputGatesWithData.remove();
+                enqueuedInputGatesWithData.remove(inputGate);
+            }
+
+            // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

It's kind of a bad place for such a comment: it can become outdated without any control :/ What does `UnionInputGate` know about the `OutputFlusher` on the sender side? This code should just assume that there are no guarantees about data notifications being accurate. That belongs in some high-level network stack documentation.
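Following the point that consumers should assume data notifications may be inaccurate, here is a hypothetical sketch of the pattern visible in `waitAndGetNextInputGate`: a monitor-guarded queue of gates with data, a set that suppresses duplicate enqueues (the role `enqueuedInputGatesWithData` appears to play), and a non-blocking poll so that a stale notification costs one loop iteration instead of a blocked thread. `PollableGate` and `GateAvailabilityQueue` are illustrative names, and a `String` stands in for `BufferOrEvent`.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical child gate offering a non-blocking poll. */
interface PollableGate {
    Optional<String> pollNext();
}

/** Hypothetical availability queue shared by the notification path and the consumer. */
final class GateAvailabilityQueue {
    private final ArrayDeque<PollableGate> gatesWithData = new ArrayDeque<>();
    private final Set<PollableGate> enqueued = new HashSet<>();

    /** Notification path: enqueue at most once and wake a waiting consumer. */
    void queueGate(PollableGate gate) {
        synchronized (gatesWithData) {
            if (enqueued.add(gate)) {
                gatesWithData.add(gate);
                gatesWithData.notifyAll();
            }
        }
    }

    int queuedCount() {
        synchronized (gatesWithData) {
            return gatesWithData.size();
        }
    }

    /** Blocks until some gate actually yields data. */
    String next() throws InterruptedException {
        while (true) {
            PollableGate gate;
            synchronized (gatesWithData) {
                while (gatesWithData.isEmpty()) {
                    gatesWithData.wait();
                }
                gate = gatesWithData.remove();
                enqueued.remove(gate);
            }
            // The notification may have been stale: poll, never block on this gate.
            Optional<String> data = gate.pollNext();
            if (data.isPresent()) {
                return data.get();
            }
        }
    }
}
```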
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365554#comment-16365554 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168480406

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
     /** Flag indicating whether the subpartition has been released. */
     private volatile boolean isReleased;
-    /** The number of non-event buffers currently in this subpartition. */
-    @GuardedBy("buffers")
-    private int buffersInBacklog;
-
     //
     PipelinedSubpartition(int index, ResultPartition parent) {
         super(index, parent);
     }
     @Override
-    public boolean add(Buffer buffer) throws IOException {
-        checkNotNull(buffer);
-
-        // view reference accessible outside the lock, but assigned inside the locked scope
-        final PipelinedSubpartitionView reader;
+    public boolean add(BufferConsumer bufferConsumer) throws IOException {
+        return add(bufferConsumer, false);
+    }
+    @Override
+    public void flush() {
         synchronized (buffers) {
-            if (isFinished || isReleased) {
-                buffer.recycleBuffer();
-                return false;
+            if (readView != null) {
+                readView.notifyDataAvailable();
             }
-
-            // Add the buffer and update the stats
-            buffers.add(buffer);
-            reader = readView;
-            updateStatistics(buffer);
-            increaseBuffersInBacklog(buffer);
-        }
-
-        // Notify the listener outside of the synchronized block
-        if (reader != null) {
-            reader.notifyBuffersAvailable(1);
         }
-
-        return true;
     }
     @Override
     public void finish() throws IOException {
-        final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+        add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+        LOG.debug("Finished {}.", this);
+    }
-        // view reference accessible outside the lock, but assigned inside the locked scope
-        final PipelinedSubpartitionView reader;
+    private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
+        checkNotNull(bufferConsumer);
         synchronized (buffers) {
             if (isFinished || isReleased) {
-                return;
+                bufferConsumer.close();
+                return false;
             }
-            buffers.add(buffer);
-            reader = readView;
-            updateStatistics(buffer);
+            // Add the bufferConsumer and update the stats
+            buffers.add(bufferConsumer);
+            updateStatistics(bufferConsumer);
+            increaseBuffersInBacklog(bufferConsumer);
-            isFinished = true;
+            if (finish) {
+                isFinished = true;
+                notifyDataAvailable();
--- End diff --

RTFM :) https://github.com/apache/flink/pull/5423/commits/982edbce98db0bb7a5db0514d67aed0435a95d0f

1. It was done as a separate commit, so it is not related to the rest of the changes.
2. From the commit message:

> notifyBuffersAvailable is a quick call that doesn't need to be executed outside of the lock

The advantages of commit-by-commit reviewing ;)

To double-confirm: no, this notification is a very quick call; it only enqueues some work on Netty's `Executor`.
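The argument that the notification can safely stay inside the lock, because it only enqueues work on an executor, can be illustrated with a hypothetical sketch. `SketchSubpartition` is not Flink's `PipelinedSubpartition`: a `String` stands in for `BufferConsumer`, and a plain `java.util.concurrent.Executor` stands in for Netty's event loop. Under the lock only the cheap `execute` hand-off happens; the listener itself runs later, on whatever thread drains the executor.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Executor;

/** Hypothetical subpartition whose data-available listener runs on an executor. */
final class SketchSubpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private final Executor notificationExecutor;
    private final Runnable dataAvailableListener;
    private boolean isFinished;

    SketchSubpartition(Executor notificationExecutor, Runnable dataAvailableListener) {
        this.notificationExecutor = notificationExecutor;
        this.dataAvailableListener = dataAvailableListener;
    }

    boolean add(String buffer, boolean finish) {
        synchronized (buffers) {
            if (isFinished) {
                return false; // the real code also closes the rejected BufferConsumer here
            }
            buffers.add(buffer);
            if (finish) {
                isFinished = true;
                // Inside the lock, but only hands a task to the executor; no listener code runs here.
                notificationExecutor.execute(dataAvailableListener);
            }
            return true;
        }
    }

    void flush() {
        synchronized (buffers) {
            notificationExecutor.execute(dataAvailableListener);
        }
    }

    int size() {
        synchronized (buffers) {
            return buffers.size();
        }
    }
}
```

If the executor ran tasks inline (a direct executor), the listener would execute while the lock is held, so this reasoning only holds for an executor that actually defers work, as the comment describes for Netty.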
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365544#comment-16365544 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168479100 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -131,9 +114,9 @@ public void release() { } // Release all available buffers - Buffer buffer; - while ((buffer = buffers.poll()) != null) { - buffer.recycleBuffer(); + BufferConsumer bufferConsumer; + while ((bufferConsumer = buffers.poll()) != null) { --- End diff -- I see what you mean, and I think this code could maybe be deduplicated even further (moving the `readView` field to the abstract class), but can we leave that as future work?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365534#comment-16365534 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168476375 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { + break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- Regarding the returned value: without it, I just didn't have a simple idea of how to test reference counting/recycling :/
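A hedged sketch of why returning the event buffer helps testing, assuming a plain reference count. Each channel retains one reference, the writer drops its own reference before returning, and a test can then check that the buffer is recycled only after every channel has released it. `RefCountedEventBuffer` and `BroadcastingWriterSketch` are hypothetical and much simpler than Flink's `BufferConsumer`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reference-counted buffer standing in for an event buffer.
class RefCountedEventBuffer {
    private int refCount = 1; // the creator holds the first reference
    private boolean recycled;

    RefCountedEventBuffer retain() {
        checkNotRecycled();
        refCount++;
        return this;
    }

    void release() {
        checkNotRecycled();
        if (--refCount == 0) {
            recycled = true; // a real buffer would return its memory to a pool here
        }
    }

    int refCount() {
        return refCount;
    }

    boolean isRecycled() {
        return recycled;
    }

    private void checkNotRecycled() {
        if (recycled) {
            throw new IllegalStateException("buffer already recycled");
        }
    }
}

// Hypothetical writer: every channel gets its own reference to one shared event
// buffer, and the buffer is returned so a test can observe its reference count.
class BroadcastingWriterSketch {
    final List<List<RefCountedEventBuffer>> channels = new ArrayList<>();

    BroadcastingWriterSketch(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    RefCountedEventBuffer broadcastEvent() {
        RefCountedEventBuffer event = new RefCountedEventBuffer();
        try {
            for (List<RefCountedEventBuffer> channel : channels) {
                channel.add(event.retain()); // one reference per channel
            }
            return event;
        } finally {
            event.release(); // drop the writer's own reference
        }
    }
}
```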
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365527#comment-16365527 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168475520 --- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java --- @@ -45,4 +52,36 @@ private FutureUtil() { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set futuresSet = new HashSet<>(); --- End diff -- Generally speaking you are right, but I think you missed that finished futures are removed from the `futuresSet`. Without this copying, the removal could cause a `wtf` moment for a user (the `waitForAll` method removing something from the passed collection), and without the removal, this code would be slow for a larger number of futures.
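A minimal sketch of the copy-then-remove approach defended above, assuming a polling loop over a local `HashSet`. The copy keeps `waitForAll` from mutating the caller's collection, and removing finished futures means later passes only revisit pending ones. This is an illustration, not the actual `FutureUtil` code from the pull request; the overall-deadline handling is an assumption. Any collection with cheap iterator removal would serve; the set's deduplication is incidental.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class WaitForAllSketch {
    private WaitForAllSketch() {}

    static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
        waitForAll(timeoutMillis, Arrays.asList(futures));
    }

    static void waitForAll(long timeoutMillis, Collection<? extends Future<?>> futures) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        // Local copy, so removing finished futures never mutates the caller's collection.
        Set<Future<?>> pending = new HashSet<>(futures);

        while (!pending.isEmpty()) {
            // Drop completed futures so later passes do not re-check them.
            Iterator<Future<?>> it = pending.iterator();
            while (it.hasNext()) {
                Future<?> future = it.next();
                if (future.isDone()) {
                    future.get(); // rethrows a failure wrapped in ExecutionException
                    it.remove();
                }
            }
            if (pending.isEmpty()) {
                return;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException(pending.size() + " future(s) still pending");
            }
            // Block on one pending future; throws TimeoutException if it misses the deadline.
            pending.iterator().next().get(remaining, TimeUnit.MILLISECONDS);
        }
    }
}
```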
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365513#comment-16365513 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168473812 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { --- End diff -- As we discussed, I'm not entirely sure. This "minor change" can be a significant overhead in case of many channels and large records. 
I don't want to risk increasing the scope of potential problems with this PR :(
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365508#comment-16365508 ] ASF GitHub Bot commented on FLINK-8581: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168472169 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Not thread safe class for producing {@link Buffer}. + * + * It reads data written by {@link BufferBuilder}. + * Although it is not thread safe and can be used only by one single thread, this thread can be different then the + * thread using/writing to {@link BufferBuilder}. 
Pattern here is simple: one thread writes data to + * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}. + */ +@NotThreadSafe +public class BufferConsumer implements Closeable { --- End diff -- Yes, I know. Can you propose a different naming scheme? `BufferWriter` and `BufferBuilder`?
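The single-writer/single-reader pattern described in the quoted Javadoc can be sketched with a shared array and a `volatile` write position. The writer copies bytes and then publishes the new position; the reader, possibly on another thread, reads only up to the last published position. `SharedMemory`, `SingleWriterBuilder` and `SingleReaderConsumer` are hypothetical names, and the sketch copies bytes out, whereas a real consumer would more likely hand out a read-only slice of the segment.

```java
import java.util.Arrays;

// Backing memory shared by exactly one writer and one reader.
final class SharedMemory {
    final byte[] bytes;
    // Written only by the writer. Because it is volatile, a reader that sees a
    // position also sees every byte written before that position was published.
    volatile int writerPosition;

    SharedMemory(int size) {
        bytes = new byte[size];
    }
}

// The single writer (builder side).
final class SingleWriterBuilder {
    private final SharedMemory memory;

    SingleWriterBuilder(SharedMemory memory) {
        this.memory = memory;
    }

    /** Appends as many bytes as fit and returns how many were written. */
    int append(byte[] data) {
        int position = memory.writerPosition;
        int count = Math.min(data.length, memory.bytes.length - position);
        System.arraycopy(data, 0, memory.bytes, position, count);
        memory.writerPosition = position + count; // publish only after copying
        return count;
    }
}

// The single reader (consumer side), possibly running on another thread.
final class SingleReaderConsumer {
    private final SharedMemory memory;
    private int readerPosition;

    SingleReaderConsumer(SharedMemory memory) {
        this.memory = memory;
    }

    /** Returns the bytes published since the previous call (possibly none). */
    byte[] readNewData() {
        int published = memory.writerPosition;
        byte[] data = Arrays.copyOfRange(memory.bytes, readerPosition, published);
        readerPosition = published;
        return data;
    }
}
```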
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362530#comment-16362530 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167896900 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link ResultPartitionWriter} that collects output on the List. 
+ */ +@ThreadSafe +public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter { + private final BufferProvider bufferProvider; + private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>(); + + public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) { + this.bufferProvider = checkNotNull(bufferProvider); + } + + @Override + public synchronized BufferProvider getBufferProvider() { + return bufferProvider; + } + + @Override + public synchronized ResultPartitionID getPartitionId() { + return new ResultPartitionID(); --- End diff -- What is the intended effect of having this `synchronized`? It looks like it does nothing.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362529#comment-16362529 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167622468 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java --- @@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException; - BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException; + /** +* Blocking call waiting for next {@link BufferOrEvent}. +* +* @return {@code Optional.empty()} if {@link #isFinished()} returns true. +*/ + Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException; --- End diff -- From the description, and also to better contrast against `pollNextBufferOrEvent()`, it almost feels like this method should always return a `BufferOrEvent` and instead throw an exception if `isFinished()`. This seems to be how the empty optional is translated anyway; see `AbstractRecordReader`, which reacts with an `IllegalStateException`.
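A sketch contrasting the two contracts under discussion, using a hypothetical queue-backed gate (`QueueBackedInputGate`, with `String` standing in for `BufferOrEvent`, and non-blocking for simplicity). The `Optional` variant returns empty once the gate is finished; the suggested alternative always returns a value and throws `IllegalStateException` if called after `isFinished()`, which matches how the caller described in the review treats an empty result anyway.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Optional;

// Hypothetical, non-blocking gate over a fixed sequence of elements.
final class QueueBackedInputGate {
    private final ArrayDeque<String> remaining = new ArrayDeque<>();

    QueueBackedInputGate(String... elements) {
        Collections.addAll(remaining, elements);
    }

    boolean isFinished() {
        return remaining.isEmpty();
    }

    /** Contract in the diff: empty once the gate is finished. */
    Optional<String> getNextBufferOrEvent() {
        return Optional.ofNullable(remaining.poll());
    }

    /** Suggested contract: always a value; calling it after finishing is a bug. */
    String getNextBufferOrEventOrThrow() {
        if (isFinished()) {
            throw new IllegalStateException("input gate is already finished");
        }
        return remaining.poll();
    }
}
```

With the throwing variant, a caller loops on `while (!gate.isFinished())` and never has to unwrap an `Optional`.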
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362532#comment-16362532 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167621192 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java --- @@ -60,12 +60,20 @@ void tagAsEvent(); /** -* Returns the underlying memory segment. +* Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits +* slices. Use it only along the {@link #getMemorySegmentOffset()}. * * @return the memory segment backing this buffer */ + @Deprecated MemorySegment getMemorySegment(); + /** +* @return the offset where this (potential slice) {@link Buffer}'s data start in the underlying memory segment. +*/ + @Deprecated --- End diff -- Same here.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362525#comment-16362525 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167859598 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { + break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- This method no longer throws `InterruptedException`.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362518#comment-16362518 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167528361 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { } /** -* Writes the buffer to the {@link ResultPartitionWriter} and removes the -* buffer from the serializer state. +* Marks the current {@link BufferBuilder} as finished and clears the state for next one. * -* Needs to be synchronized on the serializer! +* @return true if some data were written */ - private void writeAndClearBuffer( - Buffer buffer, + private boolean tryFinishCurrentBufferBuilder( int targetChannel, RecordSerializer serializer) throws IOException { - try { - targetPartition.writeBuffer(buffer, targetChannel); - } - finally { - serializer.clearCurrentBuffer(); + if (!bufferBuilders[targetChannel].isPresent()) { + return false; } + BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + bufferBuilders[targetChannel] = Optional.empty(); + + numBytesOut.inc(bufferBuilder.getWrittenBytes()); + bufferBuilder.finish(); --- End diff -- You could combine this into `numBytesOut.inc(bufferBuilder.finish())` or maybe `finish()` should not need to have a return value?
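If `finish()` reports how many bytes the builder holds, the metric update collapses into the single expression suggested above. A sketch under that assumption; `FinishableBufferBuilder` and `BytesCounter` are hypothetical stand-ins for `BufferBuilder` and the `numBytesOut` counter.

```java
// Hypothetical builder whose finish() reports how many bytes it holds.
final class FinishableBufferBuilder {
    private final byte[] memory;
    private int writtenBytes;
    private boolean finished;

    FinishableBufferBuilder(int capacity) {
        memory = new byte[capacity];
    }

    /** Appends as many bytes as fit and returns how many were written. */
    int append(byte[] data) {
        checkNotFinished();
        int count = Math.min(data.length, memory.length - writtenBytes);
        System.arraycopy(data, 0, memory, writtenBytes, count);
        writtenBytes += count;
        return count;
    }

    /** Marks the builder as finished and returns the number of bytes written. */
    int finish() {
        checkNotFinished();
        finished = true;
        return writtenBytes;
    }

    private void checkNotFinished() {
        if (finished) {
            throw new IllegalStateException("builder already finished");
        }
    }
}

// Hypothetical minimal counter with the same inc(long) shape as the metric.
final class BytesCounter {
    private long count;

    void inc(long n) {
        count += n;
    }

    long getCount() {
        return count;
    }
}
```

Usage: `numBytesOut.inc(bufferBuilder.finish());` replaces the separate `getWrittenBytes()` call and leaves no window in which more bytes could be written between counting and finishing.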
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362521#comment-16362521 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167604523 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -55,69 +51,56 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition. */ - @GuardedBy("buffers") - private int buffersInBacklog; - // PipelinedSubpartition(int index, ResultPartition parent) { super(index, parent); } @Override - public boolean add(Buffer buffer) throws IOException { - checkNotNull(buffer); - - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + public boolean add(BufferConsumer bufferConsumer) throws IOException { + return add(bufferConsumer, false); + } + @Override + public void flush() { synchronized (buffers) { - if (isFinished || isReleased) { - buffer.recycleBuffer(); - return false; + if (readView != null) { + readView.notifyDataAvailable(); } - - // Add the buffer and update the stats - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); - increaseBuffersInBacklog(buffer); - } - - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); } - - return true; } @Override public void finish() throws IOException { - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); + LOG.debug("Finished {}.", this); + } - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + private 
boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException { + checkNotNull(bufferConsumer); synchronized (buffers) { if (isFinished || isReleased) { - return; + bufferConsumer.close(); + return false; } - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); + // Add the bufferConsumer and update the stats + buffers.add(bufferConsumer); + updateStatistics(bufferConsumer); + increaseBuffersInBacklog(bufferConsumer); - isFinished = true; + if (finish) { + isFinished = true; + notifyDataAvailable(); --- End diff -- I noticed that this introduces a subtle change: unlike before, the notification to the listeners now happens under the lock of `buffers`. I just want to double-check: will this have negative side effects on performance? Did this fix any correctness problems?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362515#comment-16362515 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167520277 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Not thread safe class for producing {@link Buffer}. + * + * It reads data written by {@link BufferBuilder}. + * Although it is not thread safe and can be used only by one single thread, this thread can be different then the + * thread using/writing to {@link BufferBuilder}. 
Pattern here is simple: one thread writes data to + * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}. + */ +@NotThreadSafe +public class BufferConsumer implements Closeable { --- End diff -- Just a thought about names: this is called `BufferConsumer`, but it does not "consume" buffers. It is coordinating the production of read slices from a shared buffer. `BufferBuilder` makes more sense than this. Even worse, this class has a `build() : Buffer` method :-(.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362531#comment-16362531 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167527566 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java --- @@ -87,41 +86,12 @@ public boolean isFullBuffer() { SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException; --- End diff -- One remark from reading the code: I found it a bit surprising that a method that looks like a setter causes the write to continue. Maybe this is better called something like `continueWritingWithNextBufferBuilder`, or the setter could be split from a `continueWrite` method?
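One way to realise the split suggested above, sketched with hypothetical names (`SplitApiSerializer`, `WriteResult`) and plain byte arrays instead of `BufferBuilder`. `setNextBuffer` only stores the target, and a separate `continueWrite` performs the copy and reports whether the record or the buffer ran out first.

```java
// Hypothetical outcome of one write step.
enum WriteResult { PARTIAL_RECORD_BUFFER_FULL, FULL_RECORD, FULL_RECORD_BUFFER_FULL }

// Hypothetical serializer with the setter separated from the write step.
final class SplitApiSerializer {
    private byte[] record = new byte[0];
    private int recordOffset;
    private byte[] target = new byte[0];
    private int targetOffset;

    void addRecord(byte[] serializedRecord) {
        record = serializedRecord;
        recordOffset = 0;
    }

    /** Pure setter: only remembers where the next bytes should go. */
    void setNextBuffer(byte[] buffer) {
        target = buffer;
        targetOffset = 0;
    }

    /** Explicit write step: copies as much of the pending record as fits. */
    WriteResult continueWrite() {
        int count = Math.min(record.length - recordOffset, target.length - targetOffset);
        System.arraycopy(record, recordOffset, target, targetOffset, count);
        recordOffset += count;
        targetOffset += count;

        if (recordOffset < record.length) {
            // Record not done, so the target must be full.
            return WriteResult.PARTIAL_RECORD_BUFFER_FULL;
        }
        return targetOffset == target.length
                ? WriteResult.FULL_RECORD_BUFFER_FULL
                : WriteResult.FULL_RECORD;
    }
}
```

A caller's loop then reads `serializer.setNextBuffer(next); result = serializer.continueWrite();`, which makes the side effect visible at the call site.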
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362517#comment-16362517 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167556221 --- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java --- @@ -45,4 +52,36 @@ private FutureUtil() { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set futuresSet = new HashSet<>(); --- End diff -- I think that, for all practical purposes, we do not need a set to deduplicate. If a future is contained multiple times and has already finished, waiting for it again is basically a NOP.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362534#comment-16362534 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167621132 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java --- @@ -60,12 +60,20 @@ void tagAsEvent(); /** -* Returns the underlying memory segment. +* Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits +* slices. Use it only along the {@link #getMemorySegmentOffset()}. * * @return the memory segment backing this buffer */ + @Deprecated --- End diff -- You should name the proper replacement for this deprecated method in the comment, or say that it will eventually be removed without a replacement.
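A sketch of the Javadoc the reviewer is asking for, assuming a hypothetical replacement accessor. The `@deprecated` tag either names what to use instead or states that the method will be removed without a replacement, alongside the `@Deprecated` annotation. `SliceAwareBuffer`, `ArraySliceBuffer`, `getMemory`, `getMemoryOffset` and `readableBytes` are illustrative names, not Flink's `Buffer` API.

```java
import java.util.Arrays;

/** Hypothetical buffer interface showing deprecation Javadoc that names a way forward. */
interface SliceAwareBuffer {
    /**
     * Returns the whole backing array, ignoring read-only protection and slicing.
     * Use it only together with {@link #getMemoryOffset()}.
     *
     * @deprecated Use {@link #readableBytes()} instead.
     */
    @Deprecated
    byte[] getMemory();

    /**
     * Returns where this (possibly sliced) buffer's data starts in the backing array.
     *
     * @deprecated Will be removed together with {@link #getMemory()}, without replacement.
     */
    @Deprecated
    int getMemoryOffset();

    /** Replacement accessor: a copy of exactly this buffer's readable bytes. */
    byte[] readableBytes();
}

// Hypothetical slice over a shared backing array.
final class ArraySliceBuffer implements SliceAwareBuffer {
    private final byte[] memory;
    private final int offset;
    private final int length;

    ArraySliceBuffer(byte[] memory, int offset, int length) {
        this.memory = memory;
        this.offset = offset;
        this.length = length;
    }

    @Deprecated
    @Override
    public byte[] getMemory() {
        return memory;
    }

    @Deprecated
    @Override
    public int getMemoryOffset() {
        return offset;
    }

    @Override
    public byte[] readableBytes() {
        return Arrays.copyOfRange(memory, offset, offset + length);
    }
}
```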
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362535#comment-16362535 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167883063 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO BufferAndAvailability next = null; try { - if (channel.isWritable()) { - while (true) { - SequenceNumberingViewReader reader = nonEmptyReader.poll(); - - // No queue with available data. We allow this here, because - // of the write callbacks that are executed after each write. - if (reader == null) { - return; + while (true) { + NetworkSequenceViewReader reader = poolAvailableReader(); + + // No queue with available data. We allow this here, because + // of the write callbacks that are executed after each write. 
+ if (reader == null) { + return; + } + + next = reader.getNextBuffer(); + if (next == null) { + if (!reader.isReleased()) { + continue; } + markAsReleased(reader.getReceiverId()); + + Throwable cause = reader.getFailureCause(); + if (cause != null) { + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); - next = reader.getNextBuffer(); - - if (next == null) { - if (reader.isReleased()) { - markAsReleased(reader.getReceiverId()); - Throwable cause = reader.getFailureCause(); - - if (cause != null) { - ErrorResponse msg = new ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - - ctx.writeAndFlush(msg); - } - } else { - IllegalStateException err = new IllegalStateException( - "Bug in Netty consumer logic: reader queue got notified by partition " + - "about available data, but none was available."); - handleException(ctx.channel(), err); - return; - } - } else { - // this channel was now removed from the non-empty reader queue - // we re-add it in case it has more data, because in that case no - // "non-empty" notification will come for that reader from the queue. - if (next.moreAvailable()) { - nonEmptyReader.add(reader); - } - - BufferResponse msg = new BufferResponse( - next.buffer(), - reader.getSequenceNumber(), - reader.getReceiverId(),
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362527#comment-16362527 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167879211 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -220,6 +273,19 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO } } + private void registerAvailableReader(NetworkSequenceViewReader reader) { + availableReaders.add(reader); + reader.setRegisteredAsAvailable(true); + } + + private NetworkSequenceViewReader poolAvailableReader() { --- End diff -- This should probably be `pollAvailableReader()`
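The register method above sets a per-reader flag when it enqueues a reader; the counterpart that the rename is about removes the head of the queue, so "poll" is the accurate verb. A minimal sketch of how such a pair might fit together, assuming (not shown in the diff) that polling clears the flag and that enqueuing skips readers already flagged. `SketchReader` and `AvailableReaderQueue` are hypothetical stand-ins:

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for NetworkSequenceViewReader; it only tracks the "registered" flag. */
class SketchReader {
    private boolean registeredAsAvailable;

    boolean isRegisteredAsAvailable() {
        return registeredAsAvailable;
    }

    void setRegisteredAsAvailable(boolean registered) {
        this.registeredAsAvailable = registered;
    }
}

/**
 * Sketch of the register/poll pair. Not thread-safe: assumes all calls happen on a single
 * thread, as they would on a Netty event loop.
 */
class AvailableReaderQueue {
    private final ArrayDeque<SketchReader> availableReaders = new ArrayDeque<>();

    /** Assumed entry point: enqueue only readers that are not already queued. */
    void enqueueAvailableReader(SketchReader reader) {
        if (reader.isRegisteredAsAvailable()) {
            return; // already in the queue, avoid a duplicate entry
        }
        registerAvailableReader(reader);
    }

    private void registerAvailableReader(SketchReader reader) {
        availableReaders.add(reader);
        reader.setRegisteredAsAvailable(true);
    }

    /** Removes and returns the head of the queue, or null if no reader is available. */
    SketchReader pollAvailableReader() {
        SketchReader reader = availableReaders.poll();
        if (reader != null) {
            reader.setRegisteredAsAvailable(false); // allow a later re-registration
        }
        return reader;
    }
}
```

The name then mirrors `ArrayDeque.poll()`, which has the same "remove head or return null" contract.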
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362533#comment-16362533 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167860402 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { } /** -* Writes the buffer to the {@link ResultPartitionWriter} and removes the -* buffer from the serializer state. +* Marks the current {@link BufferBuilder} as finished and clears the state for next one. * -* Needs to be synchronized on the serializer! +* @return true if some data were written */ - private void writeAndClearBuffer( - Buffer buffer, + private boolean tryFinishCurrentBufferBuilder( int targetChannel, RecordSerializer serializer) throws IOException { --- End diff -- This code no longer throws `IOException`.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362520#comment-16362520 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167596726 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -131,9 +114,9 @@ public void release() { } // Release all available buffers - Buffer buffer; - while ((buffer = buffers.poll()) != null) { - buffer.recycleBuffer(); + BufferConsumer bufferConsumer; + while ((bufferConsumer = buffers.poll()) != null) { --- End diff -- Maybe this part of the `release()` method should also go into the superclass, which usually manages `buffers`. The method could also have an optional hook for additional cleanup inside the `synchronized` block that subclasses can use if needed.
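The suggestion is a template-method split: the superclass that owns `buffers` performs the common release under the lock, and a subclass only plugs extra cleanup into a hook that runs while the lock is still held. A minimal sketch under that reading; all class names and `onReleaseLocked()` are hypothetical, and `HookSketchConsumer` merely stands in for `BufferConsumer`:

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for BufferConsumer: something that must be closed on release. */
class HookSketchConsumer implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

/** Base class that owns `buffers` and therefore also owns releasing them. */
abstract class HookSketchSubpartition {
    protected final ArrayDeque<HookSketchConsumer> buffers = new ArrayDeque<>();
    private boolean isReleased;

    public final void release() {
        synchronized (buffers) {
            if (isReleased) {
                return; // releasing twice is a no-op
            }
            isReleased = true;
            // Common part, same loop shape as in the diff.
            HookSketchConsumer consumer;
            while ((consumer = buffers.poll()) != null) {
                consumer.close();
            }
            onReleaseLocked(); // subclass hook, still inside the synchronized block
        }
    }

    /** Optional hook for subclass-specific cleanup; the default does nothing. */
    protected void onReleaseLocked() {
    }
}

/** Example subclass that only counts how often its hook ran. */
class HookSketchPipelinedSubpartition extends HookSketchSubpartition {
    int hookCalls;

    @Override
    protected void onReleaseLocked() {
        hookCalls++;
    }
}
```

Making `release()` `final` keeps subclasses from skipping the common part by accident; whether that fits the real class hierarchy is an assumption.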
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362528#comment-16362528 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167818220 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java --- @@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) { + checkState(!bufferOrEvent.moreAvailable()); if (!inputGatesWithRemainingData.remove(inputGate)) { throw new IllegalStateException("Couldn't find input gate in set of remaining " + "input gates."); } } + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } + // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - return bufferOrEvent; + return Optional.ofNullable(bufferOrEvent); + } + + @Override + public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + while (true) { + InputGate inputGate; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + inputGate = inputGatesWithData.remove(); + enqueuedInputGatesWithData.remove(inputGate); + } + + // In case of inputGatesWithData being inaccurate do not block on 
an empty inputGate, but just poll the data. --- End diff -- Maybe we can add a comment explaining why this can happen now, i.e., mentioning the output flusher.
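`waitAndGetNextInputGate()` combines a queue with a set so that each gate is queued at most once, and it waits on the queue's monitor while the queue is empty. A minimal sketch of that pattern, with the hypothetical class `UniqueBlockingQueue` and `T` standing in for `InputGate`:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch of the "queue plus uniqueness set" pattern. Producers call enqueue() when a gate
 * may have data; the consumer calls take(). The set keeps each gate in the queue at most once.
 */
class UniqueBlockingQueue<T> {
    private final ArrayDeque<T> withData = new ArrayDeque<>();
    private final Set<T> enqueued = new HashSet<>();

    void enqueue(T gate) {
        synchronized (withData) {
            if (enqueued.add(gate)) { // add() returns false if the gate is already queued
                withData.add(gate);
                withData.notifyAll();
            }
        }
    }

    T take() throws InterruptedException {
        synchronized (withData) {
            while (withData.isEmpty()) { // loop guards against spurious wake-ups
                withData.wait();
            }
            T gate = withData.remove();
            enqueued.remove(gate); // a later notification may enqueue it again
            return gate;
        }
    }
}
```

Because an entry only means "this gate was notified", it can be stale by the time it is taken; as the comment in the diff says, the caller should then poll the gate without blocking and loop back if it turns out to be empty.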
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362523#comment-16362523 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167588455 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { + break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- I think this method does not truly require a return value. The return value is only used in one test, and I found it confusing that it is first closed and then returned.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362536#comment-16362536 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167857194 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -55,69 +51,56 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition. */ - @GuardedBy("buffers") - private int buffersInBacklog; - // PipelinedSubpartition(int index, ResultPartition parent) { super(index, parent); } @Override - public boolean add(Buffer buffer) throws IOException { - checkNotNull(buffer); - - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + public boolean add(BufferConsumer bufferConsumer) throws IOException { + return add(bufferConsumer, false); + } + @Override + public void flush() { synchronized (buffers) { - if (isFinished || isReleased) { - buffer.recycleBuffer(); - return false; + if (readView != null) { + readView.notifyDataAvailable(); } - - // Add the buffer and update the stats - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); - increaseBuffersInBacklog(buffer); - } - - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); } - - return true; } @Override public void finish() throws IOException { - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); + LOG.debug("Finished {}.", this); + } - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + private 
boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException { --- End diff -- I think you can remove the `throws IOException`, and then also from the public `add(...)`.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362526#comment-16362526 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167898515 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link ResultPartitionWriter} that collects output on the List. 
+ */ +@ThreadSafe +public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter { + private final BufferProvider bufferProvider; + private final ArrayDeque bufferConsumers = new ArrayDeque<>(); + + public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) { + this.bufferProvider = checkNotNull(bufferProvider); + } + + @Override + public synchronized BufferProvider getBufferProvider() { + return bufferProvider; --- End diff -- What does this `synchronized` help with? The field is `final`, so I would assume this change is not required.
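The reasoning behind the question: under the Java Memory Model a `final` field assigned in the constructor is visible to every thread once construction completes (provided `this` does not escape the constructor), so a getter for it needs no lock; only the mutable deque does. A minimal sketch with simplified placeholder types; `CollectingWriterSketch` is hypothetical and `Objects.requireNonNull` stands in for Flink's `checkNotNull`:

```java
import java.util.ArrayDeque;
import java.util.Objects;

/** Sketch: only mutable state is guarded; the final field is read without a lock. */
class CollectingWriterSketch {
    // final and assigned in the constructor: safely published, no synchronization needed
    private final String bufferProvider;
    // mutable: every access is guarded by the monitor of "this"
    private final ArrayDeque<Integer> bufferConsumers = new ArrayDeque<>();

    CollectingWriterSketch(String bufferProvider) {
        this.bufferProvider = Objects.requireNonNull(bufferProvider);
    }

    String getBufferProvider() { // plain getter, no "synchronized"
        return bufferProvider;
    }

    synchronized void addBufferConsumer(int consumer) {
        bufferConsumers.add(consumer);
    }

    synchronized int size() {
        return bufferConsumers.size();
    }
}
```

The deque reference itself is also `final`; the lock is needed for its contents, not for reading the reference.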
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362522#comment-16362522 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167618791 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java --- @@ -138,6 +142,12 @@ /** Channels, which notified this input gate about available data. */ private final ArrayDeque inputChannelsWithData = new ArrayDeque<>(); + /** +* Field guaranteeing uniqueness for inputChannelsWithData queue. Both of those fields should be unified +* onto one. +*/ + private final Set enqueuedInputChannelsWithData = new HashSet<>(); --- End diff -- I wonder whether this would be better as a `BitSet`. Do we typically expect very few enqueued channels with data out of a large set of channels?
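A `BitSet` uses a fixed number of bits (one per channel) however many channels are queued, and it avoids the per-entry node and boxing overhead of a `HashSet`; a `HashSet` could only come out smaller if very few of many channels are queued at once, which is exactly what the question probes. A minimal sketch, assuming channels can be identified by a dense index `0..numChannels-1` and that callers already hold the gate's lock; `ChannelsWithData` is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.BitSet;

/** Sketch: queue of channel indices whose uniqueness is guarded by a BitSet instead of a HashSet. */
class ChannelsWithData {
    private final ArrayDeque<Integer> queue = new ArrayDeque<>();
    private final BitSet enqueued;

    ChannelsWithData(int numChannels) {
        this.enqueued = new BitSet(numChannels);
    }

    /** Returns true if the channel was newly enqueued, false if it was already queued. */
    boolean offer(int channelIndex) {
        if (enqueued.get(channelIndex)) {
            return false;
        }
        enqueued.set(channelIndex);
        queue.add(channelIndex);
        return true;
    }

    /** Returns the next channel index with data, or -1 if there is none. */
    int poll() {
        Integer channelIndex = queue.poll();
        if (channelIndex == null) {
            return -1;
        }
        enqueued.clear(channelIndex); // the channel may be enqueued again later
        return channelIndex;
    }
}
```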
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362516#comment-16362516 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167546706 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { --- End diff -- I wonder if this loop could be simplified to:
```
while (!result.isFullRecord()) {
    tryFinishCurrentBufferBuilder(targetChannel, serializer);
    BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
    result = serializer.setNextBufferBuilder(bufferBuilder);
}
```
This would introduce a minor change in behaviour in cases where the end of the record falls exactly at the end of a buffer.
With the change, the buffer is only finished by the next record and not on the spot. However, this should not be a problem: it is what normally happens for almost every record besides those corner cases, so the code should already handle it well. With this change, `tryFinishCurrentBufferBuilder` also no longer requires a return value.
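The behavioural change can be checked on a toy model in which records are byte counts and every buffer has the same fixed capacity. This is a sketch of the proposed loop only; `ToyRecordWriter` and its helpers are hypothetical stand-ins for `RecordSerializer`, `SerializationResult` and `BufferBuilder`, not Flink's real API:

```java
/** Toy model of the proposed sendToTarget() loop with fixed-capacity buffers. */
class ToyRecordWriter {
    private final int bufferCapacity;
    private boolean hasCurrentBuffer; // is a buffer builder currently attached?
    private int usedInCurrentBuffer;  // bytes already written into it
    private int remainingInRecord;    // bytes of the current record still to be written
    int requestedBuffers;
    int finishedBuffers;

    ToyRecordWriter(int bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
    }

    /** Proposed shape: keep switching buffers until the whole record has been written. */
    void sendToTarget(int recordLength) {
        remainingInRecord = recordLength;
        boolean fullRecord = copyIntoCurrentBuffer();  // plays the role of addRecord()
        while (!fullRecord) {
            tryFinishCurrentBufferBuilder();            // no return value needed
            requestNewBufferBuilder();
            fullRecord = copyIntoCurrentBuffer();       // plays the role of setNextBufferBuilder()
        }
    }

    /** Copies as much of the record as fits; returns true once the record is complete. */
    private boolean copyIntoCurrentBuffer() {
        if (hasCurrentBuffer) {
            int n = Math.min(remainingInRecord, bufferCapacity - usedInCurrentBuffer);
            usedInCurrentBuffer += n;
            remainingInRecord -= n;
        }
        return remainingInRecord == 0;
    }

    private void tryFinishCurrentBufferBuilder() {
        if (hasCurrentBuffer) {
            finishedBuffers++;
            hasCurrentBuffer = false;
        }
    }

    private void requestNewBufferBuilder() {
        requestedBuffers++;
        hasCurrentBuffer = true;
        usedInCurrentBuffer = 0;
    }
}
```

With a capacity of 4, a 4-byte record fills its buffer exactly but leaves it unfinished; the next record finds no free space, finishes that buffer and only then requests a new one, which is the deferred finishing described in the comment.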
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362524#comment-16362524 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167594824 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -131,9 +114,9 @@ public void release() { } // Release all available buffers - Buffer buffer; - while ((buffer = buffers.poll()) != null) { - buffer.recycleBuffer(); + BufferConsumer bufferConsumer; + while ((bufferConsumer = buffers.poll()) != null) { --- End diff -- I suggest simply using a normal for-each loop to close all of them, followed by a `clear()`.
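Both loop styles close every element and leave the queue empty; the for-each version just reads more plainly and clears the queue once instead of removing element by element. A minimal side-by-side sketch; `ReleaseLoops` and its nested `SketchCloseable` are hypothetical stand-ins for the subpartition and `BufferConsumer`:

```java
import java.util.ArrayDeque;

/** Compares draining a queue with poll() against a for-each close followed by clear(). */
class ReleaseLoops {

    /** Hypothetical closeable standing in for BufferConsumer. */
    static final class SketchCloseable implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Style in the diff: remove each element with poll(), then close it. */
    static void releaseByPolling(ArrayDeque<SketchCloseable> buffers) {
        SketchCloseable consumer;
        while ((consumer = buffers.poll()) != null) {
            consumer.close();
        }
    }

    /** Suggested style: close everything with a plain for-each, then clear the queue once. */
    static void releaseByIterating(ArrayDeque<SketchCloseable> buffers) {
        for (SketchCloseable consumer : buffers) {
            consumer.close(); // must not modify the deque: ArrayDeque iterators are fail-fast
        }
        buffers.clear();
    }
}
```

The for-each form relies on `close()` not touching the queue during iteration, which holds when the release runs under the lock that guards `buffers`.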
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362519#comment-16362519 ] ASF GitHub Bot commented on FLINK-8581: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167556014 --- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java --- @@ -45,4 +52,36 @@ private FutureUtil() { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set futuresSet = new HashSet<>(); + for (Future future : futures) { --- End diff -- Could be replaced with `addAll()`, or even with the constructor that takes a collection.
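`HashSet` has a constructor that copies a `Collection`, which replaces the manual `add()` loop in one line. The diff is cut off after that loop, so the rest of the method body below is a hypothetical sketch (a shared deadline, with each `get()` bounded by the time left), not the PR's implementation; `FutureWaitSketch` is an invented name:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch of waitForAll() using the HashSet copy constructor. */
final class FutureWaitSketch {
    private FutureWaitSketch() {
    }

    static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
        waitForAll(timeoutMillis, Arrays.asList(futures));
    }

    static void waitForAll(long timeoutMillis, Collection<? extends Future<?>> futures) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        // Copy constructor instead of "new HashSet<>()" followed by a loop of add() calls.
        Set<Future<?>> pending = new HashSet<>(futures);
        for (Future<?> future : pending) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("Futures did not complete within " + timeoutMillis + " ms");
            }
            // Throws TimeoutException if this future is not done within the remaining time.
            future.get(remaining, TimeUnit.MILLISECONDS);
        }
    }
}
```

Calling `pending.addAll(futures)` on an empty set would be equivalent; the constructor is simply the shorter form.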