[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5423 ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r169082736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java --- @@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException; - BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException; + /** +* Blocking call waiting for next {@link BufferOrEvent}. +* +* @return {@code Optional.empty()} if {@link #isFinished()} returns true. +*/ + Optional getNextBufferOrEvent() throws IOException, InterruptedException; --- End diff -- 👍 ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r169048737 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java --- @@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException; - BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException; + /** +* Blocking call waiting for next {@link BufferOrEvent}. +* +* @return {@code Optional.empty()} if {@link #isFinished()} returns true. +*/ + Optional getNextBufferOrEvent() throws IOException, InterruptedException; --- End diff -- I would prefer to leave it as it is for now. I completely agree it's a bad design, however in this PR I'm only documenting this behaviour and changing the `null` to `Optional.empty()`, and because of the extensive scope of this change already as it is, I would prefer to fix this issue later on. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168725762 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java --- @@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException; - BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException; + /** +* Blocking call waiting for next {@link BufferOrEvent}. +* +* @return {@code Optional.empty()} if {@link #isFinished()} returns true. +*/ + Optional getNextBufferOrEvent() throws IOException, InterruptedException; --- End diff -- What do you think about this? ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168725569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO BufferAndAvailability next = null; try { - if (channel.isWritable()) { - while (true) { - SequenceNumberingViewReader reader = nonEmptyReader.poll(); - - // No queue with available data. We allow this here, because - // of the write callbacks that are executed after each write. - if (reader == null) { - return; + while (true) { + NetworkSequenceViewReader reader = poolAvailableReader(); + + // No queue with available data. We allow this here, because + // of the write callbacks that are executed after each write. + if (reader == null) { + return; + } + + next = reader.getNextBuffer(); + if (next == null) { + if (!reader.isReleased()) { + continue; } + markAsReleased(reader.getReceiverId()); + + Throwable cause = reader.getFailureCause(); + if (cause != null) { + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); - next = reader.getNextBuffer(); - - if (next == null) { - if (reader.isReleased()) { - markAsReleased(reader.getReceiverId()); - Throwable cause = reader.getFailureCause(); - - if (cause != null) { - ErrorResponse msg = new ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - - ctx.writeAndFlush(msg); - } - } else { - IllegalStateException err = new IllegalStateException( - "Bug in Netty consumer logic: reader queue got notified by partition " + - "about available data, but none was available."); - handleException(ctx.channel(), err); - return; - } - } else { - // this channel was now removed from the non-empty reader queue - // we re-add it in case it has more data, because in that case no - // "non-empty" notification will come for 
that reader from the queue. - if (next.moreAvailable()) { - nonEmptyReader.add(reader); - } - - BufferResponse msg = new BufferResponse( - next.buffer(), - reader.getSequenceNumber(), - reader.getReceiverId(), - 0); - - if (isEndOfPartitionEvent(next.buffer())) { - reader.notifySubpartitionC
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168725339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java --- @@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) { + checkState(!bufferOrEvent.moreAvailable()); if (!inputGatesWithRemainingData.remove(inputGate)) { throw new IllegalStateException("Couldn't find input gate in set of remaining " + "input gates."); } } + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } + // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - return bufferOrEvent; + return Optional.ofNullable(bufferOrEvent); + } + + @Override + public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + while (true) { + InputGate inputGate; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + inputGate = inputGatesWithData.remove(); + enqueuedInputGatesWithData.remove(inputGate); + } + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. 
--- End diff -- That sounds OK, maybe with a short pointer here to the high-level doc; otherwise there is an increased chance that somebody misses it. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724949 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -55,69 +51,56 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition. */ - @GuardedBy("buffers") - private int buffersInBacklog; - // PipelinedSubpartition(int index, ResultPartition parent) { super(index, parent); } @Override - public boolean add(Buffer buffer) throws IOException { - checkNotNull(buffer); - - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + public boolean add(BufferConsumer bufferConsumer) throws IOException { + return add(bufferConsumer, false); + } + @Override + public void flush() { synchronized (buffers) { - if (isFinished || isReleased) { - buffer.recycleBuffer(); - return false; + if (readView != null) { + readView.notifyDataAvailable(); } - - // Add the buffer and update the stats - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); - increaseBuffersInBacklog(buffer); - } - - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); } - - return true; } @Override public void finish() throws IOException { - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); + LOG.debug("Finished {}.", this); + } - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException { + checkNotNull(bufferConsumer); synchronized (buffers) { if (isFinished || isReleased) { - return; + 
bufferConsumer.close(); + return false; } - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); + // Add the bufferConsumer and update the stats + buffers.add(bufferConsumer); + updateStatistics(bufferConsumer); + increaseBuffersInBacklog(bufferConsumer); - isFinished = true; + if (finish) { + isFinished = true; + notifyDataAvailable(); --- End diff -- 👍 ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724867 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -131,9 +114,9 @@ public void release() { } // Release all available buffers - Buffer buffer; - while ((buffer = buffers.poll()) != null) { - buffer.recycleBuffer(); + BufferConsumer bufferConsumer; + while ((bufferConsumer = buffers.poll()) != null) { --- End diff -- I think doing this as future work is ok. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724735 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { + break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- Good question, there could be a package private method that returns the buffer, and the public method uses this method but does not return the buffer. But questionable if that is really better, because we also would need to ensure that the the public goes through the private method etc. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724296 --- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java --- @@ -45,4 +52,36 @@ private FutureUtil() { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection> futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set> futuresSet = new HashSet<>(); --- End diff -- 👍 ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168724127 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { --- End diff -- 👍 We can introduce this change later, after some more extensive tests. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168490355 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO BufferAndAvailability next = null; try { - if (channel.isWritable()) { - while (true) { - SequenceNumberingViewReader reader = nonEmptyReader.poll(); - - // No queue with available data. We allow this here, because - // of the write callbacks that are executed after each write. - if (reader == null) { - return; + while (true) { + NetworkSequenceViewReader reader = poolAvailableReader(); + + // No queue with available data. We allow this here, because + // of the write callbacks that are executed after each write. + if (reader == null) { + return; + } + + next = reader.getNextBuffer(); + if (next == null) { + if (!reader.isReleased()) { + continue; } + markAsReleased(reader.getReceiverId()); + + Throwable cause = reader.getFailureCause(); + if (cause != null) { + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); - next = reader.getNextBuffer(); - - if (next == null) { - if (reader.isReleased()) { - markAsReleased(reader.getReceiverId()); - Throwable cause = reader.getFailureCause(); - - if (cause != null) { - ErrorResponse msg = new ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - - ctx.writeAndFlush(msg); - } - } else { - IllegalStateException err = new IllegalStateException( - "Bug in Netty consumer logic: reader queue got notified by partition " + - "about available data, but none was available."); - handleException(ctx.channel(), err); - return; - } - } else { - // this channel was now removed from the non-empty reader queue - // we re-add it in case it has more data, because in that case no - // "non-empty" notification will come for that 
reader from the queue. - if (next.moreAvailable()) { - nonEmptyReader.add(reader); - } - - BufferResponse msg = new BufferResponse( - next.buffer(), - reader.getSequenceNumber(), - reader.getReceiverId(), - 0); - - if (isEndOfPartitionEvent(next.buffer())) { - reader.notifySubpartitionConsum
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168486202 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java --- @@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) { + checkState(!bufferOrEvent.moreAvailable()); if (!inputGatesWithRemainingData.remove(inputGate)) { throw new IllegalStateException("Couldn't find input gate in set of remaining " + "input gates."); } } + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } + // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - return bufferOrEvent; + return Optional.ofNullable(bufferOrEvent); + } + + @Override + public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + while (true) { + InputGate inputGate; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + inputGate = inputGatesWithData.remove(); + enqueuedInputGatesWithData.remove(inputGate); + } + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. 
--- End diff -- It's kind of a bad place for such a comment - it can become outdated without anyone noticing :/ What does `UnionInputGate` know about the sender's `OutputFlusher`? This code should just assume that there are no guarantees about data notifications being accurate. The explanation should be placed in some high-level network stack documentation. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168480406 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -55,69 +51,56 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition. */ - @GuardedBy("buffers") - private int buffersInBacklog; - // PipelinedSubpartition(int index, ResultPartition parent) { super(index, parent); } @Override - public boolean add(Buffer buffer) throws IOException { - checkNotNull(buffer); - - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + public boolean add(BufferConsumer bufferConsumer) throws IOException { + return add(bufferConsumer, false); + } + @Override + public void flush() { synchronized (buffers) { - if (isFinished || isReleased) { - buffer.recycleBuffer(); - return false; + if (readView != null) { + readView.notifyDataAvailable(); } - - // Add the buffer and update the stats - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); - increaseBuffersInBacklog(buffer); - } - - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); } - - return true; } @Override public void finish() throws IOException { - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); + LOG.debug("Finished {}.", this); + } - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException { + checkNotNull(bufferConsumer); synchronized (buffers) { if (isFinished || isReleased) { - return; + 
bufferConsumer.close(); + return false; } - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); + // Add the bufferConsumer and update the stats + buffers.add(bufferConsumer); + updateStatistics(bufferConsumer); + increaseBuffersInBacklog(bufferConsumer); - isFinished = true; + if (finish) { + isFinished = true; + notifyDataAvailable(); --- End diff -- RTFM :) https://github.com/apache/flink/pull/5423/commits/982edbce98db0bb7a5db0514d67aed0435a95d0f 1. It was done as a separate commit, so it is not related to the rest of the changes. 2. From the commit message: > notifyBuffersAvailable is a quick call that doesn't need to be executed outside of the lock. The advantages of commit-by-commit reviewing ;) To double-confirm - no, this notification is a very quick call; it only enqueues some work on Netty's `Executor`. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168479100 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -131,9 +114,9 @@ public void release() { } // Release all available buffers - Buffer buffer; - while ((buffer = buffers.poll()) != null) { - buffer.recycleBuffer(); + BufferConsumer bufferConsumer; + while ((bufferConsumer = buffers.poll()) != null) { --- End diff -- I see what you mean and I think that maybe this code could be deduplicated even further (moving `readView` field to the abstract class), but can we leave it as future work? ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168476375 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { + break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- Regarding returned value: without it I just didn't have a simple idea how to test for reference counting/recycling :/ ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168475520 --- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java --- @@ -45,4 +52,36 @@ private FutureUtil() { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection> futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set> futuresSet = new HashSet<>(); --- End diff -- Generally speaking you are right, but I think you missed that finished futures are removed from the `futuresSet`. Without this copying, the removal could cause a `wtf` moment for a user (the `waitForAll` method removing something from the passed collection), and without the removal, this code would be slow for a larger number of futures. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168473812 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { --- End diff -- As we discussed, I'm not entirely sure. This "minor change" can be a significant overhead in case of many channels and large records. I don't want to risk increasing the scope of potential problems with this PR :( ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168472169 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Not thread safe class for producing {@link Buffer}. + * + * It reads data written by {@link BufferBuilder}. + * Although it is not thread safe and can be used only by one single thread, this thread can be different then the + * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one thread writes data to + * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}. 
+ */ +@NotThreadSafe +public class BufferConsumer implements Closeable { --- End diff -- Yes, I know. Can you propose some different naming scheme? `BufferWriter` and `BufferBuilder`? ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167860402 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { } /** -* Writes the buffer to the {@link ResultPartitionWriter} and removes the -* buffer from the serializer state. +* Marks the current {@link BufferBuilder} as finished and clears the state for next one. * -* Needs to be synchronized on the serializer! +* @return true if some data were written */ - private void writeAndClearBuffer( - Buffer buffer, + private boolean tryFinishCurrentBufferBuilder( int targetChannel, RecordSerializer serializer) throws IOException { --- End diff -- This code no longer throws `IOException`. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167527566 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java --- @@ -87,41 +86,12 @@ public boolean isFullBuffer() { SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException; --- End diff -- One remark from reading the code: I found it a bit surprising that a method that looks like a setter will cause the write to continue. Maybe it would be better called something like `continueWritingWithNextBufferBuilder`, or the setter could be split from a `continueWrite` method? ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167859598 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- This method no longer throws `InterruptedException`. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167556014 --- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java --- @@ -45,4 +52,36 @@ private FutureUtil() { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection> futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set> futuresSet = new HashSet<>(); + for (Future future : futures) { --- End diff -- Could be replaced with `addAll()` or even the constructor taking collection. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167556221 --- Diff: flink-core/src/main/java/org/apache/flink/util/FutureUtil.java --- @@ -45,4 +52,36 @@ private FutureUtil() { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection> futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set> futuresSet = new HashSet<>(); --- End diff -- I think for all purposes, we do not need a set to deduplicate. If a future is contained multiple times and already finished, waiting for it again is basically a NOP. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167879211 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -220,6 +273,19 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO } } + private void registerAvailableReader(NetworkSequenceViewReader reader) { + availableReaders.add(reader); + reader.setRegisteredAsAvailable(true); + } + + private NetworkSequenceViewReader poolAvailableReader() { --- End diff -- This should probably be `pollAvailableReader()` ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167588455 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { + break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- I think this method does not truly require a return value. The return value is only used in one test, and I found it confusing that it is first closed and then returned. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167520277 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Not thread safe class for producing {@link Buffer}. + * + * It reads data written by {@link BufferBuilder}. + * Although it is not thread safe and can be used only by one single thread, this thread can be different then the + * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one thread writes data to + * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}. 
+ */ +@NotThreadSafe +public class BufferConsumer implements Closeable { --- End diff -- Just a thought about names: this is called `BufferConsumer`, but it does not "consume" buffers. It coordinates the production of read slices from a shared buffer. `BufferBuilder` would make more sense than this. Even worse, this class has a `build() : Buffer` method :-(. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167618791 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java --- @@ -138,6 +142,12 @@ /** Channels, which notified this input gate about available data. */ private final ArrayDeque inputChannelsWithData = new ArrayDeque<>(); + /** +* Field guaranteeing uniqueness for inputChannelsWithData queue. Both of those fields should be unified +* onto one. +*/ + private final Set enqueuedInputChannelsWithData = new HashSet<>(); --- End diff -- I wonder if this should not better be a `BitSet`? Do we typically expect very few enqueued channels with data from a large set of channels? ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167546706 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { --- End diff -- I wonder if this loop could not be simplified to ``` while (!result.isFullRecord()) { tryFinishCurrentBufferBuilder(targetChannel, serializer); BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); result = serializer.setNextBufferBuilder(bufferBuilder); } ``` This would introduce a minor change in behaviour in cases where the end of the record falls exactly to the end of a buffer. With the change, the buffer is only finished by the next record and not on the spot. 
However, this should not be a problem. This outcome is what usually happens for almost every record besides those corner cases, so the code should already handle it well. With this change, `tryFinishCurrentBufferBuilder` also no longer requires a return value. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167622468 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java --- @@ -72,7 +73,19 @@ void requestPartitions() throws IOException, InterruptedException; - BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException; + /** +* Blocking call waiting for next {@link BufferOrEvent}. +* +* @return {@code Optional.empty()} if {@link #isFinished()} returns true. +*/ + Optional getNextBufferOrEvent() throws IOException, InterruptedException; --- End diff -- From the description, and also to better contrast it with `pollNextBufferOrEvent()`, it almost feels like this method should always return a `BufferOrEvent` and instead throw an exception if `isFinished()`. This seems to be how the empty optional is translated anyway; see `AbstractRecordReader`, which reacts with an `IllegalStateException`. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167857194 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -55,69 +51,56 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition. */ - @GuardedBy("buffers") - private int buffersInBacklog; - // PipelinedSubpartition(int index, ResultPartition parent) { super(index, parent); } @Override - public boolean add(Buffer buffer) throws IOException { - checkNotNull(buffer); - - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + public boolean add(BufferConsumer bufferConsumer) throws IOException { + return add(bufferConsumer, false); + } + @Override + public void flush() { synchronized (buffers) { - if (isFinished || isReleased) { - buffer.recycleBuffer(); - return false; + if (readView != null) { + readView.notifyDataAvailable(); } - - // Add the buffer and update the stats - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); - increaseBuffersInBacklog(buffer); - } - - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); } - - return true; } @Override public void finish() throws IOException { - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); + LOG.debug("Finished {}.", this); + } - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException { --- End diff -- I think you can remove the `throws IOException`, and after that also on the public `add(...)`. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167621192 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java --- @@ -60,12 +60,20 @@ void tagAsEvent(); /** -* Returns the underlying memory segment. +* Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits +* slices. Use it only along the {@link #getMemorySegmentOffset()}. * * @return the memory segment backing this buffer */ + @Deprecated MemorySegment getMemorySegment(); + /** +* @return the offset where this (potential slice) {@link Buffer}'s data start in the underlying memory segment. +*/ + @Deprecated --- End diff -- Same here. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167621132 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java --- @@ -60,12 +60,20 @@ void tagAsEvent(); /** -* Returns the underlying memory segment. +* Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits +* slices. Use it only along the {@link #getMemorySegmentOffset()}. * * @return the memory segment backing this buffer */ + @Deprecated --- End diff -- You should name the proper replacement for this deprecated method in the comment, or say that it will be eventually removed without replacement. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167898515 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link ResultPartitionWriter} that collects output on the List. 
+ */ +@ThreadSafe +public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter { + private final BufferProvider bufferProvider; + private final ArrayDeque bufferConsumers = new ArrayDeque<>(); + + public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) { + this.bufferProvider = checkNotNull(bufferProvider); + } + + @Override + public synchronized BufferProvider getBufferProvider() { + return bufferProvider; --- End diff -- How does this `synchronized` help? The field is `final`, so I would assume this change is not required. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167594824 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -131,9 +114,9 @@ public void release() { } // Release all available buffers - Buffer buffer; - while ((buffer = buffers.poll()) != null) { - buffer.recycleBuffer(); + BufferConsumer bufferConsumer; + while ((bufferConsumer = buffers.poll()) != null) { --- End diff -- I suggest to just make a normal for-each iteration to close all, followed by a clear. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167528361 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { } /** -* Writes the buffer to the {@link ResultPartitionWriter} and removes the -* buffer from the serializer state. +* Marks the current {@link BufferBuilder} as finished and clears the state for next one. * -* Needs to be synchronized on the serializer! +* @return true if some data were written */ - private void writeAndClearBuffer( - Buffer buffer, + private boolean tryFinishCurrentBufferBuilder( int targetChannel, RecordSerializer serializer) throws IOException { - try { - targetPartition.writeBuffer(buffer, targetChannel); - } - finally { - serializer.clearCurrentBuffer(); + if (!bufferBuilders[targetChannel].isPresent()) { + return false; } + BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + bufferBuilders[targetChannel] = Optional.empty(); + + numBytesOut.inc(bufferBuilder.getWrittenBytes()); + bufferBuilder.finish(); --- End diff -- You could combine this into `numBytesOut.inc(bufferBuilder.finish())` or maybe `finish()` should not need to have a return value? ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167896900 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link ResultPartitionWriter} that collects output on the List. 
+ */ +@ThreadSafe +public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter { + private final BufferProvider bufferProvider; + private final ArrayDeque bufferConsumers = new ArrayDeque<>(); + + public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) { + this.bufferProvider = checkNotNull(bufferProvider); + } + + @Override + public synchronized BufferProvider getBufferProvider() { + return bufferProvider; + } + + @Override + public synchronized ResultPartitionID getPartitionId() { + return new ResultPartitionID(); --- End diff -- What is the intended effect of making this `synchronized`? It looks like it does nothing. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167604523 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -55,69 +51,56 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition. */ - @GuardedBy("buffers") - private int buffersInBacklog; - // PipelinedSubpartition(int index, ResultPartition parent) { super(index, parent); } @Override - public boolean add(Buffer buffer) throws IOException { - checkNotNull(buffer); - - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + public boolean add(BufferConsumer bufferConsumer) throws IOException { + return add(bufferConsumer, false); + } + @Override + public void flush() { synchronized (buffers) { - if (isFinished || isReleased) { - buffer.recycleBuffer(); - return false; + if (readView != null) { + readView.notifyDataAvailable(); } - - // Add the buffer and update the stats - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); - increaseBuffersInBacklog(buffer); - } - - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); } - - return true; } @Override public void finish() throws IOException { - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); + LOG.debug("Finished {}.", this); + } - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; + private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException { + checkNotNull(bufferConsumer); synchronized (buffers) { if (isFinished || isReleased) { - return; + 
bufferConsumer.close(); + return false; } - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); + // Add the bufferConsumer and update the stats + buffers.add(bufferConsumer); + updateStatistics(bufferConsumer); + increaseBuffersInBacklog(bufferConsumer); - isFinished = true; + if (finish) { + isFinished = true; + notifyDataAvailable(); --- End diff -- I noticed that this introduces a subtle change: unlike before, the notification to the listeners now happens under the lock of `buffers`. I just want to double-check that this will not have negative side effects on performance. Did this fix any correctness problems? ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167596726 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -131,9 +114,9 @@ public void release() { } // Release all available buffers - Buffer buffer; - while ((buffer = buffers.poll()) != null) { - buffer.recycleBuffer(); + BufferConsumer bufferConsumer; + while ((bufferConsumer = buffers.poll()) != null) { --- End diff -- Maybe this part of the `release()` method should also go into the superclass, which usually manages `buffers`. The method could also offer an optional hook that subclasses can use for additional cleanup inside the `synchronized` block, if needed. ---
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167883063 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO BufferAndAvailability next = null; try { - if (channel.isWritable()) { - while (true) { - SequenceNumberingViewReader reader = nonEmptyReader.poll(); - - // No queue with available data. We allow this here, because - // of the write callbacks that are executed after each write. - if (reader == null) { - return; + while (true) { + NetworkSequenceViewReader reader = poolAvailableReader(); + + // No queue with available data. We allow this here, because + // of the write callbacks that are executed after each write. + if (reader == null) { + return; + } + + next = reader.getNextBuffer(); + if (next == null) { + if (!reader.isReleased()) { + continue; } + markAsReleased(reader.getReceiverId()); + + Throwable cause = reader.getFailureCause(); + if (cause != null) { + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); - next = reader.getNextBuffer(); - - if (next == null) { - if (reader.isReleased()) { - markAsReleased(reader.getReceiverId()); - Throwable cause = reader.getFailureCause(); - - if (cause != null) { - ErrorResponse msg = new ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - - ctx.writeAndFlush(msg); - } - } else { - IllegalStateException err = new IllegalStateException( - "Bug in Netty consumer logic: reader queue got notified by partition " + - "about available data, but none was available."); - handleException(ctx.channel(), err); - return; - } - } else { - // this channel was now removed from the non-empty reader queue - // we re-add it in case it has more data, because in that case no - // "non-empty" notification will come for 
that reader from the queue. - if (next.moreAvailable()) { - nonEmptyReader.add(reader); - } - - BufferResponse msg = new BufferResponse( - next.buffer(), - reader.getSequenceNumber(), - reader.getReceiverId(), - 0); - - if (isEndOfPartitionEvent(next.buffer())) { - reader.notifySubpartitionC
[GitHub] flink pull request #5423: [FLINK-8581] Improve performance for low latency n...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167818220 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java --- @@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) { + checkState(!bufferOrEvent.moreAvailable()); if (!inputGatesWithRemainingData.remove(inputGate)) { throw new IllegalStateException("Couldn't find input gate in set of remaining " + "input gates."); } } + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } + // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - return bufferOrEvent; + return Optional.ofNullable(bufferOrEvent); + } + + @Override + public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + while (true) { + InputGate inputGate; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + inputGate = inputGatesWithData.remove(); + enqueuedInputGatesWithData.remove(inputGate); + } + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. --- End diff -- Maybe we can add a comment explaining why this can happen now, i.e. mentioning about the output flusher. ---