[FLINK-8591][runtime] Pass unfinished bufferConsumers to subpartitions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b1e127f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b1e127f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b1e127f Branch: refs/heads/master Commit: 5b1e127f7b3acd8f82893dda394fbcb7fe93d20d Parents: 98bd689 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Wed Jan 24 14:43:23 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:41 2018 +0100 ---------------------------------------------------------------------- .../serialization/SpanningRecordSerializer.java | 3 - .../io/network/api/writer/RecordWriter.java | 96 +++++++-------- .../api/writer/ResultPartitionWriter.java | 8 ++ .../CreditBasedSequenceNumberingViewReader.java | 38 +++--- .../io/network/netty/PartitionRequestQueue.java | 79 ++++++------ .../netty/SequenceNumberingViewReader.java | 31 ++--- .../partition/BufferAvailabilityListener.java | 6 +- .../partition/PipelinedSubpartition.java | 84 ++++++++++--- .../partition/PipelinedSubpartitionView.java | 4 +- .../io/network/partition/ResultPartition.java | 7 ++ .../network/partition/ResultSubpartition.java | 14 ++- .../partition/ResultSubpartitionView.java | 2 +- .../partition/SpillableSubpartition.java | 11 ++ .../partition/SpillableSubpartitionView.java | 14 ++- .../partition/SpilledSubpartitionView.java | 8 +- .../partition/consumer/InputChannel.java | 5 +- .../partition/consumer/LocalInputChannel.java | 42 +++---- .../partition/consumer/RemoteInputChannel.java | 5 +- .../partition/consumer/SingleInputGate.java | 44 +++---- .../partition/consumer/UnionInputGate.java | 21 ++++ .../partition/consumer/UnknownInputChannel.java | 3 +- .../operators/shipping/OutputCollector.java | 17 ++- ...AbstractCollectingResultPartitionWriter.java | 20 ++- .../io/network/api/writer/RecordWriterTest.java | 12 +- .../network/buffer/BufferBuilderTestUtils.java | 4 + 
.../netty/CancelPartitionRequestTest.java | 9 +- .../netty/PartitionRequestQueueTest.java | 90 ++++++++++---- .../netty/ServerTransportErrorHandlingTest.java | 2 +- .../AwaitableBufferAvailablityListener.java | 47 +++++++ .../NoOpBufferAvailablityListener.java | 28 +++++ .../PartialConsumePipelinedResultTest.java | 2 +- .../partition/PipelinedSubpartitionTest.java | 123 ++++++++++++++++--- .../partition/SpillableSubpartitionTest.java | 47 ++----- .../network/partition/SubpartitionTestBase.java | 13 ++ .../partition/consumer/InputChannelTest.java | 5 +- .../IteratorWrappingTestSingleInputGate.java | 10 +- .../consumer/LocalInputChannelTest.java | 16 +-- .../partition/consumer/SingleInputGateTest.java | 2 +- .../partition/consumer/TestInputChannel.java | 14 ++- .../network/util/TestSubpartitionConsumer.java | 27 ++-- .../flink/streaming/api/graph/StreamConfig.java | 4 + .../runtime/io/RecordWriterOutput.java | 4 - .../runtime/io/StreamRecordWriter.java | 11 +- .../streaming/runtime/tasks/OperatorChain.java | 13 +- .../consumer/StreamTestSingleInputGate.java | 16 ++- .../runtime/io/StreamRecordWriterTest.java | 113 ----------------- 46 files changed, 672 insertions(+), 502 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index e1d7fb1..ba8e659 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -148,9 +148,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R @Override public void clear() { - if (targetBuffer != null) { - targetBuffer.finish(); - } targetBuffer = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 801e6eb..51dfbde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -61,7 +61,7 @@ public class RecordWriter<T extends IOReadableWritable> { /** {@link RecordSerializer} per outgoing channel. */ private final RecordSerializer<T>[] serializers; - private final Optional<BufferConsumer>[] bufferConsumers; + private final Optional<BufferBuilder>[] bufferBuilders; private final Random rng = new XORShiftRandom(); @@ -84,10 +84,10 @@ public class RecordWriter<T extends IOReadableWritable> { * serializer. 
*/ this.serializers = new SpanningRecordSerializer[numChannels]; - this.bufferConsumers = new Optional[numChannels]; + this.bufferBuilders = new Optional[numChannels]; for (int i = 0; i < numChannels; i++) { serializers[i] = new SpanningRecordSerializer<T>(); - bufferConsumers[i] = Optional.empty(); + bufferBuilders[i] = Optional.empty(); } } @@ -117,28 +117,24 @@ public class RecordWriter<T extends IOReadableWritable> { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer<T> serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - if (tryWriteAndClearBuffer(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). 
+ if (result.isFullRecord()) { + break; } - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - checkState(!bufferConsumers[targetChannel].isPresent()); - bufferConsumers[targetChannel] = Optional.of(bufferBuilder.createBufferConsumer()); - result = serializer.setNextBufferBuilder(bufferBuilder); } - checkState(!serializer.hasSerializedData(), "All data should be written at once"); + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { @@ -146,34 +142,24 @@ public class RecordWriter<T extends IOReadableWritable> { for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { RecordSerializer<T> serializer = serializers[targetChannel]; - synchronized (serializer) { - tryWriteAndClearBuffer(targetChannel, serializer); + tryFinishCurrentBufferBuilder(targetChannel, serializer); - // retain the buffer so that it can be recycled by each channel of targetPartition - targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel); - } + // retain the buffer so that it can be recycled by each channel of targetPartition + targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel); } return eventBufferConsumer; } } - public void flush() throws IOException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - RecordSerializer<T> serializer = serializers[targetChannel]; - - synchronized (serializer) { - tryWriteAndClearBuffer(targetChannel, serializer); - } - } + public void flush() { + targetPartition.flush(); } public void clearBuffers() { for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { RecordSerializer<?> serializer = serializers[targetChannel]; - synchronized 
(serializer) { - closeBufferConsumer(targetChannel); - serializer.clear(); - } + closeBufferConsumer(targetChannel); + serializer.clear(); } } @@ -185,33 +171,35 @@ public class RecordWriter<T extends IOReadableWritable> { } /** - * Tries to consume serialized data and (if data present) writes them to the {@link ResultPartitionWriter}. - * After writing it clean ups the state. - * - * <p><b>Needs to be synchronized on the serializer!</b> + * Marks the current {@link BufferBuilder} as finished and clears the state for next one. * * @return true if some data were written */ - private boolean tryWriteAndClearBuffer( - int targetChannel, - RecordSerializer<T> serializer) throws IOException { + private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer<T> serializer) { - if (!bufferConsumers[targetChannel].isPresent()) { + if (!bufferBuilders[targetChannel].isPresent()) { return false; } - BufferConsumer bufferConsumer = bufferConsumers[targetChannel].get(); - bufferConsumers[targetChannel] = Optional.empty(); + BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + bufferBuilders[targetChannel] = Optional.empty(); - numBytesOut.inc(bufferConsumer.getWrittenBytes()); + numBytesOut.inc(bufferBuilder.finish()); serializer.clear(); - targetPartition.addBufferConsumer(bufferConsumer, targetChannel); return true; } + private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { + checkState(!bufferBuilders[targetChannel].isPresent()); + BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking(); + bufferBuilders[targetChannel] = Optional.of(bufferBuilder); + targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel); + return bufferBuilder; + } + private void closeBufferConsumer(int targetChannel) { - if (bufferConsumers[targetChannel].isPresent()) { - bufferConsumers[targetChannel].get().close(); - 
bufferConsumers[targetChannel] = Optional.empty(); + if (bufferBuilders[targetChannel].isPresent()) { + bufferBuilders[targetChannel].get().finish(); + bufferBuilders[targetChannel] = Optional.empty(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index caefb52..02049d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -45,6 +45,14 @@ public interface ResultPartitionWriter { * * <p>This method takes the ownership of the passed {@code bufferConsumer} and thus is responsible for releasing * it's resources. + * + * <p>To avoid problems with data re-ordering, before adding new {@link BufferConsumer} the previously added one + * the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}. */ void addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException; + + /** + * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers}. 
+ */ + void flush(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java index 5ebf62d..d02b2bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java @@ -18,19 +18,19 @@ package org.apache.flink.runtime.io.network.netty; -import org.apache.flink.runtime.io.network.NetworkSequenceViewReader; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.io.network.NetworkSequenceViewReader; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper for the subpartition view used in the new network 
credit-based mode. @@ -44,7 +44,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen private final InputChannelID receiverId; - private final AtomicLong numBuffersAvailable = new AtomicLong(); + private final AtomicBoolean buffersAvailable = new AtomicBoolean(); private final PartitionRequestQueue requestQueue; @@ -118,7 +118,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @Override public boolean isAvailable() { // BEWARE: this must be in sync with #isAvailable()! - return numBuffersAvailable.get() > 0 && + return buffersAvailable.get() && (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); } @@ -131,11 +131,9 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen * * @param bufferAndBacklog * current buffer and backlog including information about the next buffer - * @param remaining - * remaining number of queued buffers, i.e. <tt>numBuffersAvailable.get()</tt> */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog, long remaining) { - return remaining > 0 && + private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + return bufferAndBacklog.isMoreAvailable() && (numCreditsAvailable > 0 || bufferAndBacklog.nextBufferIsEvent()); } @@ -155,27 +153,23 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen } @VisibleForTesting - long getNumBuffersAvailable() { - return numBuffersAvailable.get(); + boolean hasBuffersAvailable() { + return buffersAvailable.get(); } @Override public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { - long remaining = numBuffersAvailable.decrementAndGet(); + buffersAvailable.set(next.isMoreAvailable()); sequenceNumber++; - if (remaining < 0) { - throw new IllegalStateException("no buffer available"); - } - if (next.buffer().isBuffer() && --numCreditsAvailable < 0) { 
throw new IllegalStateException("no credit available"); } return new BufferAndAvailability( - next.buffer(), isAvailable(next, remaining), next.buffersInBacklog()); + next.buffer(), isAvailable(next), next.buffersInBacklog()); } else { return null; } @@ -202,11 +196,9 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen } @Override - public void notifyBuffersAvailable(long numBuffers) { - // if this request made the channel non-empty, notify the input gate - if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) { - requestQueue.notifyReaderNonEmpty(this); - } + public void notifyDataAvailable() { + buffersAvailable.set(true); + requestQueue.notifyReaderNonEmpty(this); } @Override @@ -214,7 +206,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen return "CreditBasedSequenceNumberingViewReader{" + "requestLock=" + requestLock + ", receiverId=" + receiverId + - ", numBuffersAvailable=" + numBuffersAvailable.get() + + ", buffersAvailable=" + buffersAvailable.get() + ", sequenceNumber=" + sequenceNumber + ", numCreditsAvailable=" + numCreditsAvailable + ", isRegisteredAsAvailable=" + isRegisteredAsAvailable + http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index 4832442..8d43815 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -103,18 +103,17 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { * availability, so there is no race 
condition here. */ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception { - if (!reader.isRegisteredAsAvailable() && reader.isAvailable()) { - // Queue an available reader for consumption. If the queue is empty, - // we try trigger the actual write. Otherwise this will be handled by - // the writeAndFlushNextMessageIfPossible calls. - boolean triggerWrite = availableReaders.isEmpty(); - availableReaders.add(reader); - - reader.setRegisteredAsAvailable(true); - - if (triggerWrite) { - writeAndFlushNextMessageIfPossible(ctx.channel()); - } + if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) { + return; + } + // Queue an available reader for consumption. If the queue is empty, + // we try trigger the actual write. Otherwise this will be handled by + // the writeAndFlushNextMessageIfPossible calls. + boolean triggerWrite = availableReaders.isEmpty(); + registerAvailableReader(reader); + + if (triggerWrite) { + writeAndFlushNextMessageIfPossible(ctx.channel()); } } @@ -183,13 +182,12 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { // Cancel the request for the input channel int size = availableReaders.size(); for (int i = 0; i < size; i++) { - NetworkSequenceViewReader reader = availableReaders.poll(); + NetworkSequenceViewReader reader = pollAvailableReader(); if (reader.getReceiverId().equals(toCancel)) { reader.releaseAllResources(); - reader.setRegisteredAsAvailable(false); markAsReleased(reader.getReceiverId()); } else { - availableReaders.add(reader); + registerAvailableReader(reader); } } @@ -216,7 +214,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { BufferAndAvailability next = null; try { while (true) { - NetworkSequenceViewReader reader = availableReaders.poll(); + NetworkSequenceViewReader reader = pollAvailableReader(); // No queue with available data. We allow this here, because // of the write callbacks that are executed after each write. 
@@ -226,32 +224,24 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { next = reader.getNextBuffer(); if (next == null) { - if (reader.isReleased()) { - markAsReleased(reader.getReceiverId()); + if (!reader.isReleased()) { + continue; + } + markAsReleased(reader.getReceiverId()); - Throwable cause = reader.getFailureCause(); - if (cause != null) { - ErrorResponse msg = new ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - - ctx.writeAndFlush(msg); - } - } else { - IllegalStateException err = new IllegalStateException( - "Bug in Netty consumer logic: reader queue got notified by partition " + - "about available data, but none was available."); - handleException(ctx.channel(), err); - return; + Throwable cause = reader.getFailureCause(); + if (cause != null) { + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); + + ctx.writeAndFlush(msg); } } else { // This channel was now removed from the available reader queue. - // We re-add it into the queue if it is still available, otherwise we will - // notify the reader about the changed channel availability registration. 
+ // We re-add it into the queue if it is still available if (next.moreAvailable()) { - availableReaders.add(reader); - } else { - reader.setRegisteredAsAvailable(false); + registerAvailableReader(reader); } BufferResponse msg = new BufferResponse( @@ -283,6 +273,19 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { } } + private void registerAvailableReader(NetworkSequenceViewReader reader) { + availableReaders.add(reader); + reader.setRegisteredAsAvailable(true); + } + + private NetworkSequenceViewReader pollAvailableReader() { + NetworkSequenceViewReader reader = availableReaders.poll(); + if (reader != null) { + reader.setRegisteredAsAvailable(false); + } + return reader; + } + private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException { return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class, getClass().getClassLoader()); @@ -301,7 +304,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { } private void handleException(Channel channel, Throwable cause) throws IOException { - LOG.debug("Encountered error while consuming partitions", cause); + LOG.error("Encountered error while consuming partitions", cause); fatalError = true; releaseAllResources(); http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index 0ec5fcb..2d9635c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -19,17 +19,17 @@ package 
org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.NetworkSequenceViewReader; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper for the subpartition view used in the old network mode. 
@@ -43,7 +43,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network private final InputChannelID receiverId; - private final AtomicLong numBuffersAvailable = new AtomicLong(); + private final AtomicBoolean buffersAvailable = new AtomicBoolean(); private final PartitionRequestQueue requestQueue; @@ -51,6 +51,8 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network private int sequenceNumber = -1; + private boolean isRegisteredAvailable; + SequenceNumberingViewReader(InputChannelID receiverId, PartitionRequestQueue requestQueue) { this.receiverId = receiverId; this.requestQueue = requestQueue; @@ -84,16 +86,17 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network @Override public void setRegisteredAsAvailable(boolean isRegisteredAvailable) { + this.isRegisteredAvailable = isRegisteredAvailable; } @Override public boolean isRegisteredAsAvailable() { - return false; + return isRegisteredAvailable; } @Override public boolean isAvailable() { - return true; + return buffersAvailable.get(); } @Override @@ -110,14 +113,9 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { - long remaining = numBuffersAvailable.decrementAndGet(); + buffersAvailable.set(next.isMoreAvailable()); sequenceNumber++; - - if (remaining >= 0) { - return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog()); - } else { - throw new IllegalStateException("no buffer available"); - } + return new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()); } else { return null; } @@ -144,11 +142,9 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network } @Override - public void notifyBuffersAvailable(long numBuffers) { - // if this request 
made the channel non-empty, notify the input gate - if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) { - requestQueue.notifyReaderNonEmpty(this); - } + public void notifyDataAvailable() { + buffersAvailable.set(true); + requestQueue.notifyReaderNonEmpty(this); } @Override @@ -156,7 +152,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network return "SequenceNumberingViewReader{" + "requestLock=" + requestLock + ", receiverId=" + receiverId + - ", numBuffersAvailable=" + numBuffersAvailable.get() + ", sequenceNumber=" + sequenceNumber + '}'; } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java index 114ef7c..e78f99a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java @@ -25,9 +25,7 @@ package org.apache.flink.runtime.io.network.partition; public interface BufferAvailabilityListener { /** - * Called whenever a new number of buffers becomes available. - * - * @param numBuffers The number of buffers that became available. + * Called whenever there might be new data available. 
*/ - void notifyBuffersAvailable(long numBuffers); + void notifyDataAvailable(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 2fa512a..dcaa360 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -63,6 +63,15 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override + public void flush() { + synchronized (buffers) { + if (readView != null) { + readView.notifyDataAvailable(); + } + } + } + + @Override public void finish() throws IOException { add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); LOG.debug("Finished {}.", this); @@ -84,10 +93,10 @@ class PipelinedSubpartition extends ResultSubpartition { if (finish) { isFinished = true; + notifyDataAvailable(); } - - if (readView != null) { - readView.notifyBuffersAvailable(1); + else { + maybeNotifyDataAvailable(); } } @@ -127,19 +136,42 @@ class PipelinedSubpartition extends ResultSubpartition { @Nullable BufferAndBacklog pollBuffer() { synchronized (buffers) { - BufferConsumer bufferConsumer = buffers.peek(); - if (bufferConsumer == null) { - return null; + Buffer buffer = null; + + while (!buffers.isEmpty()) { + BufferConsumer bufferConsumer = buffers.peek(); + + buffer = bufferConsumer.build(); + checkState(bufferConsumer.isFinished() || buffers.size() == 1, + "When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue."); + + if (bufferConsumer.isFinished()) { 
+ buffers.pop().close(); + decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer()); + } + if (buffer.readableBytes() > 0) { + break; + } + buffer.recycleBuffer(); + buffer = null; + if (!bufferConsumer.isFinished()) { + break; + } } - Buffer buffer = bufferConsumer.build(); - if (bufferConsumer.isFinished()) { - buffers.pop().close(); - decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer()); + if (buffer == null) { + return null; } updateStatistics(buffer); - return new BufferAndBacklog(buffer, getBuffersInBacklog(), _nextBufferIsEvent()); + // Do not report last remaining buffer on buffers as available to read (assuming it's unfinished). + // It will be reported for reading either on flush or when the number of buffers in the queue + // will be 2 or more. + return new BufferAndBacklog( + buffer, + getNumberOfFinishedBuffers() > 0, + getBuffersInBacklog(), + _nextBufferIsEvent()); } } @@ -169,8 +201,6 @@ class PipelinedSubpartition extends ResultSubpartition { @Override public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { - final int queueSize; - synchronized (buffers) { checkState(!isReleased); checkState(readView == null, @@ -179,12 +209,12 @@ class PipelinedSubpartition extends ResultSubpartition { LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId()); - queueSize = buffers.size(); readView = new PipelinedSubpartitionView(this, availabilityListener); + if (!buffers.isEmpty()) { + readView.notifyDataAvailable(); + } } - readView.notifyBuffersAvailable(queueSize); - return readView; } @@ -220,4 +250,26 @@ class PipelinedSubpartition extends ResultSubpartition { // since we do not synchronize, the size may actually be lower than 0! return Math.max(buffers.size(), 0); } + + private void maybeNotifyDataAvailable() { + // Notify only when we added first finished buffer. 
+ if (getNumberOfFinishedBuffers() == 1) { + notifyDataAvailable(); + } + } + + private void notifyDataAvailable() { + if (readView != null) { + readView.notifyDataAvailable(); + } + } + + private int getNumberOfFinishedBuffers() { + if (buffers.size() == 1 && buffers.peekLast().isFinished()) { + return 1; + } + + // We assume that only last buffer is not finished. + return Math.max(0, buffers.size() - 1); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index 21abd04..c60a604 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -52,8 +52,8 @@ class PipelinedSubpartitionView implements ResultSubpartitionView { } @Override - public void notifyBuffersAvailable(long numBuffers) { - availabilityListener.notifyBuffersAvailable(numBuffers); + public void notifyDataAvailable() { + availabilityListener.notifyDataAvailable(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 9be261e..25a076b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java 
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -257,6 +257,13 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { } } + @Override + public void flush() { + for (ResultSubpartition subpartition : subpartitions) { + subpartition.flush(); + } + } + /** * Finishes the result partition. * http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index 572cde7..adc0ed3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -93,6 +93,9 @@ public abstract class ResultSubpartition { * <p>The request may be executed synchronously, or asynchronously, depending on the * implementation. * + * <p><strong>IMPORTANT:</strong> Before adding new {@link BufferConsumer} previously added must be in finished + * state. Because of the performance reasons, this is only enforced during the data reading. + * * @param bufferConsumer * the buffer to add (transferring ownership to this writer) * @return true if operation succeeded and bufferConsumer was enqueued for consumption. 
@@ -101,6 +104,8 @@ public abstract class ResultSubpartition { */ abstract public boolean add(BufferConsumer bufferConsumer) throws IOException; + abstract public void flush(); + abstract public void finish() throws IOException; abstract public void release() throws IOException; @@ -170,12 +175,14 @@ public abstract class ResultSubpartition { public static final class BufferAndBacklog { private final Buffer buffer; + private final boolean isMoreAvailable; private final int buffersInBacklog; private final boolean nextBufferIsEvent; - public BufferAndBacklog(Buffer buffer, int buffersInBacklog, boolean nextBufferIsEvent) { + public BufferAndBacklog(Buffer buffer, boolean isMoreAvailable, int buffersInBacklog, boolean nextBufferIsEvent) { this.buffer = checkNotNull(buffer); this.buffersInBacklog = buffersInBacklog; + this.isMoreAvailable = isMoreAvailable; this.nextBufferIsEvent = nextBufferIsEvent; } @@ -183,10 +190,15 @@ public abstract class ResultSubpartition { return buffer; } + public boolean isMoreAvailable() { + return isMoreAvailable; + } + public int buffersInBacklog() { return buffersInBacklog; } + public boolean nextBufferIsEvent() { return nextBufferIsEvent; } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index 9b0344e..41fbb0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -43,7 +43,7 @@ public interface ResultSubpartitionView { @Nullable BufferAndBacklog getNextBuffer() throws 
IOException, InterruptedException; - void notifyBuffersAvailable(long buffers); + void notifyDataAvailable(); void releaseAllResources() throws IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 4b9f59f..8758b34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -115,11 +115,22 @@ class SpillableSubpartition extends ResultSubpartition { } @Override + public void flush() { + synchronized (buffers) { + if (readView != null) { + readView.notifyDataAvailable(); + } + } + } + + @Override public synchronized void finish() throws IOException { synchronized (buffers) { if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE))) { isFinished = true; } + + flush(); } // If we are spilling/have spilled, wait for the writer to finish http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 6c173a3..789b3d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -89,7 +89,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { } if (nextBuffer != null) { - listener.notifyBuffersAvailable(1); + listener.notifyDataAvailable(); } } @@ -143,20 +143,24 @@ class SpillableSubpartitionView implements ResultSubpartitionView { Buffer current = null; boolean nextBufferIsEvent = false; int newBacklog = 0; // this is always correct if current is non-null! + boolean isMoreAvailable = false; synchronized (buffers) { if (isReleased.get()) { return null; } else if (nextBuffer != null) { current = nextBuffer.build(); + if (nextBuffer.isFinished()) { newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer()); nextBuffer.close(); nextBuffer = buffers.poll(); } + isMoreAvailable = buffers.size() > 0; if (nextBuffer != null) { - listener.notifyBuffersAvailable(1); + isMoreAvailable = true; + listener.notifyDataAvailable(); nextBufferIsEvent = !nextBuffer.isBuffer(); } @@ -164,7 +168,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { // if we are spilled (but still process a non-spilled nextBuffer), we don't know the // state of nextBufferIsEvent... 
if (spilledView == null) { - return new BufferAndBacklog(current, newBacklog, nextBufferIsEvent); + return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent); } } } // else: spilled @@ -172,7 +176,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { SpilledSubpartitionView spilled = spilledView; if (spilled != null) { if (current != null) { - return new BufferAndBacklog(current, newBacklog, spilled.nextBufferIsEvent()); + return new BufferAndBacklog(current, isMoreAvailable, newBacklog, spilled.nextBufferIsEvent()); } else { return spilled.getNextBuffer(); } @@ -182,7 +186,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { } @Override - public void notifyBuffersAvailable(long buffers) { + public void notifyDataAvailable() { // We do the availability listener notification one by one } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index d1917e6..4c5cd2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -105,7 +105,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis // Otherwise, we notify only when the spill writer callback happens. if (!spillWriter.registerAllRequestsProcessedListener(this)) { isSpillInProgress = false; - availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers); + availabilityListener.notifyDataAvailable(); LOG.debug("No spilling in progress. 
Notified about {} available buffers.", numberOfSpilledBuffers); } else { LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", numberOfSpilledBuffers); @@ -120,7 +120,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis @Override public void onNotification() { isSpillInProgress = false; - availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers); + availabilityListener.notifyDataAvailable(); LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers); } @@ -148,7 +148,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis } int newBacklog = parent.decreaseBuffersInBacklog(current); - return new BufferAndBacklog(current, newBacklog, nextBufferIsEvent); + return new BufferAndBacklog(current, newBacklog > 0, newBacklog, nextBufferIsEvent); } @Nullable @@ -166,7 +166,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis } @Override - public void notifyBuffersAvailable(long buffers) { + public void notifyDataAvailable() { // We do the availability listener notification either directly on // construction of this view (when everything has been spilled) or // as soon as spilling is done and we are notified about it in the http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 7b7edf7..3ce5866 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java 
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkArgument; @@ -134,9 +135,9 @@ public abstract class InputChannel { abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException; /** - * Returns the next buffer from the consumed subpartition. + * Returns the next buffer from the consumed subpartition or {@code Optional.empty()} if there is no data to return. */ - abstract BufferAndAvailability getNextBuffer() throws IOException, InterruptedException; + abstract Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException; // ------------------------------------------------------------------------ // Task events http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 8505666..f9c75ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -23,19 +23,19 @@ import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; -import 
org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Optional; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicLong; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -57,9 +57,6 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit /** Task event dispatcher for backwards events. */ private final TaskEventDispatcher taskEventDispatcher; - /** Number of available buffers used to keep track of non-empty gate notifications. 
*/ - private final AtomicLong numBuffersAvailable; - /** The consumed subpartition */ private volatile ResultSubpartitionView subpartitionView; @@ -91,7 +88,6 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit this.partitionManager = checkNotNull(partitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); - this.numBuffersAvailable = new AtomicLong(); } // ------------------------------------------------------------------------ @@ -166,11 +162,19 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit } @Override - BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { + Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException { checkError(); ResultSubpartitionView subpartitionView = this.subpartitionView; if (subpartitionView == null) { + // There is a possible race condition between writing a EndOfPartitionEvent (1) and flushing (3) the Local + // channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush notification (4). When + // they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue LocalInputChannel after (or + // during) it was released during reading the EndOfPartitionEvent (2). + if (isReleased) { + return Optional.empty(); + } + // this can happen if the request for the partition was triggered asynchronously // by the time trigger // would be good to avoid that, by guaranteeing that the requestPartition() and @@ -185,31 +189,17 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit if (subpartitionView.isReleased()) { throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released."); } else { - // This means there is a bug in the buffer availability - // notifications. - throw new IllegalStateException("Consumed partition has no buffers available. 
" + - "Number of received buffer notifications is " + numBuffersAvailable + "."); + return Optional.empty(); } } - long remaining = numBuffersAvailable.decrementAndGet(); - - if (remaining >= 0) { - numBytesIn.inc(next.buffer().getSizeUnsafe()); - return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog()); - } else if (subpartitionView.isReleased()) { - throw new ProducerFailedException(subpartitionView.getFailureCause()); - } else { - throw new IllegalStateException("No buffer available and producer partition not released."); - } + numBytesIn.inc(next.buffer().getSizeUnsafe()); + return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog())); } @Override - public void notifyBuffersAvailable(long numBuffers) { - // if this request made the channel non-empty, notify the input gate - if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) { - notifyChannelNonEmpty(); - } + public void notifyDataAvailable() { + notifyChannelNonEmpty(); } private ResultSubpartitionView checkAndWaitForSubpartitionView() { http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 8a8c7f5..8174359 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -42,6 +42,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -183,7 +184,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, } @Override - BufferAndAvailability getNextBuffer() throws IOException { + Optional<BufferAndAvailability> getNextBuffer() throws IOException { checkState(!isReleased.get(), "Queried for a buffer after channel has been closed."); checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue."); @@ -198,7 +199,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, } numBytesIn.inc(next.getSizeUnsafe()); - return new BufferAndAvailability(next, remaining > 0, getSenderBacklog()); + return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog())); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 337b3c2..04b8ee6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -509,39 +509,39 @@ public class SingleInputGate implements InputGate { InputChannel currentChannel; boolean moreAvailable; - synchronized (inputChannelsWithData) { - while (inputChannelsWithData.size() == 0) { - if (isReleased) { - throw new IllegalStateException("Released"); - } + Optional<BufferAndAvailability> result = Optional.empty(); - if (blocking) { - 
inputChannelsWithData.wait(); - } - else { - return Optional.empty(); + do { + synchronized (inputChannelsWithData) { + while (inputChannelsWithData.size() == 0) { + if (isReleased) { + throw new IllegalStateException("Released"); + } + + if (blocking) { + inputChannelsWithData.wait(); + } + else { + return Optional.empty(); + } } + + currentChannel = inputChannelsWithData.remove(); + enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex()); + moreAvailable = inputChannelsWithData.size() > 0; } - currentChannel = inputChannelsWithData.remove(); - enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex()); - moreAvailable = inputChannelsWithData.size() > 0; - } + result = currentChannel.getNextBuffer(); + } while (!result.isPresent()); - final BufferAndAvailability result = currentChannel.getNextBuffer(); - // Sanity check that notifications only happen when data is available - if (result == null) { - throw new IllegalStateException("Bug in input gate/channel logic: input gate got " + - "notified by channel about available data, but none was available."); - } // this channel was now removed from the non-empty channels queue // we re-add it in case it has more data, because in that case no "non-empty" notification // will come for that channel - if (result.moreAvailable()) { + if (result.get().moreAvailable()) { queueChannel(currentChannel); } - final Buffer buffer = result.buffer(); + final Buffer buffer = result.get().buffer(); if (buffer.isBuffer()) { return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable)); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 14c04bc..5a547ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -26,12 +26,14 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayDeque; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Input gate wrapper to union the input from multiple input gates. @@ -71,6 +73,11 @@ public class UnionInputGate implements InputGate, InputGateListener { /** Gates, which notified this input gate about available data. */ private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>(); + /** + * Guardian against enqueuing an {@link InputGate} multiple times on {@code inputGatesWithData}. + */ + private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>(); + /** The total number of input channels across all unioned input gates. 
*/ private final int totalNumberOfInputChannels; @@ -163,12 +170,20 @@ public class UnionInputGate implements InputGate, InputGateListener { && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) { + checkState(!bufferOrEvent.moreAvailable()); if (!inputGatesWithRemainingData.remove(inputGate)) { throw new IllegalStateException("Couldn't find input gate in set of remaining " + "input gates."); } } + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } + // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); @@ -190,6 +205,7 @@ public class UnionInputGate implements InputGate, InputGateListener { inputGatesWithData.wait(); } inputGate = inputGatesWithData.remove(); + enqueuedInputGatesWithData.remove(inputGate); } // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. 
@@ -248,9 +264,14 @@ public class UnionInputGate implements InputGate, InputGateListener { int availableInputGates; synchronized (inputGatesWithData) { + if (enqueuedInputGatesWithData.contains(inputGate)) { + return; + } + availableInputGates = inputGatesWithData.size(); inputGatesWithData.add(inputGate); + enqueuedInputGatesWithData.add(inputGate); if (availableInputGates == 0) { inputGatesWithData.notifyAll(); http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index d887ab6..1101f66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import java.io.IOException; +import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -76,7 +77,7 @@ class UnknownInputChannel extends InputChannel { } @Override - public BufferAndAvailability getNextBuffer() throws IOException { + public Optional<BufferAndAvailability> getNextBuffer() throws IOException { // Nothing to do here throw new UnsupportedOperationException("Cannot retrieve a buffer from an UnknownInputChannel"); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java index 3526e96..382ae39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java @@ -18,16 +18,16 @@ package org.apache.flink.runtime.operators.shipping; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.util.Collector; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + /** * The OutputCollector collects records, and emits them to the {@link RecordWriter}s. * The OutputCollector tracks to which writers a deep-copy must be given and which not. 
@@ -81,11 +81,8 @@ public class OutputCollector<T> implements Collector<T> { @Override public void close() { for (RecordWriter<?> writer : writers) { - try { - writer.flush(); - } catch (IOException e) { - throw new RuntimeException(e.getMessage(), e); - } + writer.clearBuffers(); + writer.flush(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java index 5a7d20a..b2171c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import javax.annotation.concurrent.ThreadSafe; + import java.io.IOException; import java.util.ArrayDeque; @@ -32,6 +34,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * {@link ResultPartitionWriter} that collects output on the List. 
*/ +@ThreadSafe public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter { private final BufferProvider bufferProvider; private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>(); @@ -61,13 +64,15 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP } @Override - public void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { + public synchronized void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { checkState(targetChannel < getNumberOfSubpartitions()); - bufferConsumers.add(bufferConsumer); + processBufferConsumers(); + } + private void processBufferConsumers() throws IOException { while (!bufferConsumers.isEmpty()) { - bufferConsumer = bufferConsumers.peek(); + BufferConsumer bufferConsumer = bufferConsumers.peek(); Buffer buffer = bufferConsumer.build(); try { deserializeBuffer(buffer); @@ -82,5 +87,14 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP } } + @Override + public synchronized void flush() { + try { + processBufferConsumers(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + protected abstract void deserializeBuffer(Buffer buffer) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 95d6655..ed32454 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -62,10 +62,7 @@ 
import java.util.concurrent.Future; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -180,7 +177,6 @@ public class RecordWriterTest { // Fill a buffer, but don't write it out. recordWriter.emit(new IntValue(0)); - verify(partitionWriter, never()).addBufferConsumer(any(BufferConsumer.class), anyInt()); // Clear all buffers. recordWriter.clearBuffers(); @@ -428,6 +424,10 @@ public class RecordWriterTest { public void addBufferConsumer(BufferConsumer buffer, int targetChannel) throws IOException { queues[targetChannel].add(buffer); } + + @Override + public void flush() { + } } private static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { @@ -477,6 +477,10 @@ public class RecordWriterTest { public void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { bufferConsumer.close(); } + + @Override + public void flush() { + } } private static class ByteArrayIO implements IOReadableWritable { http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java index c6b8599..ead42df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java @@ -30,6 +30,10 @@ import static org.apache.flink.util.Preconditions.checkArgument; public class BufferBuilderTestUtils { public static final int BUFFER_SIZE = 32 * 1024; + public static BufferBuilder createBufferBuilder() { + return createBufferBuilder(BUFFER_SIZE); + } + public static BufferBuilder createBufferBuilder(int size) { return createFilledBufferBuilder(size, 0); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 4c4939b..56abff1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest; import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest; -import static org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; @@ -88,7 +87,7 @@ public class CancelPartitionRequestTest { @Override public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { BufferAvailabilityListener listener = (BufferAvailabilityListener) 
invocationOnMock.getArguments()[2]; - listener.notifyBuffersAvailable(Long.MAX_VALUE); + listener.notifyDataAvailable(); return view; } }); @@ -139,7 +138,7 @@ public class CancelPartitionRequestTest { @Override public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2]; - listener.notifyBuffersAvailable(Long.MAX_VALUE); + listener.notifyDataAvailable(); return view; } }); @@ -194,11 +193,11 @@ public class CancelPartitionRequestTest { public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException { Buffer buffer = bufferProvider.requestBufferBlocking(); buffer.setSize(buffer.getMaxCapacity()); // fake some data - return new BufferAndBacklog(buffer, 0, false); + return new BufferAndBacklog(buffer, true, 0, false); } @Override - public void notifyBuffersAvailable(long buffers) { + public void notifyDataAvailable() { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java index 69a0e11..16418ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java @@ -36,6 +36,7 @@ import org.junit.Test; import javax.annotation.Nullable; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; @@ -52,6 +53,43 @@ import static org.junit.Assert.assertTrue; */ 
public class PartitionRequestQueueTest { + /** + * When an empty reader and a reader that actually has buffers are both enqueued while the channel is not writable, + * a subsequent channel writability change should drain both readers and write all of their messages. + */ + @Test + public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception { + final int buffersToWrite = 5; + PartitionRequestQueue queue = new PartitionRequestQueue(); + EmbeddedChannel channel = new EmbeddedChannel(queue); + + CreditBasedSequenceNumberingViewReader reader1 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(0, 0), 10, queue); + CreditBasedSequenceNumberingViewReader reader2 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(1, 1), 10, queue); + + reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new NotReleasedResultSubpartitionView(), new ResultPartitionID(), 0); + reader1.notifyDataAvailable(); + assertTrue(reader1.isAvailable()); + assertFalse(reader1.isRegisteredAsAvailable()); + + channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false); + assertFalse(channel.isWritable()); + + reader1.notifyDataAvailable(); + channel.runPendingTasks(); + + reader2.notifyDataAvailable(); + reader2.requestSubpartitionView((partitionId, index, availabilityListener) -> new DefaultBufferResultSubpartitionView(buffersToWrite), new ResultPartitionID(), 0); + assertTrue(reader2.isAvailable()); + assertFalse(reader2.isRegisteredAsAvailable()); + + reader2.notifyDataAvailable(); + + // making the channel writable again should drain both reader1 and reader2 + channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true); + channel.runPendingTasks(); + assertEquals(buffersToWrite, channel.outboundMessages().size()); + } + @Test public void testProducerFailedException() throws Exception { PartitionRequestQueue queue = new PartitionRequestQueue(); @@ -66,7 +104,7 @@ public class PartitionRequestQueueTest {
CreditBasedSequenceNumberingViewReader seqView = new CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue); seqView.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0); // Add available buffer to trigger enqueue the erroneous view - seqView.notifyBuffersAvailable(1); + seqView.notifyDataAvailable(); ch.runPendingTasks(); @@ -84,7 +122,7 @@ public class PartitionRequestQueueTest { */ @Test public void testDefaultBufferWriting() throws Exception { - testBufferWriting(new DefaultBufferResultSubpartitionView(2)); + testBufferWriting(new DefaultBufferResultSubpartitionView(1)); } /** @@ -92,7 +130,7 @@ public class PartitionRequestQueueTest { */ @Test public void testReadOnlyBufferWriting() throws Exception { - testBufferWriting(new ReadOnlyBufferResultSubpartitionView(2)); + testBufferWriting(new ReadOnlyBufferResultSubpartitionView(1)); } private void testBufferWriting(ResultSubpartitionView view) throws IOException { @@ -108,7 +146,7 @@ public class PartitionRequestQueueTest { reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0); // notify about buffer availability and encode one buffer - reader.notifyBuffersAvailable(1); + reader.notifyDataAvailable(); channel.runPendingTasks(); @@ -124,37 +162,45 @@ public class PartitionRequestQueueTest { private static class DefaultBufferResultSubpartitionView extends NoOpResultSubpartitionView { /** Number of buffer in the backlog to report with every {@link #getNextBuffer()} call. 
*/ - final int buffersInBacklog; + private final AtomicInteger buffersInBacklog; private DefaultBufferResultSubpartitionView(int buffersInBacklog) { - this.buffersInBacklog = buffersInBacklog; + this.buffersInBacklog = new AtomicInteger(buffersInBacklog); } @Nullable @Override public BufferAndBacklog getNextBuffer() { + int buffers = buffersInBacklog.decrementAndGet(); return new BufferAndBacklog( TestBufferFactory.createBuffer(10), - buffersInBacklog, + buffers > 0, + buffers, false); } } - private static class ReadOnlyBufferResultSubpartitionView extends NoOpResultSubpartitionView { - /** Number of buffer in the backlog to report with every {@link #getNextBuffer()} call. */ - final int buffersInBacklog; - + private static class ReadOnlyBufferResultSubpartitionView extends DefaultBufferResultSubpartitionView { private ReadOnlyBufferResultSubpartitionView(int buffersInBacklog) { - this.buffersInBacklog = buffersInBacklog; + super(buffersInBacklog); } @Nullable @Override public BufferAndBacklog getNextBuffer() { + BufferAndBacklog nextBuffer = super.getNextBuffer(); return new BufferAndBacklog( - TestBufferFactory.createBuffer(10).readOnlySlice(), - buffersInBacklog, - false); + nextBuffer.buffer().readOnlySlice(), + nextBuffer.isMoreAvailable(), + nextBuffer.buffersInBacklog(), + nextBuffer.nextBufferIsEvent()); + } + } + + private static class NotReleasedResultSubpartitionView extends NoOpResultSubpartitionView { + @Override + public boolean isReleased() { + return false; } } @@ -195,7 +241,7 @@ public class PartitionRequestQueueTest { assertNull(channel.readOutbound()); // Notify an available event buffer to trigger enqueue the reader - reader.notifyBuffersAvailable(1); + reader.notifyDataAvailable(); channel.runPendingTasks(); @@ -226,7 +272,7 @@ public class PartitionRequestQueueTest { @Test public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { // setup - final ResultSubpartitionView view = new DefaultBufferResultSubpartitionView(2); +
final ResultSubpartitionView view = new DefaultBufferResultSubpartitionView(10); ResultPartitionProvider partitionProvider = (partitionId, index, availabilityListener) -> view; @@ -246,7 +292,7 @@ public class PartitionRequestQueueTest { // Notify available buffers to trigger enqueue the reader final int notifyNumBuffers = 5; for (int i = 0; i < notifyNumBuffers; i++) { - reader.notifyBuffersAvailable(1); + reader.notifyDataAvailable(); } channel.runPendingTasks(); @@ -254,7 +300,7 @@ public class PartitionRequestQueueTest { // the reader is not enqueued in the pipeline because no credits are available // -> it should still have the same number of pending buffers assertEquals(0, queue.getAvailableReaders().size()); - assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable()); + assertTrue(reader.hasBuffersAvailable()); assertFalse(reader.isRegisteredAsAvailable()); assertEquals(0, reader.getNumCreditsAvailable()); @@ -269,7 +315,7 @@ public class PartitionRequestQueueTest { assertTrue(reader.isRegisteredAsAvailable()); assertThat(queue.getAvailableReaders(), contains(reader)); // contains only (this) one! 
assertEquals(i, reader.getNumCreditsAvailable()); - assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable()); + assertTrue(reader.hasBuffersAvailable()); } // Flush the buffer to make the channel writable again and see the final results @@ -278,7 +324,7 @@ public class PartitionRequestQueueTest { assertEquals(0, queue.getAvailableReaders().size()); assertEquals(0, reader.getNumCreditsAvailable()); - assertEquals(notifyNumBuffers - notifyNumCredits, reader.getNumBuffersAvailable()); + assertTrue(reader.hasBuffersAvailable()); assertFalse(reader.isRegisteredAsAvailable()); for (int i = 1; i <= notifyNumCredits; i++) { assertThat(channel.readOutbound(), instanceOf(NettyMessage.BufferResponse.class)); @@ -316,7 +362,7 @@ public class PartitionRequestQueueTest { } @Override - public void notifyBuffersAvailable(long buffers) { + public void notifyDataAvailable() { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index 8646168..5360041 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -68,7 +68,7 @@ public class ServerTransportErrorHandlingTest { @Override public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2]; - listener.notifyBuffersAvailable(Long.MAX_VALUE); + listener.notifyDataAvailable(); return new 
CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync); } }); http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java new file mode 100644 index 0000000..2b6b834 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +/** + * Test implementation of {@link BufferAvailabilityListener} that counts notifications; the counter is volatile so that {@link #awaitNotifications} observes increments made by another thread.
+ */ +class AwaitableBufferAvailablityListener implements BufferAvailabilityListener { + + private volatile long numNotifications; + + @Override + public void notifyDataAvailable() { + ++numNotifications; + } + + public long getNumNotifications() { + return numNotifications; + } + + public void resetNotificationCounters() { + numNotifications = 0; + } + + void awaitNotifications(long awaitedNumNotifications, long timeoutMillis) throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMillis; + while (numNotifications < awaitedNumNotifications && System.currentTimeMillis() < deadline) { + Thread.sleep(1); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java new file mode 100644 index 0000000..4162975 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +/** + * Test implementation of {@link BufferAvailabilityListener}. + */ +class NoOpBufferAvailablityListener implements BufferAvailabilityListener { + @Override + public void notifyDataAvailable() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 76e6f2c..ced1a33 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -123,8 +123,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger { for (int i = 0; i < 8; i++) { final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking(); writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0); - Thread.sleep(50); + bufferBuilder.finish(); } } }