[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5423


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r169082736
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@

    void requestPartitions() throws IOException, InterruptedException;

-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+    * Blocking call waiting for next {@link BufferOrEvent}.
+    *
+    * @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+    */
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

👍 


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r169048737
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@

    void requestPartitions() throws IOException, InterruptedException;

-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+    * Blocking call waiting for next {@link BufferOrEvent}.
+    *
+    * @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+    */
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

I would prefer to leave it as it is for now. I completely agree it's a bad 
design, however in this PR I'm only documenting this behaviour and changing the 
`null` to `Optional.empty()`, and because of the already extensive scope of this 
change, I would prefer to fix this issue later on.
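
To illustrate what the documented contract means for callers, here is a minimal 
sketch (the `consume` wrapper and `process` handler are hypothetical names, not 
part of this PR; `InputGate` and `BufferOrEvent` are the Flink types discussed above):

```java
import java.io.IOException;
import java.util.Optional;

final class InputGateConsumptionSketch {

	// Drains the gate until the empty Optional signals that it is finished.
	static void consume(InputGate gate) throws IOException, InterruptedException {
		while (true) {
			Optional<BufferOrEvent> next = gate.getNextBufferOrEvent();
			if (!next.isPresent()) {
				// by contract, empty means gate.isFinished() returned true
				break;
			}
			process(next.get());
		}
	}

	private static void process(BufferOrEvent bufferOrEvent) {
		// placeholder for the caller's handling logic
	}
}
```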


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168725762
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@

    void requestPartitions() throws IOException, InterruptedException;

-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+    * Blocking call waiting for next {@link BufferOrEvent}.
+    *
+    * @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+    */
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

What do you think about this?


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168725569
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final 
Channel channel) throws IO
 
BufferAndAvailability next = null;
try {
-   if (channel.isWritable()) {
-   while (true) {
-   SequenceNumberingViewReader reader = 
nonEmptyReader.poll();
-
-   // No queue with available data. We 
allow this here, because
-   // of the write callbacks that are 
executed after each write.
-   if (reader == null) {
-   return;
+   while (true) {
+   NetworkSequenceViewReader reader = 
poolAvailableReader();
+
+   // No queue with available data. We allow this 
here, because
+   // of the write callbacks that are executed 
after each write.
+   if (reader == null) {
+   return;
+   }
+
+   next = reader.getNextBuffer();
+   if (next == null) {
+   if (!reader.isReleased()) {
+   continue;
}
+   markAsReleased(reader.getReceiverId());
+
+   Throwable cause = 
reader.getFailureCause();
+   if (cause != null) {
+   ErrorResponse msg = new 
ErrorResponse(
+   new 
ProducerFailedException(cause),
+   reader.getReceiverId());
 
-   next = reader.getNextBuffer();
-
-   if (next == null) {
-   if (reader.isReleased()) {
-   
markAsReleased(reader.getReceiverId());
-   Throwable cause = 
reader.getFailureCause();
-
-   if (cause != null) {
-   ErrorResponse 
msg = new ErrorResponse(
-   new 
ProducerFailedException(cause),
-   
reader.getReceiverId());
-
-   
ctx.writeAndFlush(msg);
-   }
-   } else {
-   IllegalStateException 
err = new IllegalStateException(
-   "Bug in Netty 
consumer logic: reader queue got notified by partition " +
-   "about 
available data, but none was available.");
-   
handleException(ctx.channel(), err);
-   return;
-   }
-   } else {
-   // this channel was now removed 
from the non-empty reader queue
-   // we re-add it in case it has 
more data, because in that case no
-   // "non-empty" notification 
will come for that reader from the queue.
-   if (next.moreAvailable()) {
-   
nonEmptyReader.add(reader);
-   }
-
-   BufferResponse msg = new 
BufferResponse(
-   next.buffer(),
-   
reader.getSequenceNumber(),
-   reader.getReceiverId(),
-   0);
-
-   if 
(isEndOfPartitionEvent(next.buffer())) {
-   

[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168725339
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
            && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
            && inputGate.isFinished()) {

+           checkState(!bufferOrEvent.moreAvailable());
            if (!inputGatesWithRemainingData.remove(inputGate)) {
                throw new IllegalStateException("Couldn't find input gate in set of remaining " +
                    "input gates.");
            }
        }

+       if (bufferOrEvent.moreAvailable()) {
+           // this buffer or event was now removed from the non-empty gates queue
+           // we re-add it in case it has more data, because in that case no "non-empty" notification
+           // will come for that gate
+           queueInputGate(inputGate);
+       }
+
        // Set the channel index to identify the input channel (across all unioned input gates)
        final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);

        bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());

-       return bufferOrEvent;
+       return Optional.ofNullable(bufferOrEvent);
+   }
+
+   @Override
+   public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+       throw new UnsupportedOperationException();
+   }
+
+   private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+       while (true) {
+           InputGate inputGate;
+           synchronized (inputGatesWithData) {
+               while (inputGatesWithData.size() == 0) {
+                   inputGatesWithData.wait();
+               }
+               inputGate = inputGatesWithData.remove();
+               enqueuedInputGatesWithData.remove(inputGate);
+           }
+
+           // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

That sounds ok, maybe with some short pointer here to the high-level doc, or 
else there is an increased chance that somebody misses it.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724949
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
    /** Flag indicating whether the subpartition has been released. */
    private volatile boolean isReleased;

-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
    // ------------------------------------------------------------------------

    PipelinedSubpartition(int index, ResultPartition parent) {
        super(index, parent);
    }

    @Override
-   public boolean add(Buffer buffer) throws IOException {
-       checkNotNull(buffer);
-
-       // view reference accessible outside the lock, but assigned inside the locked scope
-       final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+       return add(bufferConsumer, false);
+   }

+   @Override
+   public void flush() {
        synchronized (buffers) {
-           if (isFinished || isReleased) {
-               buffer.recycleBuffer();
-               return false;
+           if (readView != null) {
+               readView.notifyDataAvailable();
            }
-
-           // Add the buffer and update the stats
-           buffers.add(buffer);
-           reader = readView;
-           updateStatistics(buffer);
-           increaseBuffersInBacklog(buffer);
-       }
-
-       // Notify the listener outside of the synchronized block
-       if (reader != null) {
-           reader.notifyBuffersAvailable(1);
        }
-
-       return true;
    }

    @Override
    public void finish() throws IOException {
-       final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+       add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+       LOG.debug("Finished {}.", this);
+   }

-       // view reference accessible outside the lock, but assigned inside the locked scope
-       final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
+       checkNotNull(bufferConsumer);

        synchronized (buffers) {
            if (isFinished || isReleased) {
-               return;
+               bufferConsumer.close();
+               return false;
            }

-           buffers.add(buffer);
-           reader = readView;
-           updateStatistics(buffer);
+           // Add the bufferConsumer and update the stats
+           buffers.add(bufferConsumer);
+           updateStatistics(bufferConsumer);
+           increaseBuffersInBacklog(bufferConsumer);

-           isFinished = true;
+           if (finish) {
+               isFinished = true;
+               notifyDataAvailable();
--- End diff --

👍 


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724867
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -131,9 +114,9 @@ public void release() {
        }

        // Release all available buffers
-       Buffer buffer;
-       while ((buffer = buffers.poll()) != null) {
-           buffer.recycleBuffer();
+       BufferConsumer bufferConsumer;
+       while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

I think doing this as future work is ok.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724735
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

-       synchronized (serializer) {
-           SerializationResult result = serializer.addRecord(record);
-
-           while (result.isFullBuffer()) {
-               Buffer buffer = serializer.getCurrentBuffer();
-
-               if (buffer != null) {
-                   numBytesOut.inc(buffer.getSizeUnsafe());
-                   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                   // If this was a full record, we are done. Not breaking
-                   // out of the loop at this point will lead to another
-                   // buffer request before breaking out (that would not be
-                   // a problem per se, but it can lead to stalls in the
-                   // pipeline).
-                   if (result.isFullRecord()) {
-                       break;
-                   }
-               } else {
-                   BufferBuilder bufferBuilder =
-                       targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                   result = serializer.setNextBufferBuilder(bufferBuilder);
+       SerializationResult result = serializer.addRecord(record);
+
+       while (result.isFullBuffer()) {
+           if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+               // If this was a full record, we are done. Not breaking
+               // out of the loop at this point will lead to another
+               // buffer request before breaking out (that would not be
+               // a problem per se, but it can lead to stalls in the
+               // pipeline).
+               if (result.isFullRecord()) {
+                   break;
                }
            }
+           BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+           result = serializer.setNextBufferBuilder(bufferBuilder);
        }
+       checkState(!serializer.hasSerializedData(), "All data should be written at once");
    }

-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-       final Buffer eventBuffer = EventSerializer.toBuffer(event);
-       try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

Good question, there could be a package-private method that returns the 
buffer, and the public method uses this method but does not return the buffer. 
But it's questionable whether that is really better, because we would also need 
to ensure that the public method goes through the private one, etc.
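
A minimal sketch of that shape (names and the test-only visibility marker are 
hypothetical; `EventSerializer.toBufferConsumer` is from this PR):

```java
// Public API: callers never see the buffer.
public void broadcastEvent(AbstractEvent event) throws IOException {
	broadcastEventInternal(event);
}

// Package-private hook: tests can assert on reference counting/recycling.
@VisibleForTesting
BufferConsumer broadcastEventInternal(AbstractEvent event) throws IOException {
	BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event);
	// ... hand the consumer to every target channel ...
	return eventBufferConsumer;
}
```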


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724296
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {

        return future.get();
    }
+
+   public static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
+       waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
+       long startMillis = System.currentTimeMillis();
+       Set<Future<?>> futuresSet = new HashSet<>();
--- End diff --

👍 


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168724127
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

-       synchronized (serializer) {
-           SerializationResult result = serializer.addRecord(record);
-
-           while (result.isFullBuffer()) {
-               Buffer buffer = serializer.getCurrentBuffer();
-
-               if (buffer != null) {
-                   numBytesOut.inc(buffer.getSizeUnsafe());
-                   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                   // If this was a full record, we are done. Not breaking
-                   // out of the loop at this point will lead to another
-                   // buffer request before breaking out (that would not be
-                   // a problem per se, but it can lead to stalls in the
-                   // pipeline).
-                   if (result.isFullRecord()) {
-                       break;
-                   }
-               } else {
-                   BufferBuilder bufferBuilder =
-                       targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                   result = serializer.setNextBufferBuilder(bufferBuilder);
+       SerializationResult result = serializer.addRecord(record);
+
+       while (result.isFullBuffer()) {
--- End diff --

👍 We can introduce this change later, after some more extensive tests.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168490355
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO

        BufferAndAvailability next = null;
        try {
-           if (channel.isWritable()) {
-               while (true) {
-                   SequenceNumberingViewReader reader = nonEmptyReader.poll();
-
-                   // No queue with available data. We allow this here, because
-                   // of the write callbacks that are executed after each write.
-                   if (reader == null) {
-                       return;
+           while (true) {
+               NetworkSequenceViewReader reader = poolAvailableReader();
+
+               // No queue with available data. We allow this here, because
+               // of the write callbacks that are executed after each write.
+               if (reader == null) {
+                   return;
+               }
+
+               next = reader.getNextBuffer();
+               if (next == null) {
+                   if (!reader.isReleased()) {
+                       continue;
                    }
+                   markAsReleased(reader.getReceiverId());
+
+                   Throwable cause = reader.getFailureCause();
+                   if (cause != null) {
+                       ErrorResponse msg = new ErrorResponse(
+                           new ProducerFailedException(cause),
+                           reader.getReceiverId());

-                   next = reader.getNextBuffer();
-
-                   if (next == null) {
-                       if (reader.isReleased()) {
-                           markAsReleased(reader.getReceiverId());
-                           Throwable cause = reader.getFailureCause();
-
-                           if (cause != null) {
-                               ErrorResponse msg = new ErrorResponse(
-                                   new ProducerFailedException(cause),
-                                   reader.getReceiverId());
-
-                               ctx.writeAndFlush(msg);
-                           }
-                       } else {
-                           IllegalStateException err = new IllegalStateException(
-                               "Bug in Netty consumer logic: reader queue got notified by partition " +
-                               "about available data, but none was available.");
-                           handleException(ctx.channel(), err);
-                           return;
-                       }
-                   } else {
-                       // this channel was now removed from the non-empty reader queue
-                       // we re-add it in case it has more data, because in that case no
-                       // "non-empty" notification will come for that reader from the queue.
-                       if (next.moreAvailable()) {
-                           nonEmptyReader.add(reader);
-                       }
-
-                       BufferResponse msg = new BufferResponse(
-                           next.buffer(),
-                           reader.getSequenceNumber(),
-                           reader.getReceiverId(),
-                           0);
-
-                       if (isEndOfPartitionEvent(next.buffer())) {

[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168486202
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
            && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
            && inputGate.isFinished()) {

+           checkState(!bufferOrEvent.moreAvailable());
            if (!inputGatesWithRemainingData.remove(inputGate)) {
                throw new IllegalStateException("Couldn't find input gate in set of remaining " +
                    "input gates.");
            }
        }

+       if (bufferOrEvent.moreAvailable()) {
+           // this buffer or event was now removed from the non-empty gates queue
+           // we re-add it in case it has more data, because in that case no "non-empty" notification
+           // will come for that gate
+           queueInputGate(inputGate);
+       }
+
        // Set the channel index to identify the input channel (across all unioned input gates)
        final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);

        bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());

-       return bufferOrEvent;
+       return Optional.ofNullable(bufferOrEvent);
+   }
+
+   @Override
+   public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+       throw new UnsupportedOperationException();
+   }
+
+   private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+       while (true) {
+           InputGate inputGate;
+           synchronized (inputGatesWithData) {
+               while (inputGatesWithData.size() == 0) {
+                   inputGatesWithData.wait();
+               }
+               inputGate = inputGatesWithData.remove();
+               enqueuedInputGatesWithData.remove(inputGate);
+           }
+
+           // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

It's kind of a bad place for such a comment - it can go out of date without any 
control :/ What does `UnionInputGate` know about the `OutputFlusher` on the 
sender side? This code should just assume that there are no guarantees about 
data notifications being accurate.

It should be placed in some high-level network stack documentation.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168480406
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
    /** Flag indicating whether the subpartition has been released. */
    private volatile boolean isReleased;

-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
    // ------------------------------------------------------------------------

    PipelinedSubpartition(int index, ResultPartition parent) {
        super(index, parent);
    }

    @Override
-   public boolean add(Buffer buffer) throws IOException {
-       checkNotNull(buffer);
-
-       // view reference accessible outside the lock, but assigned inside the locked scope
-       final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+       return add(bufferConsumer, false);
+   }

+   @Override
+   public void flush() {
        synchronized (buffers) {
-           if (isFinished || isReleased) {
-               buffer.recycleBuffer();
-               return false;
+           if (readView != null) {
+               readView.notifyDataAvailable();
            }
-
-           // Add the buffer and update the stats
-           buffers.add(buffer);
-           reader = readView;
-           updateStatistics(buffer);
-           increaseBuffersInBacklog(buffer);
-       }
-
-       // Notify the listener outside of the synchronized block
-       if (reader != null) {
-           reader.notifyBuffersAvailable(1);
        }
-
-       return true;
    }

    @Override
    public void finish() throws IOException {
-       final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+       add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+       LOG.debug("Finished {}.", this);
+   }

-       // view reference accessible outside the lock, but assigned inside the locked scope
-       final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
+       checkNotNull(bufferConsumer);

        synchronized (buffers) {
            if (isFinished || isReleased) {
-               return;
+               bufferConsumer.close();
+               return false;
            }

-           buffers.add(buffer);
-           reader = readView;
-           updateStatistics(buffer);
+           // Add the bufferConsumer and update the stats
+           buffers.add(bufferConsumer);
+           updateStatistics(bufferConsumer);
+           increaseBuffersInBacklog(bufferConsumer);

-           isFinished = true;
+           if (finish) {
+               isFinished = true;
+               notifyDataAvailable();
--- End diff --

RTFM :)

https://github.com/apache/flink/pull/5423/commits/982edbce98db0bb7a5db0514d67aed0435a95d0f

1. it was done as a separate commit, so it is not related to the rest of the changes
2. from the commit message:
> notifyBuffersAvailable is a quick call that doesn't need to be executed outside of the lock

advantages of commit-by-commit reviewing ;)

To double-confirm - no, this notification is a very quick call, it only enqueues 
some work on Netty's `Executor`.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168479100
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -131,9 +114,9 @@ public void release() {
        }

        // Release all available buffers
-       Buffer buffer;
-       while ((buffer = buffers.poll()) != null) {
-           buffer.recycleBuffer();
+       BufferConsumer bufferConsumer;
+       while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

I see what you mean, and I think that maybe this code could be deduplicated 
even further (moving the `readView` field to the abstract class), but can we 
leave it as future work?


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168476375
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

-       synchronized (serializer) {
-           SerializationResult result = serializer.addRecord(record);
-
-           while (result.isFullBuffer()) {
-               Buffer buffer = serializer.getCurrentBuffer();
-
-               if (buffer != null) {
-                   numBytesOut.inc(buffer.getSizeUnsafe());
-                   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                   // If this was a full record, we are done. Not breaking
-                   // out of the loop at this point will lead to another
-                   // buffer request before breaking out (that would not be
-                   // a problem per se, but it can lead to stalls in the
-                   // pipeline).
-                   if (result.isFullRecord()) {
-                       break;
-                   }
-               } else {
-                   BufferBuilder bufferBuilder =
-                       targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                   result = serializer.setNextBufferBuilder(bufferBuilder);
+       SerializationResult result = serializer.addRecord(record);
+
+       while (result.isFullBuffer()) {
+           if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+               // If this was a full record, we are done. Not breaking
+               // out of the loop at this point will lead to another
+               // buffer request before breaking out (that would not be
+               // a problem per se, but it can lead to stalls in the
+               // pipeline).
+               if (result.isFullRecord()) {
+                   break;
                }
            }
+           BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+           result = serializer.setNextBufferBuilder(bufferBuilder);
        }
+       checkState(!serializer.hasSerializedData(), "All data should be written at once");
    }

-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-       final Buffer eventBuffer = EventSerializer.toBuffer(event);
-       try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

Regarding the returned value: without it, I just didn't have a simple idea of 
how to test reference counting/recycling :/


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168475520
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {

        return future.get();
    }
+
+   public static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
+       waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
+       long startMillis = System.currentTimeMillis();
+       Set<Future<?>> futuresSet = new HashSet<>();
--- End diff --

Generally speaking you are right, but I think you missed removing finished 
futures from the `futuresSet`. Without this copying, removal would cause a `wtf` 
moment for the user (a `waitForAll` method removing something from the passed 
collection), and without removing, this code would be slow for a larger number 
of futures.
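
A compact sketch of the approach being described (illustration only, not the 
PR's exact code): copy into a private set, and drop each future as it completes.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class WaitForAllSketch {

	// Copies the futures into a private set (the caller's collection is never
	// mutated) and removes each future once it finishes (so it is never waited
	// on twice).
	static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
		long deadline = System.currentTimeMillis() + timeoutMillis;
		Set<Future<?>> pending = new HashSet<>(futures);
		while (!pending.isEmpty()) {
			long remaining = deadline - System.currentTimeMillis();
			if (remaining <= 0) {
				throw new TimeoutException("Futures did not complete within " + timeoutMillis + " ms");
			}
			Future<?> future = pending.iterator().next();
			future.get(remaining, TimeUnit.MILLISECONDS);
			pending.remove(future); // finished: never wait on it again
		}
	}
}
```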


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168473812
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

-       synchronized (serializer) {
-           SerializationResult result = serializer.addRecord(record);
-
-           while (result.isFullBuffer()) {
-               Buffer buffer = serializer.getCurrentBuffer();
-
-               if (buffer != null) {
-                   numBytesOut.inc(buffer.getSizeUnsafe());
-                   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                   // If this was a full record, we are done. Not breaking
-                   // out of the loop at this point will lead to another
-                   // buffer request before breaking out (that would not be
-                   // a problem per se, but it can lead to stalls in the
-                   // pipeline).
-                   if (result.isFullRecord()) {
-                       break;
-                   }
-               } else {
-                   BufferBuilder bufferBuilder =
-                       targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                   result = serializer.setNextBufferBuilder(bufferBuilder);
+       SerializationResult result = serializer.addRecord(record);
+
+       while (result.isFullBuffer()) {
--- End diff --

As we discussed, I'm not entirely sure. This "minor change" can add significant 
overhead in the case of many channels and large records. I don't want to risk 
increasing the scope of potential problems with this PR :(


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r168472169
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for producing {@link Buffer}.
+ *
+ * It reads data written by {@link BufferBuilder}.
+ * Although it is not thread safe and can be used only by one single thread, this thread can be different than the
+ * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one thread writes data to
+ * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}.
+ */
+@NotThreadSafe
+public class BufferConsumer implements Closeable {
--- End diff --

Yes, I know. Can you propose some different naming scheme? `BufferWriter` 
and `BufferBuilder`?


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167860402
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
    }

    /**
-    * Writes the buffer to the {@link ResultPartitionWriter} and removes the
-    * buffer from the serializer state.
+    * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
     *
-    * Needs to be synchronized on the serializer!
+    * @return true if some data were written
     */
-   private void writeAndClearBuffer(
-       Buffer buffer,
+   private boolean tryFinishCurrentBufferBuilder(
        int targetChannel,
        RecordSerializer<T> serializer) throws IOException {
--- End diff --

This code no longer throws `IOException`.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167527566
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---
@@ -87,41 +86,12 @@ public boolean isFullBuffer() {
    SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
--- End diff --

One remark from reading the code: I found it a bit surprising that a method 
that looks like a setter will cause the write to continue. Maybe this is better 
called something like `continueWritingWithNextBufferBuilder`, or the setter 
should be split from a `continueWrite` method?

---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167859598
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

-       synchronized (serializer) {
-           SerializationResult result = serializer.addRecord(record);
-
-           while (result.isFullBuffer()) {
-               Buffer buffer = serializer.getCurrentBuffer();
-
-               if (buffer != null) {
-                   numBytesOut.inc(buffer.getSizeUnsafe());
-                   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                   // If this was a full record, we are done. Not breaking
-                   // out of the loop at this point will lead to another
-                   // buffer request before breaking out (that would not be
-                   // a problem per se, but it can lead to stalls in the
-                   // pipeline).
-                   if (result.isFullRecord()) {
-                       break;
-                   }
-               } else {
-                   BufferBuilder bufferBuilder =
-                       targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                   result = serializer.setNextBufferBuilder(bufferBuilder);
+       SerializationResult result = serializer.addRecord(record);
+
+       while (result.isFullBuffer()) {
+           if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+               // If this was a full record, we are done. Not breaking
+               // out of the loop at this point will lead to another
+               // buffer request before breaking out (that would not be
+               // a problem per se, but it can lead to stalls in the
+               // pipeline).
+               if (result.isFullRecord()) {
+                   break;
                }
            }
+           BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+           result = serializer.setNextBufferBuilder(bufferBuilder);
        }
+       checkState(!serializer.hasSerializedData(), "All data should be written at once");
    }

-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-       final Buffer eventBuffer = EventSerializer.toBuffer(event);
-       try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

This method no longer throws `InterruptedException`.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167556014
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {

        return future.get();
    }
+
+   public static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
+       waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
+       long startMillis = System.currentTimeMillis();
+       Set<Future<?>> futuresSet = new HashSet<>();
+       for (Future<?> future : futures) {
--- End diff --

Could be replaced with `addAll()`, or even the constructor taking a collection.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167556221
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---
@@ -45,4 +52,36 @@ private FutureUtil() {

        return future.get();
    }
+
+   public static void waitForAll(long timeoutMillis, Future<?>... futures) throws Exception {
+       waitForAll(timeoutMillis, Arrays.asList(futures));
+   }
+
+   public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
+       long startMillis = System.currentTimeMillis();
+       Set<Future<?>> futuresSet = new HashSet<>();
--- End diff --

I think for all purposes, we do not need a set to deduplicate. If a future 
is contained multiple times and already finished, waiting for it again is 
basically a NOP.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167879211
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
@@ -220,6 +273,19 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
        }
    }

+   private void registerAvailableReader(NetworkSequenceViewReader reader) {
+       availableReaders.add(reader);
+       reader.setRegisteredAsAvailable(true);
+   }
+
+   private NetworkSequenceViewReader poolAvailableReader() {
--- End diff --

This should probably be `pollAvailableReader()`


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167588455
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

-       synchronized (serializer) {
-           SerializationResult result = serializer.addRecord(record);
-
-           while (result.isFullBuffer()) {
-               Buffer buffer = serializer.getCurrentBuffer();
-
-               if (buffer != null) {
-                   numBytesOut.inc(buffer.getSizeUnsafe());
-                   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                   // If this was a full record, we are done. Not breaking
-                   // out of the loop at this point will lead to another
-                   // buffer request before breaking out (that would not be
-                   // a problem per se, but it can lead to stalls in the
-                   // pipeline).
-                   if (result.isFullRecord()) {
-                       break;
-                   }
-               } else {
-                   BufferBuilder bufferBuilder =
-                       targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                   result = serializer.setNextBufferBuilder(bufferBuilder);
+       SerializationResult result = serializer.addRecord(record);
+
+       while (result.isFullBuffer()) {
+           if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+               // If this was a full record, we are done. Not breaking
+               // out of the loop at this point will lead to another
+               // buffer request before breaking out (that would not be
+               // a problem per se, but it can lead to stalls in the
+               // pipeline).
+               if (result.isFullRecord()) {
+                   break;
                }
            }
+           BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+           result = serializer.setNextBufferBuilder(bufferBuilder);
        }
+       checkState(!serializer.hasSerializedData(), "All data should be written at once");
    }

-   public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-       final Buffer eventBuffer = EventSerializer.toBuffer(event);
-       try {
+   public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --

I think this method does not truly require a return value. The return value 
is only used in one test, and I found it confusing that it is first closed and 
then returned.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167520277
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for producing {@link Buffer}.
+ *
+ * It reads data written by {@link BufferBuilder}.
+ * Although it is not thread safe and can be used only by one single thread, this thread can be different than the
+ * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one thread writes data to
+ * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}.
+ */
+@NotThreadSafe
+public class BufferConsumer implements Closeable {
--- End diff --

Just a thought about names: this is called `BufferConsumer`, but it does 
not "consume" buffers. It is coordinating the production of read slices from a 
shared buffer. `BufferBuilder` makes more sense than this. Even worse, this 
class has a `build() : Buffer` method :-(.



---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167618791
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -138,6 +142,12 @@
    /** Channels, which notified this input gate about available data. */
    private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>();

+   /**
+    * Field guaranteeing uniqueness for inputChannelsWithData queue. Both of those fields should be unified
+    * onto one.
+    */
+   private final Set<InputChannel> enqueuedInputChannelsWithData = new HashSet<>();
--- End diff --

I wonder if this should not better be a `BitSet`? Do we typically expect 
very few enqueued channels with data from a large set of channels?
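
For illustration, a rough sketch of the `BitSet` variant, assuming channels are 
addressed by their dense channel index (the class and method names here are made up):

```java
import java.util.ArrayDeque;
import java.util.BitSet;

// Queue of channel indices kept duplicate-free via a BitSet
// instead of a HashSet of channel objects.
final class UniqueChannelQueue {
	private final ArrayDeque<Integer> channelsWithData = new ArrayDeque<>();
	private final BitSet enqueued = new BitSet();

	boolean enqueue(int channelIndex) {
		if (enqueued.get(channelIndex)) {
			return false; // already queued
		}
		enqueued.set(channelIndex);
		channelsWithData.add(channelIndex);
		return true;
	}

	Integer poll() {
		Integer channelIndex = channelsWithData.poll();
		if (channelIndex != null) {
			enqueued.clear(channelIndex);
		}
		return channelIndex;
	}
}
```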


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167546706
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

-       synchronized (serializer) {
-           SerializationResult result = serializer.addRecord(record);
-
-           while (result.isFullBuffer()) {
-               Buffer buffer = serializer.getCurrentBuffer();
-
-               if (buffer != null) {
-                   numBytesOut.inc(buffer.getSizeUnsafe());
-                   writeAndClearBuffer(buffer, targetChannel, serializer);
-
-                   // If this was a full record, we are done. Not breaking
-                   // out of the loop at this point will lead to another
-                   // buffer request before breaking out (that would not be
-                   // a problem per se, but it can lead to stalls in the
-                   // pipeline).
-                   if (result.isFullRecord()) {
-                       break;
-                   }
-               } else {
-                   BufferBuilder bufferBuilder =
-                       targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                   result = serializer.setNextBufferBuilder(bufferBuilder);
+       SerializationResult result = serializer.addRecord(record);
+
+       while (result.isFullBuffer()) {
--- End diff --

I wonder if this loop could not be simplified to
```
while (!result.isFullRecord()) {
    tryFinishCurrentBufferBuilder(targetChannel, serializer);
    BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
    result = serializer.setNextBufferBuilder(bufferBuilder);
}
```

This would introduce a minor change in behaviour in cases where the end of 
the record falls exactly at the end of a buffer. With the change, the buffer is 
only finished by the next record and not on the spot. However, this should not 
be a problem, because this outcome is what usually happens for almost 
every record besides those corner cases, and thus the code should already handle 
them well.
With this change, `tryFinishCurrentBufferBuilder` also no longer requires a 
return value.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167622468
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---
@@ -72,7 +73,19 @@

    void requestPartitions() throws IOException, InterruptedException;

-   BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+   /**
+    * Blocking call waiting for next {@link BufferOrEvent}.
+    *
+    * @return {@code Optional.empty()} if {@link #isFinished()} returns true.
+    */
+   Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
--- End diff --

From the description, and also to better contrast against 
`pollNextBufferOrEvent()`, it almost feels like this method should always 
return a `BufferOrEvent` and rather throw an exception if `isFinished()`. This 
seems to be how the empty optional is translated anyway, see 
`AbstractRecordReader`, which reacts with an `IllegalStateException`.
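
For comparison, that translation boils down to something like this (a sketch, 
not the exact `AbstractRecordReader` code):

```java
// Sketch: treating an empty Optional as an illegal call.
BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent().orElseThrow(
	() -> new IllegalStateException("Input gate is finished, no further BufferOrEvent available"));
```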


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167857194
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -55,69 +51,56 @@
    /** Flag indicating whether the subpartition has been released. */
    private volatile boolean isReleased;

-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
    // ------------------------------------------------------------------------

    PipelinedSubpartition(int index, ResultPartition parent) {
        super(index, parent);
    }

    @Override
-   public boolean add(Buffer buffer) throws IOException {
-       checkNotNull(buffer);
-
-       // view reference accessible outside the lock, but assigned inside the locked scope
-       final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+       return add(bufferConsumer, false);
+   }

+   @Override
+   public void flush() {
        synchronized (buffers) {
-           if (isFinished || isReleased) {
-               buffer.recycleBuffer();
-               return false;
+           if (readView != null) {
+               readView.notifyDataAvailable();
            }
-
-           // Add the buffer and update the stats
-           buffers.add(buffer);
-           reader = readView;
-           updateStatistics(buffer);
-           increaseBuffersInBacklog(buffer);
-       }
-
-       // Notify the listener outside of the synchronized block
-       if (reader != null) {
-           reader.notifyBuffersAvailable(1);
        }
-
-       return true;
    }

    @Override
    public void finish() throws IOException {
-       final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+       add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+       LOG.debug("Finished {}.", this);
+   }

-       // view reference accessible outside the lock, but assigned inside the locked scope
-       final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
--- End diff --

I think you can remove the `throws IOException`, and after that also from the 
public `add(...)`.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167621192
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ---
@@ -60,12 +60,20 @@
    void tagAsEvent();

    /**
-    * Returns the underlying memory segment.
+    * Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits
+    * slices. Use it only along the {@link #getMemorySegmentOffset()}.
     *
     * @return the memory segment backing this buffer
     */
+   @Deprecated
    MemorySegment getMemorySegment();

+   /**
+    * @return the offset where this (potential slice) {@link Buffer}'s data start in the underlying memory segment.
+    */
+   @Deprecated
--- End diff --

Same here.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167594824
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -131,9 +114,9 @@ public void release() {
        }

        // Release all available buffers
-       Buffer buffer;
-       while ((buffer = buffers.poll()) != null) {
-           buffer.recycleBuffer();
+       BufferConsumer bufferConsumer;
+       while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

I suggest just making a normal for-each iteration to close them all, followed 
by a `clear()`.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167528361
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
    }

    /**
-    * Writes the buffer to the {@link ResultPartitionWriter} and removes the
-    * buffer from the serializer state.
+    * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
     *
-    * Needs to be synchronized on the serializer!
+    * @return true if some data were written
     */
-   private void writeAndClearBuffer(
-       Buffer buffer,
+   private boolean tryFinishCurrentBufferBuilder(
        int targetChannel,
        RecordSerializer<T> serializer) throws IOException {

-       try {
-           targetPartition.writeBuffer(buffer, targetChannel);
-       }
-       finally {
-           serializer.clearCurrentBuffer();
+       if (!bufferBuilders[targetChannel].isPresent()) {
+           return false;
        }
+       BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
+       bufferBuilders[targetChannel] = Optional.empty();
+
+       numBytesOut.inc(bufferBuilder.getWrittenBytes());
+       bufferBuilder.finish();
--- End diff --

You could combine this into `numBytesOut.inc(bufferBuilder.finish())` or 
maybe `finish()` should not need to have a return value?


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167896900
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects output on the List.
+ */
+@ThreadSafe
+public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter {
+   private final BufferProvider bufferProvider;
+   private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>();
+
+   public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) {
+       this.bufferProvider = checkNotNull(bufferProvider);
+   }
+
+   @Override
+   public synchronized BufferProvider getBufferProvider() {
+       return bufferProvider;
+   }
+
+   @Override
+   public synchronized ResultPartitionID getPartitionId() {
+       return new ResultPartitionID();
--- End diff --

What is the intended effect of having this `synchronized`? It looks like it does nothing.
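To illustrate (identifiers follow the diff, the class wrapper is made up): the method creates a fresh object and reads no shared mutable state, so both variants below behave identically and the lock buys nothing:

```java
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

class PartitionIdSketch {
	// acquires and releases the monitor, but there is no shared state to protect
	synchronized ResultPartitionID getPartitionIdLocked() {
		return new ResultPartitionID();
	}

	// behaves identically, without the monitor round-trip
	ResultPartitionID getPartitionIdPlain() {
		return new ResultPartitionID();
	}
}
```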


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167604523
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -55,69 +51,56 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
-   /** The number of non-event buffers currently in this subpartition. */
-   @GuardedBy("buffers")
-   private int buffersInBacklog;
-
// 

 
PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
}
 
@Override
-   public boolean add(Buffer buffer) throws IOException {
-   checkNotNull(buffer);
-
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   public boolean add(BufferConsumer bufferConsumer) throws IOException {
+   return add(bufferConsumer, false);
+   }
 
+   @Override
+   public void flush() {
synchronized (buffers) {
-   if (isFinished || isReleased) {
-   buffer.recycleBuffer();
-   return false;
+   if (readView != null) {
+   readView.notifyDataAvailable();
}
-
-   // Add the buffer and update the stats
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
-   increaseBuffersInBacklog(buffer);
-   }
-
-   // Notify the listener outside of the synchronized block
-   if (reader != null) {
-   reader.notifyBuffersAvailable(1);
}
-
-   return true;
}
 
@Override
public void finish() throws IOException {
-   final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+   add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+   LOG.debug("Finished {}.", this);
+   }
 
-   // view reference accessible outside the lock, but assigned inside the locked scope
-   final PipelinedSubpartitionView reader;
+   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
+   checkNotNull(bufferConsumer);
 
synchronized (buffers) {
if (isFinished || isReleased) {
-   return;
+   bufferConsumer.close();
+   return false;
}
 
-   buffers.add(buffer);
-   reader = readView;
-   updateStatistics(buffer);
+   // Add the bufferConsumer and update the stats
+   buffers.add(bufferConsumer);
+   updateStatistics(bufferConsumer);
+   increaseBuffersInBacklog(bufferConsumer);
 
-   isFinished = true;
+   if (finish) {
+   isFinished = true;
+   notifyDataAvailable();
--- End diff --

I noticed that this introduces a subtle change: unlike before, the notification to the listeners now happens under the `buffers` lock. Just want to double-check that this will not have negative side effects on performance. Did this fix any correctness problem?
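For comparison, a rough sketch of the pre-change shape (signatures simplified, so treat this as an assumption-laden outline rather than the PR's code), where the view is captured under the lock but notified only after it is released:

```java
private boolean add(BufferConsumer bufferConsumer, boolean finish) {
	final PipelinedSubpartitionView reader;
	synchronized (buffers) {
		if (isFinished || isReleased) {
			bufferConsumer.close();
			return false;
		}
		buffers.add(bufferConsumer);
		updateStatistics(bufferConsumer);
		increaseBuffersInBacklog(bufferConsumer);
		isFinished |= finish;
		reader = readView;
	}
	// notification happens outside the critical section, so a slow
	// listener cannot extend the lock hold time on `buffers`
	if (reader != null) {
		reader.notifyDataAvailable();
	}
	return true;
}
```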


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167621132
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
 ---
@@ -60,12 +60,20 @@
void tagAsEvent();
 
/**
-* Returns the underlying memory segment.
+* Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits
+* slices. Use it only along the {@link #getMemorySegmentOffset()}.
 *
 * @return the memory segment backing this buffer
 */
+   @Deprecated
--- End diff --

You should name the proper replacement for this deprecated method in the comment, or say that it will eventually be removed without replacement.


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167898515
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects output on the List.
+ */
+@ThreadSafe
+public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter {
+   private final BufferProvider bufferProvider;
+   private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>();
+
+   public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) {
+   this.bufferProvider = checkNotNull(bufferProvider);
+   }
+
+   @Override
+   public synchronized BufferProvider getBufferProvider() {
+   return bufferProvider;
--- End diff --

What does this `synchronized` help with? The field is `final`, so I would assume this change is not required.
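A minimal illustration of the point (class name is made up): a `final` field assigned in the constructor is safely published by the Java Memory Model, so a plain getter is already thread-safe:

```java
import org.apache.flink.runtime.io.network.buffer.BufferProvider;

class FinalFieldPublicationSketch {
	// assigned exactly once in the constructor; safe publication is
	// guaranteed for final fields, so no lock is needed on the read path
	private final BufferProvider bufferProvider;

	FinalFieldPublicationSketch(BufferProvider bufferProvider) {
		this.bufferProvider = bufferProvider;
	}

	BufferProvider getBufferProvider() {
		return bufferProvider; // no synchronized required
	}
}
```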


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167818220
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
   && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {
 
+   checkState(!bufferOrEvent.moreAvailable());
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
}
 
+   if (bufferOrEvent.moreAvailable()) {
+   // this buffer or event was now removed from the non-empty gates queue
+   // we re-add it in case it has more data, because in that case no "non-empty" notification
+   // will come for that gate
+   queueInputGate(inputGate);
+   }
+
// Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
 
bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
 
-   return bufferOrEvent;
+   return Optional.ofNullable(bufferOrEvent);
+   }
+
+   @Override
+   public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+   throw new UnsupportedOperationException();
+   }
+
+   private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+   while (true) {
+   InputGate inputGate;
+   synchronized (inputGatesWithData) {
+   while (inputGatesWithData.size() == 0) {
+   inputGatesWithData.wait();
+   }
+   inputGate = inputGatesWithData.remove();
+   enqueuedInputGatesWithData.remove(inputGate);
+   }
+
+   // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
--- End diff --

Maybe we can add a comment explaining why this can happen now, i.e. mentioning the output flusher.
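One possible wording for such a comment (the exact mechanism described here is an assumption tied to the output flusher, not taken from the PR):

```java
// The notification that put this gate into inputGatesWithData may be stale:
// the producer-side output flusher can signal availability even though the
// data has already been consumed or no complete buffer exists yet. Hence we
// only poll here and loop again instead of blocking on the gate.
Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
```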


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167596726
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -131,9 +114,9 @@ public void release() {
}
 
// Release all available buffers
-   Buffer buffer;
-   while ((buffer = buffers.poll()) != null) {
-   buffer.recycleBuffer();
+   BufferConsumer bufferConsumer;
+   while ((bufferConsumer = buffers.poll()) != null) {
--- End diff --

Maybe this part of the `release()` method should also go into the superclass, which usually manages `buffers`. The method could also have an optional hook for additional cleanup inside the `synchronized` block that subclasses can use if needed.
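A rough sketch of that refactoring (class and hook names are made up; assumes `close()` is the right cleanup for a drained `BufferConsumer`): the superclass drains `buffers` and offers a hook that still runs under the lock:

```java
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

import java.util.ArrayDeque;

abstract class SubpartitionReleaseSketch {
	protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();

	public void release() {
		synchronized (buffers) {
			// common cleanup shared by all subclasses
			BufferConsumer bufferConsumer;
			while ((bufferConsumer = buffers.poll()) != null) {
				bufferConsumer.close();
			}
			// subclass-specific cleanup, still inside the synchronized block
			onRelease();
		}
	}

	// optional hook; default is a no-op
	protected void onRelease() {
	}
}
```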


---


[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...

2018-02-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5423#discussion_r167883063
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
 
BufferAndAvailability next = null;
try {
-   if (channel.isWritable()) {
-   while (true) {
-   SequenceNumberingViewReader reader = nonEmptyReader.poll();
-
-   // No queue with available data. We allow this here, because
-   // of the write callbacks that are executed after each write.
-   if (reader == null) {
-   return;
+   while (true) {
+   NetworkSequenceViewReader reader = poolAvailableReader();
+
+   // No queue with available data. We allow this here, because
+   // of the write callbacks that are executed after each write.
+   if (reader == null) {
+   return;
+   }
+
+   next = reader.getNextBuffer();
+   if (next == null) {
+   if (!reader.isReleased()) {
+   continue;
}
+   markAsReleased(reader.getReceiverId());
+
+   Throwable cause = reader.getFailureCause();
+   if (cause != null) {
+   ErrorResponse msg = new ErrorResponse(
+   new ProducerFailedException(cause),
+   reader.getReceiverId());
 
-   next = reader.getNextBuffer();
-
-   if (next == null) {
-   if (reader.isReleased()) {
-   markAsReleased(reader.getReceiverId());
-   Throwable cause = reader.getFailureCause();
-
-   if (cause != null) {
-   ErrorResponse msg = new ErrorResponse(
-   new ProducerFailedException(cause),
-   reader.getReceiverId());
-
-   ctx.writeAndFlush(msg);
-   }
-   } else {
-   IllegalStateException err = new IllegalStateException(
-   "Bug in Netty consumer logic: reader queue got notified by partition " +
-   "about available data, but none was available.");
-   handleException(ctx.channel(), err);
-   return;
-   }
-   } else {
-   // this channel was now removed from the non-empty reader queue
-   // we re-add it in case it has more data, because in that case no
-   // "non-empty" notification will come for that reader from the queue.
-   if (next.moreAvailable()) {
-   nonEmptyReader.add(reader);
-   }
-
-   BufferResponse msg = new BufferResponse(
-   next.buffer(),
-   reader.getSequenceNumber(),
-   reader.getReceiverId(),
-   0);
-
-   if (isEndOfPartitionEvent(next.buffer())) {
-