[hotfix][runtime] Deduplicate buffersInBacklog code in Pipelined and Spillable subpartitions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10d11d79 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10d11d79 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10d11d79 Branch: refs/heads/master Commit: 10d11d7991c18516d503dfcc82815d58fae01b46 Parents: 2214a24 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Jan 18 12:09:15 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:20 2018 +0100 ---------------------------------------------------------------------- .../partition/PipelinedSubpartition.java | 47 ++------------------ .../network/partition/ResultSubpartition.java | 46 ++++++++++++++++++- .../partition/SpillableSubpartition.java | 45 +------------------ 3 files changed, 51 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/10d11d79/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 9c6197c..2f4fd6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -27,9 +26,8 @@ import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; + import java.io.IOException; -import java.util.ArrayDeque; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -43,9 +41,6 @@ class PipelinedSubpartition extends ResultSubpartition { // ------------------------------------------------------------------------ - /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */ - private final ArrayDeque<Buffer> buffers = new ArrayDeque<>(); - /** The read view to consume this subpartition. */ private PipelinedSubpartitionView readView; @@ -55,10 +50,6 @@ class PipelinedSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition. */ - @GuardedBy("buffers") - private int buffersInBacklog; - // ------------------------------------------------------------------------ PipelinedSubpartition(int index, ResultPartition parent) { @@ -141,10 +132,10 @@ class PipelinedSubpartition extends ResultSubpartition { BufferAndBacklog pollBuffer() { synchronized (buffers) { Buffer buffer = buffers.pollFirst(); - decreaseBuffersInBacklog(buffer); + decreaseBuffersInBacklogUnsafe(buffer); if (buffer != null) { - return new BufferAndBacklog(buffer, buffersInBacklog, _nextBufferIsEvent()); + return new BufferAndBacklog(buffer, getBuffersInBacklog(), _nextBufferIsEvent()); } else { return null; } @@ -176,36 +167,6 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override - @VisibleForTesting - public int getBuffersInBacklog() { - return buffersInBacklog; - } - - /** - * Decreases the number of non-event buffers by one after fetching a non-event - * buffer from this subpartition. 
- */ - private void decreaseBuffersInBacklog(Buffer buffer) { - assert Thread.holdsLock(buffers); - - if (buffer != null && buffer.isBuffer()) { - buffersInBacklog--; - } - } - - /** - * Increases the number of non-event buffers by one after adding a non-event - * buffer into this subpartition. - */ - private void increaseBuffersInBacklog(Buffer buffer) { - assert Thread.holdsLock(buffers); - - if (buffer != null && buffer.isBuffer()) { - buffersInBacklog++; - } - } - - @Override public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { final int queueSize; @@ -250,7 +211,7 @@ class PipelinedSubpartition extends ResultSubpartition { return String.format( "PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]", - numBuffers, numBytes, buffersInBacklog, finished, hasReadView); + numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/10d11d79/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index e51f215..19447b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -21,7 +21,10 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.buffer.Buffer; +import javax.annotation.concurrent.GuardedBy; + import java.io.IOException; +import java.util.ArrayDeque; import static 
org.apache.flink.util.Preconditions.checkNotNull; @@ -36,6 +39,13 @@ public abstract class ResultSubpartition { /** The parent partition this subpartition belongs to. */ protected final ResultPartition parent; + /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */ + protected final ArrayDeque<Buffer> buffers = new ArrayDeque<>(); + + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private int buffersInBacklog; + // - Statistics ---------------------------------------------------------- /** The total number of buffers (both data and event buffers) */ @@ -104,7 +114,9 @@ public abstract class ResultSubpartition { * scenarios since it does not make any concurrency guarantees. */ @VisibleForTesting - abstract public int getBuffersInBacklog(); + public int getBuffersInBacklog() { + return buffersInBacklog; + } /** * Makes a best effort to get the current size of the queue. @@ -113,6 +125,38 @@ public abstract class ResultSubpartition { */ abstract public int unsynchronizedGetNumberOfQueuedBuffers(); + /** + * Decreases the number of non-event buffers by one after fetching a non-event + * buffer from this subpartition (for access by the subpartition views). + * + * @return backlog after the operation + */ + public int decreaseBuffersInBacklog(Buffer buffer) { + synchronized (buffers) { + return decreaseBuffersInBacklogUnsafe(buffer); + } + } + + protected int decreaseBuffersInBacklogUnsafe(Buffer buffer) { + assert Thread.holdsLock(buffers); + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog--; + } + return buffersInBacklog; + } + + /** + * Increases the number of non-event buffers by one after adding a non-event + * buffer into this subpartition. 
+ */ + protected void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } + // ------------------------------------------------------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/10d11d79/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index e57e30a..dc0d0d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -26,10 +25,10 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayDeque; @@ -70,9 +69,6 @@ class SpillableSubpartition extends ResultSubpartition { private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class); - /** Buffers are kept in this queue as long as we weren't ask to release any. 
*/ - private final ArrayDeque<Buffer> buffers = new ArrayDeque<>(); - /** The I/O manager used for spilling buffers to disk. */ private final IOManager ioManager; @@ -85,10 +81,6 @@ class SpillableSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** The number of non-event buffers currently in this subpartition */ - @GuardedBy("buffers") - private int buffersInBacklog; - /** The read view to consume this subpartition. */ private ResultSubpartitionView readView; @@ -264,39 +256,6 @@ class SpillableSubpartition extends ResultSubpartition { } @Override - @VisibleForTesting - public int getBuffersInBacklog() { - return buffersInBacklog; - } - - /** - * Decreases the number of non-event buffers by one after fetching a non-event - * buffer from this subpartition (for access by the subpartition views). - * - * @return backlog after the operation - */ - public int decreaseBuffersInBacklog(Buffer buffer) { - synchronized (buffers) { - if (buffer != null && buffer.isBuffer()) { - buffersInBacklog--; - } - return buffersInBacklog; - } - } - - /** - * Increases the number of non-event buffers by one after adding a non-event - * buffer into this subpartition. - */ - private void increaseBuffersInBacklog(Buffer buffer) { - assert Thread.holdsLock(buffers); - - if (buffer != null && buffer.isBuffer()) { - buffersInBacklog++; - } - } - - @Override public int unsynchronizedGetNumberOfQueuedBuffers() { // since we do not synchronize, the size may actually be lower than 0! return Math.max(buffers.size(), 0); @@ -307,7 +266,7 @@ class SpillableSubpartition extends ResultSubpartition { return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + "%d number of buffers in backlog, finished? %s, read view? %s, spilled? 
%s]", getTotalNumberOfBuffers(), getTotalNumberOfBytes(), - buffersInBacklog, isFinished, readView != null, spillWriter != null); + getBuffersInBacklog(), isFinished, readView != null, spillWriter != null); } }