[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671355#comment-16671355
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski edited a comment on issue #6698: [FLINK-8581][network] Move flushing 
remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425
 
 
   @zhijiangW unfortunately no, I have to rethink this whole thing; it may 
need a much bigger refactor to avoid adding more locks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671351#comment-16671351
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski commented on issue #6698: [FLINK-8581][network] Move flushing remote 
subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425
 
 
   No, I have to rethink this whole thing; it may need a much bigger 
refactor to avoid adding more locks.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651390#comment-16651390
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r225464137
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
+   subpartition.registerPeriodicFlush(flushTimeout);
+   }
+   remoteSubpartitionsMissingPeriodicFlushes.clear();
 
 Review comment:
   In that case I would have to re-evaluate this. Adding extra synchronisation 
would further complicate the code and could cause regressions.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649831#comment-16649831
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r225059097
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
+   subpartition.registerPeriodicFlush(flushTimeout);
+   }
+   remoteSubpartitionsMissingPeriodicFlushes.clear();
 
 Review comment:
   Yes, we may need to add synchronization to the three methods above to 
protect `localSubpartitions`, `remoteSubpartitionsMissingPeriodicFlushes` and 
`flushTimeout`, which are accessed by the task thread, the netty thread and the 
flusher thread.
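
A minimal sketch of the synchronization discussed here, assuming one private lock object inside the partition. `LockedPartition`, `LockedSubpartition` and `onReadViewCreated` are hypothetical stand-ins (the last one roughly plays the role of `createSubpartitionView`), not Flink's actual classes. It only shows that all three shared fields have to be read and written under the same lock, and that `flushAllLocal()` then flushes while holding it, which is the added contention the thread is worried about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for ResultSubpartition (not Flink's class).
class LockedSubpartition {
    private final boolean local;
    long registeredFlushTimeout = -1; // -1: no periodic flush registered yet
    int flushCount = 0;

    LockedSubpartition(boolean local) { this.local = local; }
    boolean isLocal() { return local; }
    void flush() { flushCount++; }
    void registerPeriodicFlush(long flushTimeout) { registeredFlushTimeout = flushTimeout; }
}

// Hypothetical sketch: a single lock guards all three shared fields, so the
// methods can be called from different threads without racing each other.
class LockedPartition {
    private final Object lock = new Object();
    private final List<LockedSubpartition> localSubpartitions = new ArrayList<>();
    private final List<LockedSubpartition> remoteSubpartitionsMissingPeriodicFlushes = new ArrayList<>();
    private OptionalLong flushTimeout = OptionalLong.empty();

    // Called once a read view exists, i.e. once locality is known.
    void onReadViewCreated(LockedSubpartition subpartition) {
        synchronized (lock) {
            if (subpartition.isLocal()) {
                localSubpartitions.add(subpartition);
            } else if (flushTimeout.isPresent()) {
                subpartition.registerPeriodicFlush(flushTimeout.getAsLong());
            } else {
                remoteSubpartitionsMissingPeriodicFlushes.add(subpartition);
            }
        }
    }

    void setFlushTimeout(long timeout) {
        synchronized (lock) {
            if (flushTimeout.isPresent()) {
                throw new IllegalStateException("Flush timeout can not be set twice");
            }
            flushTimeout = OptionalLong.of(timeout);
            for (LockedSubpartition subpartition : remoteSubpartitionsMissingPeriodicFlushes) {
                subpartition.registerPeriodicFlush(timeout);
            }
            remoteSubpartitionsMissingPeriodicFlushes.clear();
        }
    }

    void flushAllLocal() {
        synchronized (lock) {
            // Flushing while holding the lock is the extra contention in question.
            for (LockedSubpartition subpartition : localSubpartitions) {
                subpartition.flush();
            }
        }
    }
}
```

Whichever order the three calls arrive in, a remote subpartition ends up with exactly one periodic flush registration and local ones are only ever flushed directly.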




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647850#comment-16647850
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r224765094
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
+   subpartition.registerPeriodicFlush(flushTimeout);
+   }
+   remoteSubpartitionsMissingPeriodicFlushes.clear();
 
 Review comment:
   You might be right. But a fix for that requires synchronizing:
   - `setFlushTimeout`
   - `createSubpartitionView`
   - `flushAllLocal`
   
   right :/ ?




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642875#comment-16642875
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223575788
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
 ##
 @@ -115,8 +115,12 @@ public boolean isRegisteredAsAvailable() {
@Override
public boolean isAvailable() {
// BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)!
-   return hasBuffersAvailable() &&
-   (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
+   return hasBuffersAvailable() && !isBlocked();
+   }
+
+   @Override
+   public boolean isBlocked() {
 
 Review comment:
   I think the name `isBlocked` does not convey the specific semantics. How 
about renaming it to `isCreditsAvailable()` directly?
   
   That way, `isAvailable()` breaks down into `buffersAvailable()` and 
`creditsAvailable()`, and the private method 
`isAvailable(BufferAndBacklog bufferAndBacklog)` below could also reuse 
`isCreditsAvailable()`.
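
A minimal sketch of the proposed split, assuming the reader only needs a credit counter and a "next buffer is an event" flag. `SplitAvailabilityReader`, its fields and the one-argument `isCreditsAvailable(boolean)` are hypothetical simplifications of `CreditBasedSequenceNumberingViewReader`, not its real API. The point is that both `isAvailable()` variants reuse a single credit check, so they cannot drift apart the way the `BEWARE` comment warns about.

```java
// Hypothetical simplification; not Flink's CreditBasedSequenceNumberingViewReader.
class SplitAvailabilityReader {
    int numCreditsAvailable;
    boolean buffersAvailable;
    boolean nextBufferIsEvent;

    boolean hasBuffersAvailable() {
        return buffersAvailable;
    }

    // As in the original expression, an event may be sent even with zero credits.
    boolean isCreditsAvailable(boolean nextIsEvent) {
        return numCreditsAvailable > 0 || nextIsEvent;
    }

    // Same result as: hasBuffersAvailable() && (numCreditsAvailable > 0 || nextBufferIsEvent)
    boolean isAvailable() {
        return hasBuffersAvailable() && isCreditsAvailable(nextBufferIsEvent);
    }

    // Hypothetical counterpart of the private isAvailable(BufferAndBacklog):
    // the two flags stand in for what a BufferAndBacklog would report.
    boolean isAvailable(boolean moreAvailable, boolean nextIsEvent) {
        return moreAvailable && isCreditsAvailable(nextIsEvent);
    }
}
```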




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642866#comment-16642866
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223574052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -119,6 +141,22 @@ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) thro
}
}
 
+   private void flushReaders(long flushTimeout) throws Exception {
+   List readersToFlush = periodicFlushes.getReaders(flushTimeout);
+
+   boolean wasEmpty = availableReaders.isEmpty();
+
+   for (NetworkSequenceViewReader reader : readersToFlush) {
+   if (!reader.isRegisteredAsAvailable() && !reader.isBlocked()) {
 
 Review comment:
   I think the flush operation means that if this reader has both an unfinished 
`BufferConsumer` and credits, we still want to transport this buffer to reduce 
latency. So the availability condition should cover both available buffers and 
available credits. But the current condition only checks that the reader has 
credits before inserting it into the queue. If we then poll this reader from 
the queue and its next buffer turns out to be null, it does not make sense to 
have registered it as available here.
   
   So would it be reasonable to adjust the condition here to check that this 
reader has both credits and unfinished buffers?
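
A minimal sketch of the suggested condition, assuming the reader can report whether it holds unflushed data. `FlushCandidate`, `FlushingQueue` and the `hasUnfinishedData` flag are hypothetical; the real reader would need some way to expose "there is a `BufferConsumer` with unflushed bytes". For brevity it ignores the case where an event may be sent without credits.

```java
import java.util.ArrayDeque;
import java.util.List;

// Hypothetical reader stub; hasUnfinishedData is an assumed signal, not a Flink method.
class FlushCandidate {
    boolean registeredAsAvailable;
    final boolean creditsAvailable;
    final boolean hasUnfinishedData;

    FlushCandidate(boolean creditsAvailable, boolean hasUnfinishedData) {
        this.creditsAvailable = creditsAvailable;
        this.hasUnfinishedData = hasUnfinishedData;
    }
}

class FlushingQueue {
    final ArrayDeque<FlushCandidate> availableReaders = new ArrayDeque<>();

    // Returns true if the queue went from empty to non-empty, i.e. the caller
    // should start writing (mirrors the wasEmpty check in the diff).
    boolean flushReaders(List<FlushCandidate> readersToFlush) {
        boolean wasEmpty = availableReaders.isEmpty();
        for (FlushCandidate reader : readersToFlush) {
            // Require credits AND pending data; otherwise polling the reader
            // later would only yield a null buffer.
            if (!reader.registeredAsAvailable && reader.creditsAvailable && reader.hasUnfinishedData) {
                reader.registeredAsAvailable = true;
                availableReaders.add(reader);
            }
        }
        return wasEmpty && !availableReaders.isEmpty();
    }
}
```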




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642790#comment-16642790
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223557953
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
}
+
+   private static class RegisterPeriodicFlushEvent {
+   private final NetworkSequenceViewReader reader;
+   private final long flushTimeout;
+
+   public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+   this.reader = checkNotNull(reader);
+   this.flushTimeout = flushTimeout;
+   }
+
+   public NetworkSequenceViewReader getReader() {
+   return reader;
+   }
+
+   public long getFlushTimeout() {
+   return flushTimeout;
+   }
+   }
+
+   private static class PeriodicFlushes {
+   private final Map> periodicFlushes = new HashMap<>();
+   private final Map flushTimeouts = new HashMap<>();
+   private final Map> scheduledEvents = new HashMap<>();
+
+   public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) {
 
 Review comment:
   Remove `public` from all the methods in this inner class?




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642779#comment-16642779
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223556150
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
}
+
+   private static class RegisterPeriodicFlushEvent {
+   private final NetworkSequenceViewReader reader;
+   private final long flushTimeout;
+
+   public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+   this.reader = checkNotNull(reader);
+   this.flushTimeout = flushTimeout;
+   }
+
+   public NetworkSequenceViewReader getReader() {
+   return reader;
+   }
+
+   public long getFlushTimeout() {
+   return flushTimeout;
+   }
+   }
+
+   private static class PeriodicFlushes {
 
 Review comment:
   Should we clear the related maps or cancel the scheduled futures in 
`releaseAllResources`, which is triggered by `exceptionCaught` or `channelInactive`?
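
A minimal sketch of that cleanup, assuming one repeating timer per distinct flush timeout. A plain `ScheduledExecutorService` stands in for the netty event loop, and `ReleasablePeriodicFlushes`, `onFlushTimer` and `releaseAll` are hypothetical names; in the real handler `releaseAll()` would be invoked from `releaseAllResources()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the executor stands in for netty's event loop.
class ReleasablePeriodicFlushes<R> {
    private final Map<Long, Set<R>> periodicFlushes = new HashMap<>();
    private final Map<Long, ScheduledFuture<?>> scheduledEvents = new HashMap<>();
    private final ScheduledExecutorService scheduler;

    ReleasablePeriodicFlushes(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    void register(long flushTimeoutMs, R reader) {
        periodicFlushes.computeIfAbsent(flushTimeoutMs, t -> new HashSet<>()).add(reader);
        // One repeating task per distinct timeout, shared by all its readers.
        scheduledEvents.computeIfAbsent(flushTimeoutMs, t -> scheduler.scheduleAtFixedRate(
                () -> onFlushTimer(t), t, t, TimeUnit.MILLISECONDS));
    }

    // Placeholder for "enqueue the readers registered under this timeout".
    private void onFlushTimer(long flushTimeoutMs) {
    }

    // Cancel every timer and drop all state, so nothing keeps firing for a
    // channel that already failed or went inactive.
    void releaseAll() {
        for (ScheduledFuture<?> future : scheduledEvents.values()) {
            future.cancel(false);
        }
        scheduledEvents.clear();
        periodicFlushes.clear();
    }

    int scheduledTimers() {
        return scheduledEvents.size();
    }

    int registeredReaders() {
        return periodicFlushes.values().stream().mapToInt(Set::size).sum();
    }
}
```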




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642757#comment-16642757
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223553034
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
}
+
+   private static class RegisterPeriodicFlushEvent {
+   private final NetworkSequenceViewReader reader;
+   private final long flushTimeout;
+
+   public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+   this.reader = checkNotNull(reader);
+   this.flushTimeout = flushTimeout;
+   }
+
+   public NetworkSequenceViewReader getReader() {
+   return reader;
+   }
+
+   public long getFlushTimeout() {
+   return flushTimeout;
+   }
+   }
+
+   private static class PeriodicFlushes {
+   private final Map> periodicFlushes = new HashMap<>();
+   private final Map flushTimeouts = new HashMap<>();
+   private final Map> scheduledEvents = new HashMap<>();
+
+   public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) {
+   checkState(!flushTimeouts.containsKey(reader));
+   checkState(flushTimeout > 0);
 
 Review comment:
   If we check `flushTimeout` up front in the `ResultPartition` stack, as 
mentioned above, we do not need to care about it in the later steps.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642749#comment-16642749
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223552250
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -97,6 +107,18 @@ public void run() {
});
}
 
+   void registerPeriodicFlush(NetworkSequenceViewReader reader, long flushTimeout) {
+   if (flushTimeout == 0) {
 
 Review comment:
   Could this condition be checked earlier, in the `ResultPartition` or 
`ResultSubpartition` stack?
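
A minimal sketch of validating the timeout once at the partition level, assuming Flink's usual buffer timeout convention (0 flushes after every record, -1 disables timed flushing) also applies here. `FlushTimeoutGuard` is a hypothetical name. Only a strictly positive timeout makes the method return true, so anything further down the stack, such as the netty-side `registerPeriodicFlush`, could drop its own `flushTimeout == 0` and `flushTimeout > 0` checks.

```java
import java.util.OptionalLong;

// Hypothetical sketch: validate once, up front, so downstream code may assume
// that it is only ever asked to register a strictly positive timeout.
class FlushTimeoutGuard {
    private OptionalLong flushTimeout = OptionalLong.empty();

    /** Returns true only if a periodic flush should be registered downstream. */
    boolean setFlushTimeout(long timeout) {
        if (timeout < -1) {
            throw new IllegalArgumentException("Invalid flush timeout: " + timeout);
        }
        if (flushTimeout.isPresent()) {
            throw new IllegalStateException("Flush timeout can not be set twice");
        }
        flushTimeout = OptionalLong.of(timeout);
        // Assumed convention: 0 = flush after every record, -1 = no timed
        // flushing; in both cases no periodic timer is needed.
        return timeout > 0;
    }
}
```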




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642745#comment-16642745
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223550777
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -77,6 +77,14 @@ public void flush() {
}
}
 
+   @Override
+   public void registerPeriodicFlush(long flushTimeout) {
 
 Review comment:
   1. Could the behavior of this method be as simple as `notifyDataAvailable`? 
Then we could shrink the `synchronized` part.
   ```
   if (readView != null) {
       readView.registerPeriodicFlush(flushTimeout);
   }
   ```
   2. This implementation is the same as in 
`SpillableSubpartition#registerPeriodicFlush`; how about implementing this 
method once in the parent `ResultSubpartition#registerPeriodicFlush`? The 
related change would be to define a protected `readView` in `ResultSubpartition`.
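
A minimal sketch of moving the shared logic into the parent class. `FlushableReadView`, `BaseSubpartition` and the two subclasses are hypothetical names, and declaring `readView` as `volatile` in place of a `synchronized` block is an assumption; whether that is enough depends on how the real code publishes its read view. Note that a call made while `readView` is still null is silently dropped, so this relies on callers registering only after the view exists, as the javadoc's "only after `isLocal()` state is known" suggests.

```java
// Hypothetical names; only the shape of the refactoring is the point here.
interface FlushableReadView {
    void registerPeriodicFlush(long flushTimeout);
}

abstract class BaseSubpartition {
    // Set once a consumer creates its read view; null until then. Assumption:
    // volatile publication is enough, so no synchronized block is needed.
    protected volatile FlushableReadView readView;

    // Shared implementation replacing the duplicated subclass overrides.
    public void registerPeriodicFlush(long flushTimeout) {
        FlushableReadView view = readView; // read the volatile field once
        if (view != null) {
            view.registerPeriodicFlush(flushTimeout);
        }
    }
}

class PipelinedLikeSubpartition extends BaseSubpartition {
}

class SpillableLikeSubpartition extends BaseSubpartition {
}
```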




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642742#comment-16642742
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223551011
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -106,11 +109,32 @@ protected Throwable getFailureCause() {
 
abstract public void flush();
 
+   /**
+* Remote subpartitions support automatic periodic flush. This is the method to register it.
+* Can only by used after {@link #isLocal()} state is known.
 
 Review comment:
   by used -> be used?




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642741#comment-16642741
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223550777
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -77,6 +77,14 @@ public void flush() {
}
}
 
+   @Override
+   public void registerPeriodicFlush(long flushTimeout) {
 
 Review comment:
   1. Could the behavior of this method be as simple as `notifyDataAvailable`? 
Then we could shrink the `synchronized` part.
   ```
   if (readView != null) {
       readView.registerPeriodicFlush(flushTimeout);
   }
   ```
   2. This implementation is the same as in 
`SpillableSubpartition#registerPeriodicFlush`; maybe we can put this method in 
the parent `ResultSubpartition#registerPeriodicFlush`? The related change would 
be to define a protected `readView` in `ResultSubpartition`.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642714#comment-16642714
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223546433
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -94,6 +96,19 @@
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
 
+   /**
+* Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+* subpartition is local or not once it's read view was created.
 
 Review comment:
   it's ->its




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641774#comment-16641774
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221894938
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -225,6 +274,7 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
}
 
next = reader.getNextBuffer();
+
 
 Review comment:
   revert?




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641773#comment-16641773
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r222015562
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
 ##
 @@ -24,8 +24,14 @@
  */
 public interface BufferAvailabilityListener {
 
+   default boolean isLocal() {
+   return false;
+   }
+
/**
 * Called whenever there might be new data available.
 */
void notifyDataAvailable();
+
+   void registerPeriodicFlush(long flushTimeout);
 
 Review comment:
   Somehow the interface's name no longer really matches these extensions... 
maybe you have a better idea?




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641765#comment-16641765
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r222012019
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
}
+
+   private static class RegisterPeriodicFlushEvent {
+   private final NetworkSequenceViewReader reader;
+   private final long flushTimeout;
+
+   public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+   this.reader = checkNotNull(reader);
+   this.flushTimeout = flushTimeout;
+   }
+
+   public NetworkSequenceViewReader getReader() {
+   return reader;
+   }
+
+   public long getFlushTimeout() {
+   return flushTimeout;
+   }
+   }
+
+   private static class PeriodicFlushes {
+   private final Map> periodicFlushes = new HashMap<>();
+   private final Map flushTimeouts = new HashMap<>();
 
 Review comment:
   Is it worth having `flushTimeouts` in memory only to optimise cancellation (and one debug check in `register()`)?
   
   Currently, this is used in two places:
   - in `register()` to check whether we have already added the reader; we could instead go through the list in `periodicFlushes` if we really wanted to
   - in `cancel()` to retrieve the right flush timeout so that we can easily remove the reader from `periodicFlushes`; we could iterate over it instead
   -> both alternatives are a bit slower, but they only happen during registration (if at all) and cancellation and are therefore not performance critical
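The alternative described above can be sketched as follows. The sketch assumes readers are grouped by flush timeout in a `Map<Long, Set<...>>`; the actual value type of `periodicFlushes` is not visible in the quoted diff. Readers are modelled as `String`s and the class name `PeriodicFlushesSketch` is hypothetical. Both the duplicate check and cancellation scan the groups instead of consulting a second map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of PeriodicFlushes without the reverse flushTimeouts map.
// Readers are modelled as Strings; the real code uses NetworkSequenceViewReader.
class PeriodicFlushesSketch {
    // flush timeout -> readers that are flushed with that period
    private final Map<Long, Set<String>> readersByTimeout = new HashMap<>();

    void register(String reader, long flushTimeout) {
        // Duplicate check by scanning all groups; only runs at registration time.
        if (isRegistered(reader)) {
            throw new IllegalStateException("Reader already registered: " + reader);
        }
        readersByTimeout.computeIfAbsent(flushTimeout, timeout -> new LinkedHashSet<>()).add(reader);
    }

    boolean isRegistered(String reader) {
        for (Set<String> readers : readersByTimeout.values()) {
            if (readers.contains(reader)) {
                return true;
            }
        }
        return false;
    }

    // Iterates over the groups instead of looking the timeout up in a second map.
    void cancel(String reader) {
        Iterator<Set<String>> groups = readersByTimeout.values().iterator();
        while (groups.hasNext()) {
            Set<String> readers = groups.next();
            if (readers.remove(reader)) {
                if (readers.isEmpty()) {
                    groups.remove(); // drop timeouts that no longer have readers
                }
                return;
            }
        }
    }

    Set<String> readersFor(long flushTimeout) {
        return Collections.unmodifiableSet(
                readersByTimeout.getOrDefault(flushTimeout, Collections.emptySet()));
    }
}
```

Both scans are linear in the number of registered readers. This matches the review's point: the cost only applies to registration and cancellation, not to the periodic flush itself.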


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641783#comment-16641783
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223342148
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -106,11 +109,32 @@ protected Throwable getFailureCause() {
 
abstract public void flush();
 
+   /**
+* Remote subpartitions support automatic periodic flush. This is the method to register it.
+* Can only be used after {@link #isLocal()} state is known.
+*/
+   public abstract void registerPeriodicFlush(long flushTimeout);
+
+   /**
+* @return empty if {@link #createReadView(BufferAvailabilityListener)} has not yet been called.
+* Afterwards returns {@code Optional.of(true)} or {@code Optional.of(false)}
+*/
+   public Optional<Boolean> isLocal() {
+   return isLocal;
+   }
+
abstract public void finish() throws IOException;
 
abstract public void release() throws IOException;
 
-   abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+   public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
+   isLocal = Optional.of(availabilityListener.isLocal());
+   return createReadViewInternal(availabilityListener);
+   }
+
+
 
 Review comment:
   nit: remove additional empty line (checkstyle)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641772#comment-16641772
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223287311
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -47,6 +48,8 @@
@GuardedBy("buffers")
private int buffersInBacklog;
 
+   private Optional<Boolean> isLocal = Optional.empty();
 
 Review comment:
   how about using `TernaryBoolean`?
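Here is a minimal sketch of the three-valued alternative to `Optional<Boolean>`. Flink has a `TernaryBoolean` utility type. To avoid guessing its exact methods, the sketch uses a hypothetical local enum, `TernaryFlag`, with the same three states, plus a hypothetical `SubpartitionLocalitySketch` holder.

```java
// Hypothetical three-valued flag in the spirit of Flink's TernaryBoolean;
// the names here are illustrative and are not that class's API.
enum TernaryFlag {
    TRUE, FALSE, UNDEFINED;

    static TernaryFlag fromBoolean(boolean value) {
        return value ? TRUE : FALSE;
    }
}

// Hypothetical, simplified subpartition that only tracks its locality.
class SubpartitionLocalitySketch {
    // UNDEFINED until a read view has been created, replacing Optional.empty().
    private TernaryFlag isLocal = TernaryFlag.UNDEFINED;

    void createReadView(boolean listenerIsLocal) {
        isLocal = TernaryFlag.fromBoolean(listenerIsLocal);
    }

    TernaryFlag isLocal() {
        return isLocal;
    }
}
```

Compared with an `Optional<Boolean>` field, an enum field avoids boxing. It also avoids using `Optional` as a field type (`Optional` is, for example, not `Serializable`).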


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641768#comment-16641768
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221851168
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -131,10 +169,17 @@ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) thro
return availableReaders;
}
 
-   public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
+   public void notifyReaderCreated(final NetworkSequenceViewReader reader) throws Exception {
 
 Review comment:
   revert


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641778#comment-16641778
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223341962
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -106,11 +109,32 @@ protected Throwable getFailureCause() {
 
abstract public void flush();
 
+   /**
+* Remote subpartitions support automatic periodic flush. This is the method to register it.
+* Can only be used after {@link #isLocal()} state is known.
+*/
+   public abstract void registerPeriodicFlush(long flushTimeout);
+
+   /**
+* @return empty if {@link #createReadView(BufferAvailabilityListener)} has not yet been called.
+* Afterwards returns {@code Optional.of(true)} or {@code Optional.of(false)}
+*/
+   public Optional<Boolean> isLocal() {
+   return isLocal;
+   }
 
 Review comment:
   `isLocal` may not be required after the proposed change above


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641776#comment-16641776
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223342607
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -106,11 +109,32 @@ protected Throwable getFailureCause() {
 
abstract public void flush();
 
+   /**
+* Remote subpartitions support automatic periodic flush. This is the method to register it.
+* Can only be used after {@link #isLocal()} state is known.
+*/
+   public abstract void registerPeriodicFlush(long flushTimeout);
+
+   /**
+* @return empty if {@link #createReadView(BufferAvailabilityListener)} has not yet been called.
+* Afterwards returns {@code Optional.of(true)} or {@code Optional.of(false)}
+*/
+   public Optional<Boolean> isLocal() {
+   return isLocal;
+   }
+
abstract public void finish() throws IOException;
 
abstract public void release() throws IOException;
 
-   abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+   public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
 
 Review comment:
   `public final`?
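Making `createReadView` `public final` turns it into a template method. The base class always records locality and then delegates to `createReadViewInternal`, so a subclass can no longer override the public entry point and skip that step. Below is a simplified sketch with hypothetical class names. The real signatures also throw `IOException`, which is omitted here.

```java
// Hypothetical, simplified stand-ins for ResultSubpartitionView and ResultSubpartition.
interface ReadViewSketch {
}

abstract class SubpartitionTemplateSketch {
    private boolean readViewCreated = false;
    private boolean isLocal = false;

    // final: a subclass cannot override this and accidentally skip recording locality.
    public final ReadViewSketch createReadView(boolean listenerIsLocal) {
        isLocal = listenerIsLocal;
        readViewCreated = true;
        return createReadViewInternal();
    }

    // The only hook subclasses implement.
    protected abstract ReadViewSketch createReadViewInternal();

    boolean isReadViewCreated() {
        return readViewCreated;
    }

    boolean isLocal() {
        return isLocal;
    }
}

class PipelinedSubpartitionSketch extends SubpartitionTemplateSketch {
    @Override
    protected ReadViewSketch createReadViewInternal() {
        return new ReadViewSketch() { };
    }
}
```

Declaring `createReadView` in `PipelinedSubpartitionSketch` would now be a compile error, which is the protection the `final` modifier buys.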


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641766#comment-16641766
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221851878
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -97,6 +107,18 @@ public void run() {
});
}
 
+   void registerPeriodicFlush(NetworkSequenceViewReader reader, long flushTimeout) {
+   if (flushTimeout == 0) {
+   return;
+   }
+   ctx.executor().execute(new Runnable() {
+   @Override
+   public void run() {
+   ctx.pipeline().fireUserEventTriggered(new RegisterPeriodicFlushEvent(reader, flushTimeout));
+   }
+   });
 
 Review comment:
   use a lambda function instead?
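With Java 8, the anonymous `Runnable` in the diff collapses to a lambda, because `Runnable` is a functional interface. The self-contained sketch below makes two substitutions. A plain `java.util.concurrent.Executor` stands in for `ctx.executor()`, and recording a string stands in for firing a pipeline user event. `PeriodicFlushRegistrar` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical sketch: a plain Executor stands in for ctx.executor(), and
// recording a string stands in for ctx.pipeline().fireUserEventTriggered(...).
class PeriodicFlushRegistrar {
    private final Executor executor;
    // Not thread-safe; fine for the single-threaded executor used in the example.
    final List<String> events = new ArrayList<>();

    PeriodicFlushRegistrar(Executor executor) {
        this.executor = executor;
    }

    void registerPeriodicFlush(String reader, long flushTimeout) {
        if (flushTimeout == 0) {
            return; // same early return as in the diff
        }
        // Lambda instead of an anonymous Runnable; reader and flushTimeout are
        // effectively final, so they can be captured.
        executor.execute(() -> events.add("register " + reader + " every " + flushTimeout + " ms"));
    }
}
```

Passing `Runnable::run` as the executor runs each task on the calling thread, which keeps the example deterministic.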


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641782#comment-16641782
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223292943
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
+   subpartition.registerPeriodicFlush(flushTimeout);
+   }
+   remoteSubpartitionsMissingPeriodicFlushes.clear();
 
 Review comment:
   either also call `trimToSize()` here, or use an `Optional` as described above


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641777#comment-16641777
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223290601
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -94,6 +96,19 @@
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
 
+   /**
+* Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+* subpartition is local or not once its read view was created.
+*/
+   private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();
 
 Review comment:
   It would be nice if you could also `trimToSize()` this once you know.
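One way to know when to call `trimToSize()` is to count how many subpartitions have been classified so far. Here is a minimal sketch. It assumes subpartitions are modelled as `String`s and that the total count is known up front. `LocalSubpartitionTracker` and `onReadViewCreated` are hypothetical names.

```java
import java.util.ArrayList;

// Hypothetical sketch: subpartitions are Strings, classified as their read views
// are created; once all are classified, spare array capacity is released.
class LocalSubpartitionTracker {
    private final int numberOfSubpartitions;
    private int classified = 0;
    final ArrayList<String> localSubpartitions = new ArrayList<>();

    LocalSubpartitionTracker(int numberOfSubpartitions) {
        this.numberOfSubpartitions = numberOfSubpartitions;
    }

    void onReadViewCreated(String subpartition, boolean isLocal) {
        if (isLocal) {
            localSubpartitions.add(subpartition);
        }
        classified++;
        if (classified == numberOfSubpartitions) {
            // Every subpartition is known now, so the list will not grow any more.
            localSubpartitions.trimToSize();
        }
    }

    boolean allClassified() {
        return classified == numberOfSubpartitions;
    }
}
```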


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641770#comment-16641770
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223289891
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -94,6 +96,19 @@
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
 
+   /**
+* Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+* subpartition is local or not once its read view was created.
+*/
+   private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();
+
+   /**
+* Subset of {@code subpartitions} that are definitely remote, however once we determined that,
+* we haven't yet known about {@link #flushTimeout}. This has to be handled during
 
 Review comment:
   typo: `we may not yet know about the...`


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641780#comment-16641780
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223292589
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
 
 Review comment:
   While this may not hurt, are these two `checkState` calls really necessary? They kind of undermine `remoteSubpartitionsMissingPeriodicFlushes`'s authority ;) - if anything, shouldn't this be checked upon insertion into `remoteSubpartitionsMissingPeriodicFlushes`?
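The suggestion above amounts to validating the invariant once, when an element is added. The drain loop in `setFlushTimeout` then needs no checks of its own. Below is a minimal sketch with hypothetical names (`PendingRemoteFlushes`, `addPending`) and a simplified subpartition that records the timeout it was registered with.

```java
import java.util.ArrayList;

// Hypothetical sketch: the "only remote subpartitions are pending" invariant is
// checked once on insertion, so setFlushTimeout() needs no per-element checks.
class PendingRemoteFlushes {
    static final class Subpartition {
        final String name;
        final boolean isLocal;
        long registeredFlushTimeout = -1; // -1: not registered yet

        Subpartition(String name, boolean isLocal) {
            this.name = name;
            this.isLocal = isLocal;
        }
    }

    private final ArrayList<Subpartition> pending = new ArrayList<>();

    void addPending(Subpartition subpartition) {
        if (subpartition.isLocal) {
            throw new IllegalStateException(subpartition.name + " is local and cannot be pending");
        }
        pending.add(subpartition);
    }

    void setFlushTimeout(long flushTimeout) {
        // Every element was already validated by addPending().
        for (Subpartition subpartition : pending) {
            subpartition.registeredFlushTimeout = flushTimeout;
        }
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```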


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641769#comment-16641769
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221655152
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java
 ##
 @@ -52,6 +52,8 @@ void requestSubpartitionView(
 */
boolean isAvailable();
 
+   boolean isBlocked();
 
 Review comment:
   please add a comment explaining what `isBlocked` should mean, especially since we already have an `isAvailable`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641771#comment-16641771
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223340950
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -94,6 +96,19 @@
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
 
+   /**
+* Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+* subpartition is local or not once its read view was created.
+*/
+   private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();
+
+   /**
+* Subset of {@code subpartitions} that are definitely remote, however once we determined that,
+* we haven't yet known about {@link #flushTimeout}. This has to be handled during
+* {@link #setFlushTimeout(long)}.
+*/
+   private final ArrayList<ResultSubpartition> remoteSubpartitionsMissingPeriodicFlushes = new ArrayList<>();
+
 
 Review comment:
   actually, how about the following idea, which should make things a bit simpler and improve the abstraction:
   - only differentiate between self-flushing channels (periodically, after registration) and channels that require manual flushing (instead of remote vs. local)
   - have `PipelinedSubpartition#registerPeriodicFlush()` return whether registering for "self-flush" worked or not
   - have `PipelinedSubpartition#registerPeriodicFlush()` deal with the local vs. non-local nature of the channel
   - only have `ResultSubpartition[] subpartitions` and `ArrayList<ResultSubpartition> manuallyFlushedSubpartitions` members
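Here is a sketch of the proposed structure under one simplifying assumption: locality is known when the subpartition is constructed. In the diff, it only becomes known once the read view is created. All class and field names other than `registerPeriodicFlush` and `manuallyFlushedSubpartitions` are hypothetical. `registerPeriodicFlush` returns whether the subpartition will flush itself, and the partition keeps only the subpartitions that still need manual flushing.

```java
import java.util.ArrayList;

// Hypothetical stand-in for PipelinedSubpartition: it decides on its own
// whether it can self-flush, so callers never ask whether it is local.
class FlushableSubpartition {
    private final boolean isLocal;
    int manualFlushes = 0;
    long periodicFlushTimeout = -1; // -1: no periodic flush registered

    FlushableSubpartition(boolean isLocal) {
        this.isLocal = isLocal;
    }

    // Returns true if this subpartition will flush itself from now on.
    boolean registerPeriodicFlush(long flushTimeout) {
        if (isLocal) {
            return false;
        }
        periodicFlushTimeout = flushTimeout;
        return true;
    }

    void flush() {
        manualFlushes++;
    }
}

// Hypothetical stand-in for ResultPartition with only the two proposed members.
class PartitionSketch {
    private final FlushableSubpartition[] subpartitions;
    private final ArrayList<FlushableSubpartition> manuallyFlushedSubpartitions = new ArrayList<>();

    PartitionSketch(FlushableSubpartition... subpartitions) {
        this.subpartitions = subpartitions;
    }

    void setFlushTimeout(long flushTimeout) {
        for (FlushableSubpartition subpartition : subpartitions) {
            if (!subpartition.registerPeriodicFlush(flushTimeout)) {
                manuallyFlushedSubpartitions.add(subpartition);
            }
        }
    }

    // Only channels that could not register for self-flushing are flushed here.
    void flushManually() {
        manuallyFlushedSubpartitions.forEach(FlushableSubpartition::flush);
    }
}
```

With this split, the partition never asks whether a subpartition is local, which is why a separate `isLocal()` accessor may become unnecessary.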


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641781#comment-16641781
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223290657
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -94,6 +96,19 @@
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
 
+   /**
+* Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+* subpartition is local or not once its read view was created.
+*/
+   private final ArrayList<ResultSubpartition> localSubpartitions = new ArrayList<>();
+
+   /**
+* Subset of {@code subpartitions} that are definitely remote, however once we determined that,
+* we haven't yet known about {@link #flushTimeout}. This has to be handled during
+* {@link #setFlushTimeout(long)}.
+*/
+   private final ArrayList<ResultSubpartition> remoteSubpartitionsMissingPeriodicFlushes = new ArrayList<>();
 
 Review comment:
   Since this only exists temporarily, could we make it so that it no longer occupies memory in the backing array once it is not needed? The easiest way would be to use `trimToSize()` in `setFlushTimeout()` (see below), or to make this an `Optional`. I think I'd prefer option 1.
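The two options can be compared side by side. The minimal sketch below uses a hypothetical `TemporaryListOptions` class and models subpartitions as `String`s. Option 1 keeps a `final` field and shrinks its backing array with `ArrayList#trimToSize()`. Option 2 replaces the whole list with `Optional.empty()`, which requires a non-final field.

```java
import java.util.ArrayList;
import java.util.Optional;

// Hypothetical sketch comparing the two options; subpartitions are Strings.
class TemporaryListOptions {
    // Option 1: keep a final field, but release the backing array once drained.
    final ArrayList<String> pendingTrimmed = new ArrayList<>();

    // Option 2: drop the whole list once it is no longer needed (field cannot be final).
    Optional<ArrayList<String>> pendingOptional = Optional.of(new ArrayList<>());

    final ArrayList<String> registered = new ArrayList<>();

    void setFlushTimeoutOption1() {
        registered.addAll(pendingTrimmed);
        pendingTrimmed.clear();
        pendingTrimmed.trimToSize(); // capacity shrinks to the current size, i.e. 0
    }

    void setFlushTimeoutOption2() {
        pendingOptional.ifPresent(registered::addAll);
        pendingOptional = Optional.empty();
    }
}
```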


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641775#comment-16641775
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223291788
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
 
 Review comment:
   nit: how about `localSubpartitions.forEach(ResultSubpartition::flush);`?
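Since Java 8, `Iterable#forEach` accepts a method reference, so the loop becomes a one-liner with the same behaviour. Here is a minimal sketch using a hypothetical `CountingSubpartition` stand-in that only counts `flush()` calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ResultSubpartition with just a flush() method.
class CountingSubpartition {
    int flushes = 0;

    void flush() {
        flushes++;
    }
}

class FlushAllLocalSketch {
    final List<CountingSubpartition> localSubpartitions = new ArrayList<>();

    // Equivalent to the explicit for-each loop in the diff.
    void flushAllLocal() {
        localSubpartitions.forEach(CountingSubpartition::flush);
    }
}
```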


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641767#comment-16641767
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r221894761
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -193,7 +242,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exc
}
}
 
-   allReaders.remove(toCancel);
+   periodicFlushes.cancel(checkNotNull(allReaders.remove(toCancel)));
 
 Review comment:
   this seems a bit unreadable - how about putting the `checkNotNull` into the 
`cancel` method?
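Below is a minimal sketch of the suggestion. `java.util.Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`. The names `ReaderRegistry`, `cancelReader` and `cancelPeriodicFlush` are hypothetical, and readers are modelled as `String`s keyed by an integer id. The call site keeps only the removal, and the method that needs a non-null argument checks it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch; Objects.requireNonNull stands in for Preconditions.checkNotNull.
class ReaderRegistry {
    private final Map<Integer, String> allReaders = new HashMap<>();
    final Map<String, Long> periodicFlushTimeouts = new HashMap<>();

    void add(int id, String reader, long flushTimeout) {
        allReaders.put(id, reader);
        periodicFlushTimeouts.put(reader, flushTimeout);
    }

    void cancelReader(int id) {
        // The call site stays readable: remove, then hand the result over.
        String removed = allReaders.remove(id);
        cancelPeriodicFlush(removed);
    }

    // The null check lives in the method that requires a non-null reader.
    private void cancelPeriodicFlush(String reader) {
        Objects.requireNonNull(reader, "reader was not registered");
        periodicFlushTimeouts.remove(reader);
    }
}
```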


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641779#comment-16641779
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

NicoK commented on a change in pull request #6698: [FLINK-8581][network] Move 
flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r222017678
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -77,6 +77,14 @@ public void flush() {
}
}
 
+   @Override
+   public void registerPeriodicFlush(long flushTimeout) {
+   synchronized (buffers) {
+   checkState(readView != null);
 
 Review comment:
   What's the gain of this `checkState`? It would throw an NPE on the next line anyway.
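To make the trade-off concrete, here is a hypothetical sketch (`SketchSubpartition` and its methods are illustrative, not Flink's `PipelinedSubpartition`). When the dereference follows immediately, both variants fail at the same point. The explicit precondition only changes the exception type and lets it carry a message.

```java
// Hypothetical, heavily simplified stand-in for a subpartition; not Flink's class.
class SketchSubpartition {
    private final Object buffers = new Object();   // lock, mirroring the reviewed code
    private StringBuilder readView;                 // null until a consumer attaches

    void attachReadView() {
        synchronized (buffers) {
            readView = new StringBuilder();
        }
    }

    // With an explicit precondition: an IllegalStateException carrying a message.
    void registerPeriodicFlushChecked(long flushTimeout) {
        synchronized (buffers) {
            if (readView == null) {
                throw new IllegalStateException("read view must be created before registering a periodic flush");
            }
            readView.append("flush every ").append(flushTimeout).append(" ms");
        }
    }

    // Without it: the dereference itself throws a bare NullPointerException.
    void registerPeriodicFlushUnchecked(long flushTimeout) {
        synchronized (buffers) {
            readView.append("flush every ").append(flushTimeout).append(" ms");
        }
    }

    String describe() {
        synchronized (buffers) {
            return readView == null ? "" : readView.toString();
        }
    }
}
```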




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641606#comment-16641606
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223309406
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
+   subpartition.registerPeriodicFlush(flushTimeout);
+   }
+   remoteSubpartitionsMissingPeriodicFlushes.clear();
 
 Review comment:
   Could there be a race condition between `setFlushTimeout()` and `createSubpartitionView()`?
   Suppose the task thread invokes `setFlushTimeout` and clears `remoteSubpartitionsMissingPeriodicFlushes` while, at the same time, the netty thread creates a subpartition view and adds that subpartition to `remoteSubpartitionsMissingPeriodicFlushes` before `flushTimeout` has been set. That subpartition would then never get its periodic flush registered.
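One way to close the window described above is to guard the timeout and the pending list with a single lock. A subpartition is then either still in the list when the timeout is set, or it sees the timeout when its view is created. The sketch below assumes that approach. `FlushTimeoutRegistry` is hypothetical, a `LongConsumer` stands in for `registerPeriodicFlush`, and the price is one more lock on the view-creation path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical sketch: one lock guards both the timeout and the list of remote
// subpartitions still waiting for a periodic flush.
class FlushTimeoutRegistry {
    private final Object lock = new Object();
    private Long flushTimeout; // null until set
    private final List<LongConsumer> missingPeriodicFlushes = new ArrayList<>();

    // Called by the task thread.
    void setFlushTimeout(long timeout) {
        List<LongConsumer> toRegister;
        synchronized (lock) {
            if (flushTimeout != null) {
                throw new IllegalStateException("Flush timeout can not be set twice");
            }
            flushTimeout = timeout;
            toRegister = new ArrayList<>(missingPeriodicFlushes);
            missingPeriodicFlushes.clear();
        }
        // Registration happens outside the lock; each subpartition is handled exactly once.
        for (LongConsumer subpartition : toRegister) {
            subpartition.accept(timeout);
        }
    }

    // Called by the netty thread when a remote subpartition view is created.
    void onRemoteViewCreated(LongConsumer registerPeriodicFlush) {
        Long timeout;
        synchronized (lock) {
            timeout = flushTimeout;
            if (timeout == null) {
                // Timeout not known yet: park it for setFlushTimeout() to pick up.
                missingPeriodicFlushes.add(registerPeriodicFlush);
                return;
            }
        }
        registerPeriodicFlush.accept(timeout);
    }
}
```

Whichever call wins the lock, every subpartition ends up registered exactly once.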




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614942#comment-16614942
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski opened a new pull request #6698: [FLINK-8581][network] Move flushing 
remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698
 
 
   The first commit comes from https://github.com/apache/flink/pull/6697.
   
   This solves GC issues in cases with low latency (small flushTimeout) and many output channels, and generally improves low-latency performance significantly.
   
   For now, OutputFlusher remains responsible for triggering flushes of local subpartitions.
   
   Registering periodic flushes in netty is unfortunately not the most beautiful thing in the world at the moment. It is complicated by two things:
   1. we only learn the flushTimeout in flink-streaming-java and StreamTask, which is long after the point when we actually create the subpartitions;
   2. we do not know beforehand which subpartitions will be local and which will be remote.
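The general idea of moving remote flushes off the OutputFlusher thread can be sketched with a JDK `ScheduledExecutorService`, used here as a stand-in for a netty event loop (netty's event executors also implement that interface). This is an illustration under those assumptions, not the PR's code. `PeriodicFlushScheduler` and the string subpartition ids are hypothetical, and the timeout is passed in only once it is known.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: each remote subpartition gets a periodic flush task on a
// single-threaded scheduler instead of being flushed by a dedicated flusher thread.
class PeriodicFlushScheduler implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> flushes = new ConcurrentHashMap<>();

    void register(String subpartitionId, Runnable flush, long flushTimeoutMillis) {
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
                flush, flushTimeoutMillis, flushTimeoutMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = flushes.put(subpartitionId, future);
        if (previous != null) {
            previous.cancel(false); // never run two flush tasks for one subpartition
        }
    }

    void cancel(String subpartitionId) {
        ScheduledFuture<?> future = flushes.remove(subpartitionId);
        if (future != null) {
            future.cancel(false);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Local subpartitions would simply never be registered here, matching the split where OutputFlusher keeps handling them.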
   
   ![Benchmark 
results](https://docs.google.com/spreadsheets/d/e/2PACX-1vQ4ImkIhEVyd0JuC0_KBzSiZk1ugqRYYJ29fftj8f7bvQHsyNTrS9PBS2g7YaI6q7kfyHXpWWsnb5lq/pubchart?oid=1194867281=image)
   
   Average throughput is significantly higher only in extreme cases; however, the really important improvement here is solving (mitigating?) the current GC issues, which is visible in the "min" graph. Without this change, 1 ms latency with 1000+ output channels suffers from frequent, very long GC pauses.
   
   ## Verifying this change
   
   This change is covered by existing network stack tests, stress tests and almost all IT cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (**yes** / no 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369149#comment-16369149
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5423




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369142#comment-16369142
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r169082736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 ---
@@ -72,7 +73,19 @@
 
void requestPartitions() throws IOException, InterruptedException;
 
-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+* Blocking call waiting for next {@link BufferOrEvent}.
+*
+* @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+*/
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

 




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369143#comment-16369143
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5423
  
Thanks for those very good improvements, I will merge this.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369010#comment-16369010
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5423
  
I have rebased the PR and squashed the fixup commits.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369008#comment-16369008
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r169048737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 ---
@@ -72,7 +73,19 @@
 
void requestPartitions() throws IOException, InterruptedException;
 
-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+* Blocking call waiting for next {@link BufferOrEvent}.
+*
+* @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+*/
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

I would prefer to leave it as it is for now. I completely agree it's a bad design; however, in this PR I'm only documenting this behaviour and changing the `null` to `Optional.empty()`. Because the scope of this change is already extensive, I would prefer to fix this issue later on.
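For callers, the change from `null` to `Optional.empty()` mostly affects the consuming loop. A minimal sketch, assuming a simplified, non-blocking gate: `SketchInputGate`, `QueueBackedGate` and `GateConsumer` are hypothetical, and a `String` stands in for `BufferOrEvent`.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, simplified gate; a String stands in for BufferOrEvent.
interface SketchInputGate {
    boolean isFinished();

    // Mirrors the reviewed contract: Optional.empty() once isFinished() returns true.
    Optional<String> getNextBufferOrEvent();
}

// Drains a pre-filled queue instead of blocking on network input.
class QueueBackedGate implements SketchInputGate {
    private final Queue<String> pending = new ArrayDeque<>();

    QueueBackedGate(String... items) {
        for (String item : items) {
            pending.add(item);
        }
    }

    @Override
    public boolean isFinished() {
        return pending.isEmpty();
    }

    @Override
    public Optional<String> getNextBufferOrEvent() {
        // poll() returns null on an empty queue, which becomes Optional.empty().
        return Optional.ofNullable(pending.poll());
    }
}

class GateConsumer {
    // Callers test isPresent() instead of comparing against null.
    static int consumeAll(SketchInputGate gate) {
        int consumed = 0;
        while (gate.getNextBufferOrEvent().isPresent()) {
            consumed++;
        }
        return consumed;
    }
}
```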




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366841#comment-16366841
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168725762
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 ---
@@ -72,7 +73,19 @@
 
void requestPartitions() throws IOException, InterruptedException;
 
-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+* Blocking call waiting for next {@link BufferOrEvent}.
+*
+* @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+*/
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

What do you think about this?




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366840#comment-16366840
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168725569
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
 
BufferAndAvailability next = null;
try {
-   if (channel.isWritable()) {
-   while (true) {
-   SequenceNumberingViewReader reader = nonEmptyReader.poll();
-
-   // No queue with available data. We allow this here, because
-   // of the write callbacks that are executed after each write.
-   if (reader == null) {
-   return;
+   while (true) {
+   NetworkSequenceViewReader reader = poolAvailableReader();
+
+   // No queue with available data. We allow this here, because
+   // of the write callbacks that are executed after each write.
+   if (reader == null) {
+   return;
+   }
+
+   next = reader.getNextBuffer();
+   if (next == null) {
+   if (!reader.isReleased()) {
+   continue;
}
+   markAsReleased(reader.getReceiverId());
+
+   Throwable cause = reader.getFailureCause();
+   if (cause != null) {
+   ErrorResponse msg = new ErrorResponse(
+   new ProducerFailedException(cause),
+   reader.getReceiverId());
 
-   next = reader.getNextBuffer();
-
-   if (next == null) {
-   if (reader.isReleased()) {
-   markAsReleased(reader.getReceiverId());
-   Throwable cause = reader.getFailureCause();
-
-   if (cause != null) {
-   ErrorResponse msg = new ErrorResponse(
-   new ProducerFailedException(cause),
-   reader.getReceiverId());
-
-   ctx.writeAndFlush(msg);
-   }
-   } else {
-   IllegalStateException err = new IllegalStateException(
-   "Bug in Netty consumer logic: reader queue got notified by partition " +
-   "about available data, but none was available.");
-   handleException(ctx.channel(), err);
-   return;
-   }
-   } else {
-   // this channel was now removed from the non-empty reader queue
-   // we re-add it in case it has more data, because in that case no
-   // "non-empty" notification will come for that reader from the queue.
-   if (next.moreAvailable()) {
-   nonEmptyReader.add(reader);
-   }
-
-   BufferResponse msg = new BufferResponse(
-   next.buffer(),
-   reader.getSequenceNumber(),
-   reader.getReceiverId(),
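The invariant behind the comments in this loop can be sketched as follows: a reader is removed from the queue when polled, so it must be re-added if it still has data, because no further "non-empty" notification will arrive for it. `SketchReader` and `AvailableReaders` are hypothetical. The real loop also deals with channel writability and released readers, and writes `BufferResponse`s to netty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical reader that hands out buffers one at a time.
class SketchReader {
    final String id;
    private final Deque<String> buffers = new ArrayDeque<>();

    SketchReader(String id, String... data) {
        this.id = id;
        for (String d : data) {
            buffers.add(d);
        }
    }

    String getNextBuffer() {
        return buffers.poll();
    }

    boolean moreAvailable() {
        return !buffers.isEmpty();
    }
}

class AvailableReaders {
    private final Deque<SketchReader> available = new ArrayDeque<>();

    // Called when a reader is notified about new data; duplicates are ignored.
    void notifyReaderNonEmpty(SketchReader reader) {
        if (!available.contains(reader)) {
            available.add(reader);
        }
    }

    // Drains data round-robin, re-adding readers that still have buffers.
    List<String> writeAll() {
        List<String> written = new ArrayList<>();
        SketchReader reader;
        while ((reader = available.poll()) != null) {
            String next = reader.getNextBuffer();
            if (next == null) {
                continue; // nothing to send; the real code also checks isReleased()
            }
            if (reader.moreAvailable()) {
                // No new "non-empty" notification will come for this reader.
                available.add(reader);
            }
            written.add(reader.id + ":" + next);
        }
        return written;
    }
}
```

Re-adding at the tail also gives a simple round-robin between readers.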
   

[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366839#comment-16366839
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168725339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {
 
+   checkState(!bufferOrEvent.moreAvailable());
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
}
 
+   if (bufferOrEvent.moreAvailable()) {
+   // this buffer or event was now removed from the non-empty gates queue
+   // we re-add it in case it has more data, because in that case no "non-empty" notification
+   // will come for that gate
+   queueInputGate(inputGate);
+   }
+
// Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
 
bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
 
-   return bufferOrEvent;
+   return Optional.ofNullable(bufferOrEvent);
+   }
+
+   @Override
+   public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+   throw new UnsupportedOperationException();
+   }
+
+   private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+   while (true) {
+   InputGate inputGate;
+   synchronized (inputGatesWithData) {
+   while (inputGatesWithData.size() == 0) {
+   inputGatesWithData.wait();
+   }
+   inputGate = inputGatesWithData.remove();
+   enqueuedInputGatesWithData.remove(inputGate);
+   }
+
+   // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

That sounds OK, maybe with a short pointer here to the high-level doc; otherwise there is an increased chance that somebody misses it.
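The queue of gates with data that `waitAndGetNextInputGate` relies on can be sketched as a FIFO plus a set that prevents double enqueueing, both guarded by the queue's monitor. `GatesWithData` is a hypothetical generic stand-in, not the `UnionInputGate` code. A gate taken from it may turn out to be empty, which is why the reviewed code polls rather than blocks afterwards.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the "gates with data" queue guarded by one monitor.
class GatesWithData<G> {
    private final Queue<G> inputGatesWithData = new ArrayDeque<>();
    private final Set<G> enqueuedInputGatesWithData = new HashSet<>();

    // Called on a data notification, or to re-queue a gate whose last
    // buffer reported moreAvailable() == true.
    void queueInputGate(G gate) {
        synchronized (inputGatesWithData) {
            if (enqueuedInputGatesWithData.add(gate)) {
                inputGatesWithData.add(gate);
                inputGatesWithData.notifyAll();
            }
        }
    }

    // Blocks until a gate is queued, then removes it from both structures.
    G waitAndGetNextInputGate() throws InterruptedException {
        synchronized (inputGatesWithData) {
            while (inputGatesWithData.isEmpty()) {
                inputGatesWithData.wait();
            }
            G gate = inputGatesWithData.remove();
            enqueuedInputGatesWithData.remove(gate);
            return gate;
        }
    }
}
```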




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366837#comment-16366837
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724949
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -55,69 +51,56 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
// 

 
PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
}
 
@Override
-   public boolean add(Buffer buffer) throws IOException {
-   checkNotNull(buffer);
-
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+   return add(bufferConsumer, false);
+   }
 
+   @Override
+   public void flush() {
synchronized (buffers) {
-   if (isFinished || isReleased) {
-   buffer.recycleBuffer();
-   return false;
+   if (readView != null) {
+   readView.notifyDataAvailable();
}
-
-   // Add the buffer and update the stats
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
-   increaseBuffersInBacklog(buffer);
-   }
-
-   // Notify the listener outside of the synchronized block
-   if (reader != null) {
-   reader.notifyBuffersAvailable(1);
}
-
-   return true;
}
 
@Override
public void finish() throws IOException {
-   final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+   add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+   LOG.debug("Finished {}.", this);
+   }
 
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
+   checkNotNull(bufferConsumer);
 
synchronized (buffers) {
if (isFinished || isReleased) {
-   return;
+   bufferConsumer.close();
+   return false;
}
 
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
+   // Add the bufferConsumer and update the stats
+   buffers.add(bufferConsumer);
+   updateStatistics(bufferConsumer);
+   increaseBuffersInBacklog(bufferConsumer);
 
-   isFinished = true;
+   if (finish) {
+   isFinished = true;
+   notifyDataAvailable();
--- End diff --
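The shape of the new `add(bufferConsumer, finish)` path, together with the drain loop in `release()`, can be sketched with stand-in types. Everything here is hypothetical and simplified. `SketchBufferConsumer` only records whether it was closed, and a counter replaces the read view's `notifyDataAvailable()`. In this sketch only `flush()` and finishing notify. A rejected consumer is closed by the subpartition, and finishing notifies immediately rather than waiting for the next flush.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a BufferConsumer: it only records whether it was closed.
class SketchBufferConsumer {
    boolean closed;

    void close() {
        closed = true;
    }
}

// Hypothetical, simplified subpartition following the shape of the reviewed diff.
class SketchPipelinedSubpartition {
    private final ArrayDeque<SketchBufferConsumer> buffers = new ArrayDeque<>();
    private boolean isFinished;
    private boolean isReleased;
    int notifications; // stands in for readView.notifyDataAvailable() calls

    boolean add(SketchBufferConsumer consumer) {
        return add(consumer, false);
    }

    void finish(SketchBufferConsumer endOfPartitionEvent) {
        add(endOfPartitionEvent, true);
    }

    void flush() {
        synchronized (buffers) {
            notifications++;
        }
    }

    void release() {
        synchronized (buffers) {
            isReleased = true;
            // Drain and close everything that was never consumed.
            SketchBufferConsumer consumer;
            while ((consumer = buffers.poll()) != null) {
                consumer.close();
            }
        }
    }

    int size() {
        synchronized (buffers) {
            return buffers.size();
        }
    }

    private boolean add(SketchBufferConsumer consumer, boolean finish) {
        synchronized (buffers) {
            if (isFinished || isReleased) {
                // Ownership was handed over, so a rejected consumer is closed here.
                consumer.close();
                return false;
            }
            buffers.add(consumer);
            if (finish) {
                isFinished = true;
                // The final event is announced right away instead of waiting for a flush.
                notifications++;
            }
            return true;
        }
    }
}
```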

 




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366835#comment-16366835
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724867
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -131,9 +114,9 @@ public void release() {
}
 
// Release all available buffers
-   Buffer buffer;
-   while ((buffer = buffers.poll()) != null) {
-   buffer.recycleBuffer();
+   BufferConsumer bufferConsumer;
+   while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

I think doing this as future work is ok.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366834#comment-16366834
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724735
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer serializer = serializers[targetChannel];
 
-   synchronized (serializer) {
-   SerializationResult result = serializer.addRecord(record);
-
-   while (result.isFullBuffer()) {
-   Buffer buffer = serializer.getCurrentBuffer();
-
-   if (buffer != null) {
-   numBytesOut.inc(buffer.getSizeUnsafe());
-   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
-   } else {
-   BufferBuilder bufferBuilder =
-   targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-   result = serializer.setNextBufferBuilder(bufferBuilder);
+   SerializationResult result = serializer.addRecord(record);
+
+   while (result.isFullBuffer()) {
+   if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+   // If this was a full record, we are done. Not breaking
+   // out of the loop at this point will lead to another
+   // buffer request before breaking out (that would not be
+   // a problem per se, but it can lead to stalls in the
+   // pipeline).
+   if (result.isFullRecord()) {
+   break;
}
}
+   BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+   result = serializer.setNextBufferBuilder(bufferBuilder);
}
+   checkState(!serializer.hasSerializedData(), "All data should be written at once");
}
 
-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-   final Buffer eventBuffer = EventSerializer.toBuffer(event);
-   try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

Good question. There could be a package-private method that returns the buffer, and the public method would use it without returning the buffer. But it is questionable whether that is really better, because we would also need to ensure that the public method goes through the private one, etc.
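The alternative discussed in this comment can be sketched as follows. `SketchRecordWriter` is hypothetical, and a `String` stands in for `BufferConsumer`. The public method delegates to a package-private one, so tests in the same package can inspect the result while the public API returns nothing. As the comment points out, nothing enforces that future public methods also go through the internal one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a void public API backed by a package-private method
// that returns the produced object for tests in the same package.
class SketchRecordWriter {
    final List<String> channels = new ArrayList<>();
    private final int numberOfChannels;

    SketchRecordWriter(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // Public entry point: callers never see the serialized event.
    public void broadcastEvent(String event) {
        broadcastEventInternal(event);
    }

    // Package-private: visible to tests, which can assert on the returned value.
    String broadcastEventInternal(String event) {
        String serialized = "event:" + event;
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(i + "/" + serialized);
        }
        return serialized;
    }
}
```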




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366830#comment-16366830
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724296
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {
 
return future.get();
}
+
+   public static void waitForAll(long timeoutMillis, Future...futures) throws Exception {
+   waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection futures) throws Exception {
+   long startMillis = System.currentTimeMillis();
+   Set futuresSet = new HashSet<>();
--- End diff --
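The intent of `waitForAll(timeoutMillis, ...)` appears to be one overall timeout across all futures. Below is a simplified sketch of that behavior using only JDK types. The class name is hypothetical, and unlike the reviewed code it waits on the futures in order instead of tracking a `Set`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Flink's FutureUtil: all futures share one overall
// deadline instead of each getting the full timeout.
final class WaitForAll {
    private WaitForAll() {
    }

    static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
        waitForAll(timeoutMillis, Arrays.asList(futures));
    }

    static void waitForAll(long timeoutMillis, Collection<? extends Future<?>> futures) throws Exception {
        // nanoTime() is monotonic, so wall-clock adjustments cannot move the deadline.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (Future<?> future : futures) {
            long remaining = Math.max(0L, deadline - System.nanoTime());
            // Throws TimeoutException once the shared deadline has passed, and
            // ExecutionException if the future completed exceptionally.
            future.get(remaining, TimeUnit.NANOSECONDS);
        }
    }
}
```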

 




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366828#comment-16366828
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724127
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer serializer = serializers[targetChannel];
 
-   synchronized (serializer) {
-   SerializationResult result = serializer.addRecord(record);
-
-   while (result.isFullBuffer()) {
-   Buffer buffer = serializer.getCurrentBuffer();
-
-   if (buffer != null) {
-   numBytesOut.inc(buffer.getSizeUnsafe());
-   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
-   } else {
-   BufferBuilder bufferBuilder =
-   targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-   result = serializer.setNextBufferBuilder(bufferBuilder);
+   SerializationResult result = serializer.addRecord(record);
+
+   while (result.isFullBuffer()) {
--- End diff --

 We can introduce this change later, after some more extensive tests.
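The loop structure in `sendToTarget` can be sketched with plain byte arrays. The writer keeps copying the record, and whenever the current buffer fills up it finishes that buffer and requests a new one, but it does not request a new one when the record ended exactly at a buffer boundary. `SpanningWriter` is hypothetical. It allocates arrays where the real code requests `BufferBuilder`s from a buffer pool, and it has no serializer state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a record spanning fixed-size buffers; not Flink's serializer.
class SpanningWriter {
    private final int bufferSize;
    private byte[] current;
    private int position;
    final List<byte[]> finished = new ArrayList<>();

    SpanningWriter(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    void sendToTarget(byte[] record) {
        int offset = 0;
        while (offset < record.length) {
            if (current == null) {
                current = new byte[bufferSize]; // stands in for requesting a new buffer
                position = 0;
            }
            int n = Math.min(bufferSize - position, record.length - offset);
            System.arraycopy(record, offset, current, position, n);
            position += n;
            offset += n;
            if (position == bufferSize) {
                finishCurrentBuffer(); // buffer full: hand it downstream
            }
        }
        // No new buffer is requested when the record ended exactly at a boundary;
        // a partially filled buffer stays open for the next record or a flush.
    }

    void finishCurrentBuffer() {
        if (current != null) {
            finished.add(Arrays.copyOf(current, position));
            current = null;
        }
    }
}
```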




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365619#comment-16365619
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168490355
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
 
BufferAndAvailability next = null;
try {
-   if (channel.isWritable()) {
-   while (true) {
-   SequenceNumberingViewReader reader = nonEmptyReader.poll();
-
-   // No queue with available data. We allow this here, because
-   // of the write callbacks that are executed after each write.
-   if (reader == null) {
-   return;
+   while (true) {
+   NetworkSequenceViewReader reader = poolAvailableReader();
+
+   // No queue with available data. We allow this here, because
+   // of the write callbacks that are executed after each write.
+   if (reader == null) {
+   return;
+   }
+
+   next = reader.getNextBuffer();
+   if (next == null) {
+   if (!reader.isReleased()) {
+   continue;
}
+   markAsReleased(reader.getReceiverId());
+
+   Throwable cause = reader.getFailureCause();
+   if (cause != null) {
+   ErrorResponse msg = new ErrorResponse(
+   new ProducerFailedException(cause),
+   reader.getReceiverId());
 
-   next = reader.getNextBuffer();
-
-   if (next == null) {
-   if (reader.isReleased()) {
-   markAsReleased(reader.getReceiverId());
-   Throwable cause = reader.getFailureCause();
-
-   if (cause != null) {
-   ErrorResponse msg = new ErrorResponse(
-   new ProducerFailedException(cause),
-   reader.getReceiverId());
-
-   ctx.writeAndFlush(msg);
-   }
-   } else {
-   IllegalStateException err = new IllegalStateException(
-   "Bug in Netty consumer logic: reader queue got notified by partition " +
-   "about available data, but none was available.");
-   handleException(ctx.channel(), err);
-   return;
-   }
-   } else {
-   // this channel was now removed from the non-empty reader queue
-   // we re-add it in case it has more data, because in that case no
-   // "non-empty" notification will come for that reader from the queue.
-   if (next.moreAvailable()) {
-   nonEmptyReader.add(reader);
-   }
-
-   BufferResponse msg = new BufferResponse(
-   next.buffer(),
-   reader.getSequenceNumber(),
-   reader.getReceiverId(),
-   

[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365570#comment-16365570
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168486202
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {
 
+   checkState(!bufferOrEvent.moreAvailable());
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
}
 
+   if (bufferOrEvent.moreAvailable()) {
+   // this buffer or event was now removed from the non-empty gates queue
+   // we re-add it in case it has more data, because in that case no "non-empty" notification
+   // will come for that gate
+   queueInputGate(inputGate);
+   }
+
// Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
 
bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
 
-   return bufferOrEvent;
+   return Optional.ofNullable(bufferOrEvent);
+   }
+
+   @Override
+   public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+   throw new UnsupportedOperationException();
+   }
+
+   private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+   while (true) {
+   InputGate inputGate;
+   synchronized (inputGatesWithData) {
+   while (inputGatesWithData.size() == 0) {
+   inputGatesWithData.wait();
+   }
+   inputGate = inputGatesWithData.remove();
+   enqueuedInputGatesWithData.remove(inputGate);
+   }
+
+   // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

It's kind of a bad place for such a comment - it can become outdated without anyone noticing :/ What does `UnionInputGate` know about the `OutputFlusher` on the sender side? This code should just assume that there are no guarantees about data notifications being accurate.

It should be placed in some high-level network stack documentation.
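To make the point above concrete, here is a minimal Java sketch (not Flink's actual `UnionInputGate`) of a consumer that treats data notifications as hints only. It assumes a hypothetical `Source` interface with a non-blocking `poll()`: a source taken from the queue is polled rather than blocked on, a spurious notification is simply skipped, and a source whose element reports more data is re-enqueued by the consumer itself, mirroring the `moreAvailable()` handling in the diff. `Source`, `Item` and `NotifiedQueue` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical data source that can be polled without blocking. */
interface Source {
    /** Returns the next item, or empty if the notification was spurious. */
    Optional<Item> poll();
}

/** Hypothetical element carrying a "more data available" hint. */
final class Item {
    final String payload;
    final boolean moreAvailable;

    Item(String payload, boolean moreAvailable) {
        this.payload = payload;
        this.moreAvailable = moreAvailable;
    }
}

/** Queue of sources notified as having data; the notifications may be inaccurate. */
final class NotifiedQueue {
    private final ArrayDeque<Source> withData = new ArrayDeque<>();
    private final Set<Source> enqueued = new HashSet<>();

    /** Called by producers; duplicate notifications for a queued source are collapsed. */
    synchronized void notifyDataAvailable(Source source) {
        if (enqueued.add(source)) {
            withData.add(source);
            notifyAll();
        }
    }

    /** Blocks until some source actually yields an item. */
    Item take() throws InterruptedException {
        while (true) {
            Source source;
            synchronized (this) {
                while (withData.isEmpty()) {
                    wait();
                }
                source = withData.remove();
                enqueued.remove(source);
            }
            // Do not trust the notification: poll instead of blocking on the source.
            Optional<Item> next = source.poll();
            if (!next.isPresent()) {
                continue; // spurious notification, try the next source
            }
            if (next.get().moreAvailable) {
                // No new notification will arrive for data that is already there,
                // so the consumer re-enqueues the source itself.
                notifyDataAvailable(source);
            }
            return next.get();
        }
    }
}
```

Keeping the "notifications may lie" assumption inside the consumer loop, rather than in a comment about a specific sender component, means the code stays correct even if the sender-side flushing logic changes.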




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365554#comment-16365554
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168480406
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
// 

 
PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
}
 
@Override
-   public boolean add(Buffer buffer) throws IOException {
-   checkNotNull(buffer);
-
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+   return add(bufferConsumer, false);
+   }
 
+   @Override
+   public void flush() {
synchronized (buffers) {
-   if (isFinished || isReleased) {
-   buffer.recycleBuffer();
-   return false;
+   if (readView != null) {
+   readView.notifyDataAvailable();
}
-
-   // Add the buffer and update the stats
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
-   increaseBuffersInBacklog(buffer);
-   }
-
-   // Notify the listener outside of the synchronized block
-   if (reader != null) {
-   reader.notifyBuffersAvailable(1);
}
-
-   return true;
}
 
@Override
public void finish() throws IOException {
-   final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+   add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+   LOG.debug("Finished {}.", this);
+   }
 
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
+   checkNotNull(bufferConsumer);
 
synchronized (buffers) {
if (isFinished || isReleased) {
-   return;
+   bufferConsumer.close();
+   return false;
}
 
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
+   // Add the bufferConsumer and update the stats
+   buffers.add(bufferConsumer);
+   updateStatistics(bufferConsumer);
+   increaseBuffersInBacklog(bufferConsumer);
 
-   isFinished = true;
+   if (finish) {
+   isFinished = true;
+   notifyDataAvailable();
--- End diff --

RTFM :)


https://github.com/apache/flink/pull/5423/commits/982edbce98db0bb7a5db0514d67aed0435a95d0f

1. it was done as a separate commit, so it is not related to the rest of the changes
2. from the commit message:
> notifyBuffersAvailable is a quick call that doesn't need to be executed outside of the lock

advantages of commit-by-commit reviewing ;)

As for the double-check: no, this notification is a very quick call; it only enqueues some work on Netty's `Executor`.
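A minimal sketch of why notifying while holding the `buffers` lock can stay cheap, assuming (as the comment says) that the notification only hands a task to an executor such as a Netty event loop. The `Subpartition` class, its `String` buffers and the `onDataAvailable` callback are hypothetical simplifications, not Flink's `PipelinedSubpartition`.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Executor;

/** Hypothetical, simplified subpartition that notifies its reader while holding the lock. */
final class Subpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private final Executor eventLoop;       // e.g. a Netty event loop in the real system
    private final Runnable onDataAvailable; // work the reader does once data arrives
    private boolean isFinished;             // guarded by buffers

    Subpartition(Executor eventLoop, Runnable onDataAvailable) {
        this.eventLoop = eventLoop;
        this.onDataAvailable = onDataAvailable;
    }

    boolean add(String buffer, boolean finish) {
        synchronized (buffers) {
            if (isFinished) {
                return false; // data after finish() is rejected
            }
            buffers.add(buffer);
            if (finish) {
                isFinished = true;
                // Only enqueues a task: the callback runs later on the executor's
                // thread, so the lock is held for a very short time.
                eventLoop.execute(onDataAvailable);
            }
            return true;
        }
    }

    int size() {
        synchronized (buffers) {
            return buffers.size();
        }
    }
}
```

This only stays cheap as long as `execute` itself never blocks (for example on a bounded task queue); a blocking executor called under the lock would stall every thread that needs `buffers`.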




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365544#comment-16365544
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168479100
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -131,9 +114,9 @@ public void release() {
}
 
// Release all available buffers
-   Buffer buffer;
-   while ((buffer = buffers.poll()) != null) {
-   buffer.recycleBuffer();
+   BufferConsumer bufferConsumer;
+   while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

I see what you mean, and I think this code could maybe be deduplicated even further (moving the `readView` field to the abstract class), but can we leave that as future work?




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365534#comment-16365534
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168476375
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer serializer = serializers[targetChannel];
 
-   synchronized (serializer) {
-   SerializationResult result = serializer.addRecord(record);
-
-   while (result.isFullBuffer()) {
-   Buffer buffer = serializer.getCurrentBuffer();
-
-   if (buffer != null) {
-   numBytesOut.inc(buffer.getSizeUnsafe());
-   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
-   } else {
-   BufferBuilder bufferBuilder =
-   targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-   result = serializer.setNextBufferBuilder(bufferBuilder);
+   SerializationResult result = serializer.addRecord(record);
+
+   while (result.isFullBuffer()) {
+   if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+   // If this was a full record, we are done. Not breaking
+   // out of the loop at this point will lead to another
+   // buffer request before breaking out (that would not be
+   // a problem per se, but it can lead to stalls in the
+   // pipeline).
+   if (result.isFullRecord()) {
+   break;
}
}
+   BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+   result = serializer.setNextBufferBuilder(bufferBuilder);
}
+   checkState(!serializer.hasSerializedData(), "All data should be written at once");
}
 
-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-   final Buffer eventBuffer = EventSerializer.toBuffer(event);
-   try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

Regarding the returned value: without it, I just didn't have a simple idea for how to test reference counting/recycling :/
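The sketch below illustrates why returning the buffer helps testing: a test can hold on to the returned object and check that it is recycled exactly when the last consumer releases it. It assumes a hypothetical `RefCountedBuffer` with `retain()`/`release()` and a simplified `Broadcaster`; neither is Flink's `BufferConsumer` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reference-counted buffer: "recycled" once the last reference is released. */
final class RefCountedBuffer {
    private int refCount = 1; // the creator holds the first reference

    RefCountedBuffer retain() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount++;
        return this;
    }

    void release() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount--;
    }

    boolean isRecycled() {
        return refCount == 0;
    }
}

/** Hypothetical writer that broadcasts one event buffer to several channels. */
final class Broadcaster {
    final List<List<RefCountedBuffer>> channels = new ArrayList<>();

    Broadcaster(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Gives every channel its own reference and returns the buffer so a test can inspect it. */
    RefCountedBuffer broadcastEvent(RefCountedBuffer event) {
        try {
            for (List<RefCountedBuffer> channel : channels) {
                channel.add(event.retain());
            }
            return event;
        } finally {
            event.release(); // drop the writer's own reference
        }
    }
}
```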




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365527#comment-16365527
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168475520
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {
 
return future.get();
}
+
+   public static void waitForAll(long timeoutMillis, Future...futures) throws Exception {
+   waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection futures) throws Exception {
+   long startMillis = System.currentTimeMillis();
+   Set futuresSet = new HashSet<>();
--- End diff --

Generally speaking you are right, but I think you missed that finished futures are removed from the `futuresSet`. Without this copying, the removal could cause a `wtf` moment for a user (the `waitForAll` method removing something from the passed collection), and without the removal, this code would be slow for a larger number of futures.
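A minimal sketch of the trade-off described above, assuming only `java.util.concurrent.Future`: the futures are copied into a private `HashSet`, completed ones are removed from that copy (so each is checked only once and the caller's collection is never modified), and a `TimeoutException` is thrown once the deadline passes. The class name `FutureWaiting` and the 1 ms polling interval are illustrative choices, not the PR's code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

final class FutureWaiting {
    private FutureWaiting() {}

    static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
        waitForAll(timeoutMillis, Arrays.asList(futures));
    }

    static void waitForAll(long timeoutMillis, Collection<? extends Future<?>> futures) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        // Private copy: removing finished futures must not modify the caller's collection.
        Set<Future<?>> pending = new HashSet<>(futures);
        while (true) {
            Iterator<Future<?>> it = pending.iterator();
            while (it.hasNext()) {
                Future<?> future = it.next();
                if (future.isDone()) {
                    future.get(); // rethrows a failure as ExecutionException
                    it.remove();  // never look at this future again
                }
            }
            if (pending.isEmpty()) {
                return;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException(pending.size() + " future(s) still pending");
            }
            Thread.sleep(1);
        }
    }
}
```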




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365513#comment-16365513
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168473812
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer serializer = serializers[targetChannel];
 
-   synchronized (serializer) {
-   SerializationResult result = serializer.addRecord(record);
-
-   while (result.isFullBuffer()) {
-   Buffer buffer = serializer.getCurrentBuffer();
-
-   if (buffer != null) {
-   numBytesOut.inc(buffer.getSizeUnsafe());
-   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
-   } else {
-   BufferBuilder bufferBuilder =
-   targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-   result = serializer.setNextBufferBuilder(bufferBuilder);
+   SerializationResult result = serializer.addRecord(record);
+
+   while (result.isFullBuffer()) {
--- End diff --

As we discussed, I'm not entirely sure. This "minor change" can add significant overhead in the case of many channels and large records. I don't want to risk increasing the scope of potential problems with this PR :(




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365508#comment-16365508
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168472169
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for producing {@link Buffer}.
+ *
+ * It reads data written by {@link BufferBuilder}.
+ * Although it is not thread safe and can be used only by one single thread, this thread can be different then the
+ * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one thread writes data to
+ * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}.
+ */
+@NotThreadSafe
+public class BufferConsumer implements Closeable {
--- End diff --

Yes, I know. Can you propose a different naming scheme? `BufferWriter` and `BufferBuilder`?
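For readers unfamiliar with the split being named here, a minimal sketch of the one-writer/one-reader pattern described in the Javadoc: the writer copies bytes into shared memory and then publishes its position through a `volatile` field, and the reader only reads up to the last published position. `SimpleBufferBuilder`, `SimpleBufferConsumer`, `SharedPosition` and the `byte[]` memory are hypothetical stand-ins; the real classes work on a `MemorySegment` and presumably hand out buffer slices rather than copies.

```java
import java.util.Arrays;

/** Shared between exactly one writer and one reader; volatile publishes the written bytes. */
final class SharedPosition {
    volatile int position;
}

/** Hypothetical writer side; must only be used by a single thread. */
final class SimpleBufferBuilder {
    private final byte[] memory;
    private final SharedPosition marker = new SharedPosition();
    private int writerPosition; // writer-local copy, avoids re-reading the volatile field

    SimpleBufferBuilder(int capacity) {
        this.memory = new byte[capacity];
    }

    /** Copies as many bytes as still fit and returns how many were copied. */
    int append(byte[] source) {
        int toCopy = Math.min(source.length, memory.length - writerPosition);
        System.arraycopy(source, 0, memory, writerPosition, toCopy);
        writerPosition += toCopy;
        marker.position = writerPosition; // volatile write happens after the copy
        return toCopy;
    }

    SimpleBufferConsumer createBufferConsumer() {
        return new SimpleBufferConsumer(memory, marker);
    }
}

/** Hypothetical reader side; single thread, but possibly a different one than the writer. */
final class SimpleBufferConsumer {
    private final byte[] memory;
    private final SharedPosition marker;
    private int readerPosition;

    SimpleBufferConsumer(byte[] memory, SharedPosition marker) {
        this.memory = memory;
        this.marker = marker;
    }

    /** Returns the bytes published since the previous call (possibly none). */
    byte[] build() {
        int end = marker.position; // volatile read makes the preceding writes visible
        byte[] chunk = Arrays.copyOfRange(memory, readerPosition, end);
        readerPosition = end;
        return chunk;
    }
}
```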




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362530#comment-16362530
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167896900
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects output on the List.
+ */
+@ThreadSafe
+public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter {
+   private final BufferProvider bufferProvider;
+   private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>();
+
+   public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) {
+   this.bufferProvider = checkNotNull(bufferProvider);
+   }
+
+   @Override
+   public synchronized BufferProvider getBufferProvider() {
+   return bufferProvider;
+   }
+
+   @Override
+   public synchronized ResultPartitionID getPartitionId() {
+   return new ResultPartitionID();
--- End diff --

What is the intended effect of having this `synchronized`? It looks like it does nothing.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362529#comment-16362529
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167622468
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@
 
void requestPartitions() throws IOException, InterruptedException;
 
-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+* Blocking call waiting for next {@link BufferOrEvent}.
+*
+* @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+*/
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

From the description, and also to better contrast it with `pollNextBufferOrEvent()`, it almost feels like this method should always return a `BufferOrEvent` and instead throw an exception if `isFinished()`. This seems to be how the empty optional is translated anyway; see `AbstractRecordReader`, which reacts with an `IllegalStateException`.
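A minimal sketch of the contract suggested here, using a hypothetical `Gate` over `String` elements rather than Flink's `InputGate`: the blocking `getNext()` always returns an element and treats a call on a finished gate as a caller bug (`IllegalStateException`), while the non-blocking `pollNext()` keeps `Optional` because "nothing right now" is a normal outcome there.

```java
import java.util.ArrayDeque;
import java.util.Optional;

/** Hypothetical gate contrasting a blocking getter with a non-blocking poll. */
final class Gate {
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private boolean producerFinished;

    synchronized void add(String element) {
        queue.add(element);
        notifyAll();
    }

    synchronized void finish() {
        producerFinished = true;
        notifyAll();
    }

    /** Finished means: no more data will arrive and everything was consumed. */
    synchronized boolean isFinished() {
        return producerFinished && queue.isEmpty();
    }

    /** Blocks until an element is available; calling it on a finished gate is a bug. */
    synchronized String getNext() throws InterruptedException {
        while (queue.isEmpty()) {
            if (producerFinished) {
                throw new IllegalStateException("getNext() called on a finished gate");
            }
            wait();
        }
        return queue.remove();
    }

    /** Never blocks; an empty Optional just means nothing is available right now. */
    synchronized Optional<String> pollNext() {
        return Optional.ofNullable(queue.poll());
    }
}
```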




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362532#comment-16362532
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167621192
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ---
@@ -60,12 +60,20 @@
void tagAsEvent();
 
/**
-* Returns the underlying memory segment.
+* Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits
+* slices. Use it only along the {@link #getMemorySegmentOffset()}.
 *
 * @return the memory segment backing this buffer
 */
+   @Deprecated
MemorySegment getMemorySegment();
 
+   /**
+* @return the offset where this (potential slice) {@link Buffer}'s data start in the underlying memory segment.
+*/
+   @Deprecated
--- End diff --

Same here.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362525#comment-16362525
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167859598
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer serializer = serializers[targetChannel];
 
-   synchronized (serializer) {
-   SerializationResult result = serializer.addRecord(record);
-
-   while (result.isFullBuffer()) {
-   Buffer buffer = serializer.getCurrentBuffer();
-
-   if (buffer != null) {
-   numBytesOut.inc(buffer.getSizeUnsafe());
-   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
-   } else {
-   BufferBuilder bufferBuilder =
-   targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-   result = serializer.setNextBufferBuilder(bufferBuilder);
+   SerializationResult result = serializer.addRecord(record);
+
+   while (result.isFullBuffer()) {
+   if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+   // If this was a full record, we are done. Not breaking
+   // out of the loop at this point will lead to another
+   // buffer request before breaking out (that would not be
+   // a problem per se, but it can lead to stalls in the
+   // pipeline).
+   if (result.isFullRecord()) {
+   break;
}
}
+   BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+   result = serializer.setNextBufferBuilder(bufferBuilder);
}
+   checkState(!serializer.hasSerializedData(), "All data should be written at once");
}
 
-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-   final Buffer eventBuffer = EventSerializer.toBuffer(event);
-   try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

This method no longer throws `InterruptedException`.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362518#comment-16362518
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167528361
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
}
 
/**
-* Writes the buffer to the {@link ResultPartitionWriter} and removes the
-* buffer from the serializer state.
+* Marks the current {@link BufferBuilder} as finished and clears the state for next one.
 *
-* Needs to be synchronized on the serializer!
+* @return true if some data were written
 */
-   private void writeAndClearBuffer(
-   Buffer buffer,
+   private boolean tryFinishCurrentBufferBuilder(
int targetChannel,
RecordSerializer serializer) throws IOException {
 
-   try {
-   targetPartition.writeBuffer(buffer, targetChannel);
-   }
-   finally {
-   serializer.clearCurrentBuffer();
+   if (!bufferBuilders[targetChannel].isPresent()) {
+   return false;
}
+   BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
+   bufferBuilders[targetChannel] = Optional.empty();
+
+   numBytesOut.inc(bufferBuilder.getWrittenBytes());
+   bufferBuilder.finish();
--- End diff --

You could combine this into `numBytesOut.inc(bufferBuilder.finish())`, or maybe `finish()` does not need a return value at all?
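A minimal sketch of the first option, assuming a hypothetical builder whose `finish()` returns the number of bytes written, so the metric update and the finish collapse into `numBytesOut.inc(bufferBuilder.finish())`. `CountingBufferBuilder` and the `Counter` stand-in are illustrative, not Flink's classes.

```java
/** Minimal stand-in for a counter metric. */
final class Counter {
    private long count;

    void inc(long n) {
        count += n;
    }

    long getCount() {
        return count;
    }
}

/** Hypothetical builder whose finish() reports how many bytes were written into it. */
final class CountingBufferBuilder {
    private int writtenBytes;
    private boolean finished;

    void append(int numBytes) {
        checkNotFinished();
        writtenBytes += numBytes;
    }

    /** Marks the builder as finished and returns the number of bytes written. */
    int finish() {
        checkNotFinished();
        finished = true;
        return writtenBytes;
    }

    private void checkNotFinished() {
        if (finished) {
            throw new IllegalStateException("builder already finished");
        }
    }
}
```

With this shape, `numBytesOut.inc(bufferBuilder.finish());` replaces the separate `getWrittenBytes()` call; if no caller ever needs the value, the alternative is to keep `finish()` void and leave the two calls as they are.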




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362521#comment-16362521
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167604523
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
// 

 
PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
}
 
@Override
-   public boolean add(Buffer buffer) throws IOException {
-   checkNotNull(buffer);
-
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+   return add(bufferConsumer, false);
+   }
 
+   @Override
+   public void flush() {
synchronized (buffers) {
-   if (isFinished || isReleased) {
-   buffer.recycleBuffer();
-   return false;
+   if (readView != null) {
+   readView.notifyDataAvailable();
}
-
-   // Add the buffer and update the stats
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
-   increaseBuffersInBacklog(buffer);
-   }
-
-   // Notify the listener outside of the synchronized block
-   if (reader != null) {
-   reader.notifyBuffersAvailable(1);
}
-
-   return true;
}
 
@Override
public void finish() throws IOException {
-   final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+   add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+   LOG.debug("Finished {}.", this);
+   }
 
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) 
throws IOException {
+   checkNotNull(bufferConsumer);
 
synchronized (buffers) {
if (isFinished || isReleased) {
-   return;
+   bufferConsumer.close();
+   return false;
}
 
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
+   // Add the bufferConsumer and update the stats
+   buffers.add(bufferConsumer);
+   updateStatistics(bufferConsumer);
+   increaseBuffersInBacklog(bufferConsumer);
 
-   isFinished = true;
+   if (finish) {
+   isFinished = true;
+   notifyDataAvailable();
--- End diff --

I noticed that this introduces a subtle change: unlike before, the 
notification to the listeners now happens under the lock of `buffers`. I just 
want to double-check that this will not have negative side effects on 
performance. Did this fix any correctness problems?
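For illustration, a minimal sketch of the two orderings (hypothetical class and interface names; `String` stands in for `BufferConsumer`). The old `add()` captured `readView` inside `synchronized (buffers)` and notified after leaving the block; the new `add(..., true)` path calls the listener while `buffers` is still locked. The listener uses `Thread.holdsLock` to record which case it ran in.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch, not Flink's PipelinedSubpartition. */
public class NotifyOutsideLockSketch {

    /** Hypothetical stand-in for a read view that is told about new data. */
    interface DataListener {
        void notifyDataAvailable();
    }

    // Package-private so a caller can check Thread.holdsLock(buffers).
    final ArrayDeque<String> buffers = new ArrayDeque<>();
    private DataListener readView; // guarded by "buffers"

    void setReadView(DataListener listener) {
        synchronized (buffers) {
            readView = listener;
        }
    }

    /** Old style: capture the listener under the lock, call it after releasing the lock. */
    void addNotifyOutsideLock(String buffer) {
        final DataListener reader;
        synchronized (buffers) {
            buffers.add(buffer);
            reader = readView;
        }
        if (reader != null) {
            reader.notifyDataAvailable(); // runs without holding "buffers"
        }
    }

    /** New style in the diff: the callback runs while "buffers" is still locked. */
    void addNotifyUnderLock(String buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            if (readView != null) {
                // Any work done by the listener here extends the critical section.
                readView.notifyDataAvailable();
            }
        }
    }

    int size() {
        synchronized (buffers) {
            return buffers.size();
        }
    }

    public static void main(String[] args) {
        NotifyOutsideLockSketch partition = new NotifyOutsideLockSketch();
        int[] counts = new int[2]; // [0] = notifications, [1] = notifications made under the lock
        partition.setReadView(() -> {
            counts[0]++;
            if (Thread.holdsLock(partition.buffers)) {
                counts[1]++;
            }
        });
        partition.addNotifyOutsideLock("a");
        partition.addNotifyUnderLock("b");
        System.out.println(partition.size() + " " + counts[0] + " " + counts[1]); // prints "2 2 1"
    }
}
```

Whether this matters depends on what the listener does: a cheap enqueue under the lock probably costs little, while a callback that blocks or acquires other locks lengthens the critical section and can introduce lock-ordering hazards.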


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362515#comment-16362515
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167520277
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for producing {@link Buffer}.
+ *
+ * It reads data written by {@link BufferBuilder}.
+ * Although it is not thread safe and can be used only by one single thread, this thread can be different then the
+ * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one thread writes data to
+ * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}.
+ */
+@NotThreadSafe
+public class BufferConsumer implements Closeable {
--- End diff --

Just a thought about names: this class is called `BufferConsumer`, but it does 
not "consume" buffers. It coordinates the production of read slices from a 
shared buffer. `BufferBuilder` makes more sense than this. Even worse, this 
class has a `build() : Buffer` method :-(.





[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362531#comment-16362531
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167527566
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---
@@ -87,41 +86,12 @@ public boolean isFullBuffer() {
SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
--- End diff --

One remark from reading the code: I found it a bit surprising that a method 
that looks like a setter will cause the write to continue. Maybe this would be 
better called something like `continueWritingWithNextBufferBuilder`, or the 
setter could be split from a separate `continueWrite` method?
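A sketch of the split API suggested above, using a hypothetical toy serializer over `java.nio.ByteBuffer` rather than Flink's `RecordSerializer`/`BufferBuilder`: `setNextBuffer` is a plain setter, and `continueWriting` is the explicit call that resumes copying the pending record.

```java
import java.nio.ByteBuffer;

/** Hypothetical toy serializer, not Flink's RecordSerializer. */
public class SplitSetterSketch {

    enum Result { FULL_RECORD, PARTIAL_RECORD_BUFFER_FULL }

    private ByteBuffer pendingRecord; // bytes of the record not yet written
    private ByteBuffer target;        // current output buffer

    /** Pure setter: no data is moved here. */
    void setNextBuffer(ByteBuffer buffer) {
        this.target = buffer;
    }

    /** Starts writing a new record into the current target buffer. */
    Result addRecord(byte[] record) {
        pendingRecord = ByteBuffer.wrap(record);
        return continueWriting();
    }

    /** Explicitly resumes the write: copies as much of the pending record as fits. */
    Result continueWriting() {
        int n = Math.min(pendingRecord.remaining(), target.remaining());
        ByteBuffer chunk = pendingRecord.slice(); // view starting at the unwritten bytes
        chunk.limit(n);
        target.put(chunk);
        pendingRecord.position(pendingRecord.position() + n);
        return pendingRecord.hasRemaining() ? Result.PARTIAL_RECORD_BUFFER_FULL : Result.FULL_RECORD;
    }

    public static void main(String[] args) {
        SplitSetterSketch serializer = new SplitSetterSketch();
        serializer.setNextBuffer(ByteBuffer.allocate(4));
        int buffersUsed = 1;
        Result result = serializer.addRecord(new byte[10]);
        while (result != Result.FULL_RECORD) {
            serializer.setNextBuffer(ByteBuffer.allocate(4)); // only swaps the buffer
            buffersUsed++;
            result = serializer.continueWriting();           // resumes the write explicitly
        }
        System.out.println(buffersUsed); // prints 3
    }
}
```

With this shape, reading the call site makes it obvious where data is actually copied, at the cost of one extra method call per buffer switch.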




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362517#comment-16362517
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167556221
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {
 
return future.get();
}
+
+   public static void waitForAll(long timeoutMillis, Future...futures) throws Exception {
+   waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection futures) throws Exception {
+   long startMillis = System.currentTimeMillis();
+   Set futuresSet = new HashSet<>();
--- End diff --

I think that, for all practical purposes, we do not need a set to deduplicate. 
If a future is contained multiple times and has already finished, waiting for 
it again is basically a no-op.
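A sketch of `waitForAll` without the deduplicating set, assuming one shared deadline for all futures. The generic signatures are assumptions, since the quoted diff appears to have lost its type parameters. Duplicates are simply waited on again; for the JDK's `Future` implementations, `get` on an already completed future returns immediately.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not the FutureUtil code from the PR. */
public final class WaitForAllSketch {

    private WaitForAllSketch() {}

    public static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
        waitForAll(timeoutMillis, Arrays.asList(futures));
    }

    public static void waitForAll(long timeoutMillis, Collection<? extends Future<?>> futures) throws Exception {
        // One deadline shared by the whole collection.
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (Future<?> future : futures) {
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            // Throws TimeoutException if this future is not done by the deadline;
            // a completed future (e.g. a duplicate entry) returns at once.
            future.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        waitForAll(100, done, done); // same future twice, no deduplication needed
        System.out.println("finished");
    }
}
```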




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362534#comment-16362534
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167621132
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ---
@@ -60,12 +60,20 @@
void tagAsEvent();
 
/**
-* Returns the underlying memory segment.
+* Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits
+* slices. Use it only along the {@link #getMemorySegmentOffset()}.
 *
 * @return the memory segment backing this buffer
 */
+   @Deprecated
--- End diff --

You should name the proper replacement for this deprecated method in the 
comment, or say that it will eventually be removed without replacement.
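One way to address this, sketched with placeholder names (not Flink's `Buffer` API): pair the `@Deprecated` annotation with a Javadoc `@deprecated` tag that points to the replacement via `{@link}`, or states that none is planned.

```java
import java.util.Arrays;

/** Hypothetical class; method names are placeholders. */
public class DeprecationJavadocSketch {

    private final byte[] segment = new byte[16];
    private final int offset = 4;

    /**
     * Returns the whole backing array, ignoring the offset of this slice.
     *
     * @return the array backing this buffer
     * @deprecated This method ignores read-only protection and slicing.
     *     Use {@link #getReadableBytes()} instead. (Alternatively, state that
     *     the method will be removed in a future release without replacement.)
     */
    @Deprecated
    public byte[] getBackingArray() {
        return segment;
    }

    /** The replacement named in the {@code @deprecated} tag: only the readable slice. */
    public byte[] getReadableBytes() {
        return Arrays.copyOfRange(segment, offset, segment.length);
    }
}
```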




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362535#comment-16362535
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167883063
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
 
BufferAndAvailability next = null;
try {
-   if (channel.isWritable()) {
-   while (true) {
-   SequenceNumberingViewReader reader = nonEmptyReader.poll();
-
-   // No queue with available data. We allow this here, because
-   // of the write callbacks that are executed after each write.
-   if (reader == null) {
-   return;
+   while (true) {
+   NetworkSequenceViewReader reader = poolAvailableReader();
+
+   // No queue with available data. We allow this here, because
+   // of the write callbacks that are executed after each write.
+   if (reader == null) {
+   return;
+   }
+
+   next = reader.getNextBuffer();
+   if (next == null) {
+   if (!reader.isReleased()) {
+   continue;
}
+   markAsReleased(reader.getReceiverId());
+
+   Throwable cause = reader.getFailureCause();
+   if (cause != null) {
+   ErrorResponse msg = new ErrorResponse(
+   new ProducerFailedException(cause),
+   reader.getReceiverId());
 
-   next = reader.getNextBuffer();
-
-   if (next == null) {
-   if (reader.isReleased()) {
-   markAsReleased(reader.getReceiverId());
-   Throwable cause = reader.getFailureCause();
-
-   if (cause != null) {
-   ErrorResponse msg = new ErrorResponse(
-   new ProducerFailedException(cause),
-   reader.getReceiverId());
-
-   ctx.writeAndFlush(msg);
-   }
-   } else {
-   IllegalStateException err = new IllegalStateException(
-   "Bug in Netty consumer logic: reader queue got notified by partition " +
-   "about available data, but none was available.");
-   handleException(ctx.channel(), err);
-   return;
-   }
-   } else {
-   // this channel was now removed from the non-empty reader queue
-   // we re-add it in case it has more data, because in that case no
-   // "non-empty" notification will come for that reader from the queue.
-   if (next.moreAvailable()) {
-   nonEmptyReader.add(reader);
-   }
-
-   BufferResponse msg = new BufferResponse(
-   next.buffer(),
-   reader.getSequenceNumber(),
-   reader.getReceiverId(),
   

[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362527#comment-16362527
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167879211
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
@@ -220,6 +273,19 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
}
}
 
+   private void registerAvailableReader(NetworkSequenceViewReader reader) {
+   availableReaders.add(reader);
+   reader.setRegisteredAsAvailable(true);
+   }
+
+   private NetworkSequenceViewReader poolAvailableReader() {
--- End diff --

This should probably be `pollAvailableReader()`




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362533#comment-16362533
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167860402
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
}
 
/**
-* Writes the buffer to the {@link ResultPartitionWriter} and removes the
-* buffer from the serializer state.
+* Marks the current {@link BufferBuilder} as finished and clears the state for next one.
 *
-* Needs to be synchronized on the serializer!
+* @return true if some data were written
 */
-   private void writeAndClearBuffer(
-   Buffer buffer,
+   private boolean tryFinishCurrentBufferBuilder(
int targetChannel,
RecordSerializer serializer) throws IOException {
--- End diff --

This code no longer throws `IOException`.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362520#comment-16362520
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167596726
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -131,9 +114,9 @@ public void release() {
}
 
// Release all available buffers
-   Buffer buffer;
-   while ((buffer = buffers.poll()) != null) {
-   buffer.recycleBuffer();
+   BufferConsumer bufferConsumer;
+   while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

Maybe this part of the `release()` method should also go into the 
superclass, which usually manages `buffers`. The method could also have an 
optional hook for additional cleanup inside the `synchronized` block that 
subclasses can use if needed.
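A sketch of this refactoring as a template method, with hypothetical class names and `Closeable` standing in for `BufferConsumer`: the superclass owns `buffers` and the release loop, and subclasses override an empty hook that is invoked inside the same `synchronized` block.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;

/** Hypothetical names throughout; not Flink's ResultSubpartition hierarchy. */
public class ReleaseHookSketch {

    abstract static class AbstractSubpartition {
        /** The superclass owns the queue. */
        protected final ArrayDeque<Closeable> buffers = new ArrayDeque<>();
        private boolean isReleased; // guarded by "buffers"

        /** Template method: the shared release logic lives here once. */
        public final void release() throws IOException {
            synchronized (buffers) {
                if (isReleased) {
                    return;
                }
                isReleased = true;
                // Release all available buffers.
                Closeable buffer;
                while ((buffer = buffers.poll()) != null) {
                    buffer.close();
                }
                // Optional subclass cleanup, still inside the synchronized block.
                releaseUnderLock();
            }
        }

        /** Hook for subclasses; does nothing by default. */
        protected void releaseUnderLock() {}
    }

    static class PipelinedLikeSubpartition extends AbstractSubpartition {
        Object readView = new Object();

        @Override
        protected void releaseUnderLock() {
            readView = null; // e.g. drop the reference to the reader
        }
    }

    public static void main(String[] args) throws IOException {
        PipelinedLikeSubpartition subpartition = new PipelinedLikeSubpartition();
        int[] closed = {0};
        subpartition.buffers.add(() -> closed[0]++);
        subpartition.buffers.add(() -> closed[0]++);
        subpartition.release();
        subpartition.release(); // second call is a no-op
        System.out.println(closed[0] + " " + (subpartition.readView == null)); // prints "2 true"
    }
}
```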




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362528#comment-16362528
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167818220
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {
 
+   checkState(!bufferOrEvent.moreAvailable());
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
}
 
+   if (bufferOrEvent.moreAvailable()) {
+   // this buffer or event was now removed from the non-empty gates queue
+   // we re-add it in case it has more data, because in that case no "non-empty" notification
+   // will come for that gate
+   queueInputGate(inputGate);
+   }
+
// Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
 
bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
 
-   return bufferOrEvent;
+   return Optional.ofNullable(bufferOrEvent);
+   }
+
+   @Override
+   public Optional pollNextBufferOrEvent() throws IOException, InterruptedException {
+   throw new UnsupportedOperationException();
+   }
+
+   private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+   while (true) {
+   InputGate inputGate;
+   synchronized (inputGatesWithData) {
+   while (inputGatesWithData.size() == 0) {
+   inputGatesWithData.wait();
+   }
+   inputGate = inputGatesWithData.remove();
+   enqueuedInputGatesWithData.remove(inputGate);
+   }
+
+   // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

Maybe we can add a comment explaining why this can happen now, i.e. 
mentioning the output flusher.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362523#comment-16362523
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167588455
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer serializer = serializers[targetChannel];
 
-   synchronized (serializer) {
-   SerializationResult result = serializer.addRecord(record);
-
-   while (result.isFullBuffer()) {
-   Buffer buffer = serializer.getCurrentBuffer();
-
-   if (buffer != null) {
-   numBytesOut.inc(buffer.getSizeUnsafe());
-   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
-   } else {
-   BufferBuilder bufferBuilder =
-   targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-   result = serializer.setNextBufferBuilder(bufferBuilder);
+   SerializationResult result = serializer.addRecord(record);
+
+   while (result.isFullBuffer()) {
+   if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+   // If this was a full record, we are done. Not breaking
+   // out of the loop at this point will lead to another
+   // buffer request before breaking out (that would not be
+   // a problem per se, but it can lead to stalls in the
+   // pipeline).
+   if (result.isFullRecord()) {
+   break;
}
}
+   BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+   result = serializer.setNextBufferBuilder(bufferBuilder);
}
+   checkState(!serializer.hasSerializedData(), "All data should be written at once");
}
 
-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-   final Buffer eventBuffer = EventSerializer.toBuffer(event);
-   try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

I think this method does not truly require a return value. The return value 
is only used in one test, and I found it confusing that it is first closed and 
then returned.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362536#comment-16362536
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167857194
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
// 

 
PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
}
 
@Override
-   public boolean add(Buffer buffer) throws IOException {
-   checkNotNull(buffer);
-
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+   return add(bufferConsumer, false);
+   }
 
+   @Override
+   public void flush() {
synchronized (buffers) {
-   if (isFinished || isReleased) {
-   buffer.recycleBuffer();
-   return false;
+   if (readView != null) {
+   readView.notifyDataAvailable();
}
-
-   // Add the buffer and update the stats
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
-   increaseBuffersInBacklog(buffer);
-   }
-
-   // Notify the listener outside of the synchronized block
-   if (reader != null) {
-   reader.notifyBuffersAvailable(1);
}
-
-   return true;
}
 
@Override
public void finish() throws IOException {
-   final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+   add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+   LOG.debug("Finished {}.", this);
+   }
 
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
--- End diff --

I think you can remove the `throws IOException`, and then also from the 
public `add(...)`.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362526#comment-16362526
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167898515
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects output on the List.
+ */
+@ThreadSafe
+public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter {
+   private final BufferProvider bufferProvider;
+   private final ArrayDeque bufferConsumers = new ArrayDeque<>();
+
+   public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) {
+   this.bufferProvider = checkNotNull(bufferProvider);
+   }
+
+   @Override
+   public synchronized BufferProvider getBufferProvider() {
+   return bufferProvider;
--- End diff --

What does this `synchronized` achieve? The field is `final`, so I would assume 
this change is not required.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362522#comment-16362522
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167618791
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -138,6 +142,12 @@
/** Channels, which notified this input gate about available data. */
private final ArrayDeque inputChannelsWithData = new ArrayDeque<>();
 
+   /**
+* Field guaranteeing uniqueness for inputChannelsWithData queue. Both of those fields should be unified
+* onto one.
+*/
+   private final Set enqueuedInputChannelsWithData = new HashSet<>();
--- End diff --

I wonder whether this would be better as a `BitSet`. Do we typically expect 
very few enqueued channels with data out of a large set of channels?
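A sketch of the `BitSet` variant, assuming each channel can be identified by a dense integer index (hypothetical class; channels are represented only by their index). A `BitSet` costs about one bit per channel up to the highest index used and avoids boxing and per-entry objects, whereas a `HashSet`'s footprint grows with the number of enqueued channels, which is the trade-off behind the question.

```java
import java.util.ArrayDeque;
import java.util.BitSet;

/** Hypothetical sketch: a queue of channel indexes deduplicated by a BitSet. */
public class BitSetQueueSketch {

    private final ArrayDeque<Integer> channelsWithData = new ArrayDeque<>();
    private final BitSet enqueued = new BitSet(); // bit i set <=> channel i is in the queue

    /** Enqueues the channel unless it is already queued; returns true if it was added. */
    boolean queueChannel(int channelIndex) {
        if (enqueued.get(channelIndex)) {
            return false;
        }
        enqueued.set(channelIndex);
        channelsWithData.add(channelIndex);
        return true;
    }

    /** Polls the next channel index, or returns -1 if the queue is empty. */
    int pollChannel() {
        Integer channel = channelsWithData.poll();
        if (channel == null) {
            return -1;
        }
        enqueued.clear(channel);
        return channel;
    }

    public static void main(String[] args) {
        BitSetQueueSketch gate = new BitSetQueueSketch();
        gate.queueChannel(7);
        gate.queueChannel(7); // duplicate notification is ignored
        gate.queueChannel(2);
        System.out.println(gate.pollChannel() + " " + gate.pollChannel() + " " + gate.pollChannel()); // prints "7 2 -1"
    }
}
```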




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362516#comment-16362516
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167546706
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer serializer = serializers[targetChannel];
 
-   synchronized (serializer) {
-   SerializationResult result = serializer.addRecord(record);
-
-   while (result.isFullBuffer()) {
-   Buffer buffer = serializer.getCurrentBuffer();
-
-   if (buffer != null) {
-   numBytesOut.inc(buffer.getSizeUnsafe());
-   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
-   } else {
-   BufferBuilder bufferBuilder =
-   targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-   result = serializer.setNextBufferBuilder(bufferBuilder);
+   SerializationResult result = serializer.addRecord(record);
+
+   while (result.isFullBuffer()) {
--- End diff --

I wonder whether this loop could be simplified to
```
while (!result.isFullRecord()) {
    tryFinishCurrentBufferBuilder(targetChannel, serializer);
    BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
    result = serializer.setNextBufferBuilder(bufferBuilder);
}
```

This would introduce a minor change in behaviour in cases where the end of 
the record falls exactly on the end of a buffer. With the change, the buffer is 
only finished by the next record and not on the spot. However, this should not 
be a problem, because this outcome is what usually happens for almost every 
record besides those corner cases, so the code should already handle it well.
With this change, `tryFinishCurrentBufferBuilder` also no longer requires a 
return value.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362524#comment-16362524
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167594824
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -131,9 +114,9 @@ public void release() {
}
 
// Release all available buffers
-   Buffer buffer;
-   while ((buffer = buffers.poll()) != null) {
-   buffer.recycleBuffer();
+   BufferConsumer bufferConsumer;
+   while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

I suggest simply using a normal for-each loop to close all buffer consumers, 
followed by a call to `clear()`.
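A small sketch of the suggested pattern, assuming the queue is a `java.util.Deque`. `FakeBufferConsumer` is a hypothetical stand-in for `BufferConsumer` that only records whether `close()` was called; it is not Flink's class.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReleaseSketch {

    // Hypothetical stand-in for BufferConsumer; only tracks whether close() was called.
    static final class FakeBufferConsumer implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Close every queued consumer with a plain for-each loop, then clear the queue,
    // instead of draining it element by element with poll().
    static void releaseAll(Deque<FakeBufferConsumer> buffers) {
        for (FakeBufferConsumer consumer : buffers) {
            consumer.close();
        }
        buffers.clear();
    }

    public static void main(String[] args) {
        Deque<FakeBufferConsumer> buffers = new ArrayDeque<>();
        FakeBufferConsumer first = new FakeBufferConsumer();
        FakeBufferConsumer second = new FakeBufferConsumer();
        buffers.add(first);
        buffers.add(second);

        releaseAll(buffers);
        System.out.println(first.closed + " " + second.closed + " " + buffers.isEmpty());
    }
}
```

The loop does not modify the deque while iterating, so the for-each is safe; the single `clear()` afterwards empties it in one step.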


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362519#comment-16362519
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167556014
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {
 
return future.get();
}
+
+   public static void waitForAll(long timeoutMillis, Future...futures) throws Exception {
+   waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection futures) throws Exception {
+   long startMillis = System.currentTimeMillis();
+   Set futuresSet = new HashSet<>();
+   for (Future future : futures) {
--- End diff --

This loop could be replaced with `addAll()`, or even with the `HashSet` constructor that takes a collection.
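The three variants below build the same set; this is a sketch only. The element type `Collection<? extends Future<?>>` is an assumption for illustration, since the generic parameters are not visible in the quoted diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class FutureSetSketch {

    // Original style: build the set element by element.
    static Set<Future<?>> copyWithLoop(Collection<? extends Future<?>> futures) {
        Set<Future<?>> futuresSet = new HashSet<>();
        for (Future<?> future : futures) {
            futuresSet.add(future);
        }
        return futuresSet;
    }

    // Suggested alternative 1: addAll().
    static Set<Future<?>> copyWithAddAll(Collection<? extends Future<?>> futures) {
        Set<Future<?>> futuresSet = new HashSet<>();
        futuresSet.addAll(futures);
        return futuresSet;
    }

    // Suggested alternative 2: the copy constructor, which also sizes the set from the input.
    static Set<Future<?>> copyWithConstructor(Collection<? extends Future<?>> futures) {
        return new HashSet<>(futures);
    }

    public static void main(String[] args) {
        List<Future<?>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture(1));
        futures.add(CompletableFuture.completedFuture(2));

        Set<Future<?>> viaLoop = copyWithLoop(futures);
        System.out.println(viaLoop.equals(copyWithAddAll(futures))
                && viaLoop.equals(copyWithConstructor(futures)));
    }
}
```

All three de-duplicate identical future instances in the same way; the constructor form is simply the shortest.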

