[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for exactly-once
This closes #5400. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3126bf52 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3126bf52 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3126bf52 Branch: refs/heads/master Commit: 3126bf522925a4f9a5cbf4c33077dbe7664363fa Parents: 831349a Author: Zhijiang <wangzhijiang...@aliyun.com> Authored: Fri Feb 2 15:45:49 2018 +0800 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Feb 22 14:27:36 2018 +0100 ---------------------------------------------------------------------- .../flink/configuration/TaskManagerOptions.java | 12 + .../streaming/runtime/io/BarrierBuffer.java | 36 +- .../streaming/runtime/io/BufferBlocker.java | 68 + .../runtime/io/BufferOrEventSequence.java | 59 + .../streaming/runtime/io/BufferSpiller.java | 78 +- .../runtime/io/CachedBufferBlocker.java | 147 ++ .../runtime/io/InputProcessorUtil.java | 71 + .../runtime/io/StreamInputProcessor.java | 25 +- .../runtime/io/StreamTwoInputProcessor.java | 25 +- .../io/BarrierBufferAlignmentLimitTest.java | 4 +- .../io/BarrierBufferMassiveRandomTest.java | 2 +- .../streaming/runtime/io/BarrierBufferTest.java | 1545 ------------------ .../runtime/io/BarrierBufferTestBase.java | 1426 ++++++++++++++++ .../runtime/io/BufferBlockerTestBase.java | 325 ++++ .../streaming/runtime/io/BufferSpillerTest.java | 341 +--- .../runtime/io/CachedBufferBlockerTest.java | 53 + .../io/CreditBasedBarrierBufferTest.java | 49 + .../runtime/io/SpillingBarrierBufferTest.java | 81 + 18 files changed, 2361 insertions(+), 1986 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index e01cf0f..cc3284c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -317,6 +317,18 @@ public class TaskManagerOptions { key("taskmanager.network.credit-based-flow-control.enabled") .defaultValue(true); + /** + * Config parameter defining whether to spill data for channels with barrier or not in exactly-once + * mode based on credit-based flow control. + * + * @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of + * credit-based flow control. + */ + @Deprecated + public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED = + key("taskmanager.exactly-once.blocking.data.enabled") + .defaultValue(true); + // ------------------------------------------------------------------------ // Task Options // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 7ef9fef..78852b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -41,6 +40,7 @@ import java.util.ArrayDeque; import java.util.Optional; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until @@ -65,13 +65,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private final int totalNumberOfInputChannels; /** To utility to write blocked data to a file channel. */ - private final BufferSpiller bufferSpiller; + private final BufferBlocker bufferBlocker; /** * The pending blocked buffer/event sequences. Must be consumed before requesting further data * from the input gate. */ - private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered; + private final ArrayDeque<BufferOrEventSequence> queuedBuffered; /** * The maximum number of bytes that may be buffered before an alignment is broken. -1 means @@ -83,7 +83,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * The sequence of buffers/events that has been unblocked and must now be consumed before * requesting further data from the input gate. */ - private BufferSpiller.SpilledBufferOrEventSequence currentBuffered; + private BufferOrEventSequence currentBuffered; /** Handler that receives the checkpoint notifications. */ private AbstractInvokable toNotifyOnCheckpoint; @@ -118,12 +118,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * <p>There is no limit to how much data may be buffered during an alignment. 
* * @param inputGate The input gate to draw the buffers and events from. - * @param ioManager The I/O manager that gives access to the temp directories. + * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier. * * @throws IOException Thrown, when the spilling to temp files cannot be initialized. */ - public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException { - this (inputGate, ioManager, -1); + public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) throws IOException { + this (inputGate, bufferBlocker, -1); } /** @@ -134,12 +134,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * checkpoint has been cancelled. * * @param inputGate The input gate to draw the buffers and events from. - * @param ioManager The I/O manager that gives access to the temp directories. + * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier. * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts. * * @throws IOException Thrown, when the spilling to temp files cannot be initialized. 
*/ - public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException { + public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes) + throws IOException { checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0); this.inputGate = inputGate; @@ -147,8 +148,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler { this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; - this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize()); - this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>(); + this.bufferBlocker = checkNotNull(bufferBlocker); + this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>(); } // ------------------------------------------------------------------------ @@ -185,10 +186,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } BufferOrEvent bufferOrEvent = next.get(); - if (isBlocked(bufferOrEvent.getChannelIndex())) { // if the channel is blocked we, we just store the BufferOrEvent - bufferSpiller.add(bufferOrEvent); + bufferBlocker.add(bufferOrEvent); checkSizeLimit(); } else if (bufferOrEvent.isBuffer()) { @@ -399,7 +399,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } private void checkSizeLimit() throws Exception { - if (maxBufferedBytes > 0 && (numQueuedBytes + bufferSpiller.getBytesWritten()) > maxBufferedBytes) { + if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) { // exceeded our limit - abort this checkpoint LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded", currentCheckpointId, maxBufferedBytes); @@ -426,11 +426,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { @Override public void cleanup() throws IOException { - bufferSpiller.close(); + bufferBlocker.close(); if 
(currentBuffered != null) { currentBuffered.cleanup(); } - for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) { + for (BufferOrEventSequence seq : queuedBuffered) { seq.cleanup(); } queuedBuffered.clear(); @@ -491,7 +491,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (currentBuffered == null) { // common case: no more buffered data - currentBuffered = bufferSpiller.rollOver(); + currentBuffered = bufferBlocker.rollOverReusingResources(); if (currentBuffered != null) { currentBuffered.open(); } @@ -503,7 +503,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { "Pushing back current alignment buffers and feeding back new alignment data first."); // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one - BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer(); + BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources(); if (bufferedNow != null) { bufferedNow.open(); queuedBuffered.addFirst(currentBuffered); http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java new file mode 100644 index 0000000..4d0f66f --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; + +import java.io.IOException; + +/** + * The buffer blocker takes the buffers and events from a data stream and adds them in a sequence. + * After a number of elements have been added, the blocker can "roll over": It presents the added + * elements as a readable sequence, and creates a new sequence. + */ +@Internal +public interface BufferBlocker { + + /** + * Adds a buffer or event to the blocker. + * + * @param boe The buffer or event to be added into the blocker. + */ + void add(BufferOrEvent boe) throws IOException; + + /** + * Starts a new sequence of buffers and event without reusing the same resources and + * returns the current sequence of buffers for reading. + * + * @return The readable sequence of buffers and events, or 'null', if nothing was added. + */ + BufferOrEventSequence rollOverWithoutReusingResources() throws IOException; + + /** + * Starts a new sequence of buffers and event reusing the same resources and + * returns the current sequence of buffers for reading. + * + * @return The readable sequence of buffers and events, or 'null', if nothing was added. 
+ */ + BufferOrEventSequence rollOverReusingResources() throws IOException; + + /** + * Cleans up all the resources in the current sequence. + */ + void close() throws IOException; + + /** + * Gets the number of bytes blocked in the current sequence. + * + * @return the number of bytes blocked in the current sequence. + */ + long getBytesBlocked(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java new file mode 100644 index 0000000..c5bde1b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * This class represents a sequence of buffers and events which are blocked by + * {@link CheckpointBarrierHandler}. The sequence of buffers and events can be + * read back using the method {@link #getNext()}. + */ +@Internal +public interface BufferOrEventSequence { + + /** + * Initializes the sequence for reading. + */ + void open(); + + /** + * Gets the next BufferOrEvent from the sequence, or {@code null}, if the + * sequence is exhausted. + * + * @return The next BufferOrEvent from the buffered sequence, or {@code null} (end of sequence). + */ + @Nullable + BufferOrEvent getNext() throws IOException; + + /** + * Cleans up all the resources held by the sequence. + */ + void cleanup() throws IOException; + + /** + * Gets the size of the sequence. + */ + long size(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 33aac7e..7a0be33 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -45,13 +45,14 @@ import java.util.concurrent.atomic.AtomicInteger; * * <p>This implementation buffers data effectively in the OS cache, which gracefully extends to the * disk. Most data is written and re-read milliseconds later. The file is deleted after the read. 
- * Consequently, in most cases, the data will never actually hit the physical disks.</p> + * Consequently, in most cases, the data will never actually hit the physical disks. * * <p>IMPORTANT: The SpilledBufferOrEventSequences created by this spiller all reuse the same - * reading memory (to reduce overhead) and can consequently not be read concurrently.</p> + * reading memory (to reduce overhead) and can consequently not be read concurrently. */ @Internal -public class BufferSpiller { +@Deprecated +public class BufferSpiller implements BufferBlocker { /** Size of header in bytes (see add method). */ static final int HEADER_SIZE = 9; @@ -127,6 +128,7 @@ public class BufferSpiller { * @param boe The buffer or event to add and spill. * @throws IOException Thrown, if the buffer of event could not be spilled. */ + @Override public void add(BufferOrEvent boe) throws IOException { try { ByteBuffer contents; @@ -157,40 +159,35 @@ public class BufferSpiller { } /** - * Starts a new sequence of spilled buffers and event and returns the current sequence of spilled buffers - * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the - * last call to this method. + * NOTE: The BufferOrEventSequences created by this method all reuse the same reading memory + * (to reduce overhead) and can consequently not be read concurrently with each other. * - * <p>NOTE: The SpilledBufferOrEventSequences created by this method all reuse the same - * reading memory (to reduce overhead) and can consequently not be read concurrently with each other. - * To create a sequence that can be read concurrently with the previous SpilledBufferOrEventSequence, use the - * {@link #rollOverWithNewBuffer()} method.</p> + * <p>To create a sequence that can be read concurrently with the previous BufferOrEventSequence, + * use the {@link #rollOverWithoutReusingResources()} method. 
* * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added. * @throws IOException Thrown, if the readable sequence could not be created, or no new spill * file could be created. */ - public SpilledBufferOrEventSequence rollOver() throws IOException { - return rollOverInternal(false); + @Override + public BufferOrEventSequence rollOverReusingResources() throws IOException { + return rollOver(false); } /** - * Starts a new sequence of spilled buffers and event and returns the current sequence of spilled buffers - * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the - * last call to this method. - * - * <p>The SpilledBufferOrEventSequence returned by this method is safe for concurrent consumption with - * any previously returned sequence.</p> + * The BufferOrEventSequence returned by this method is safe for concurrent consumption with + * any previously returned sequence. * * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added. * @throws IOException Thrown, if the readable sequence could not be created, or no new spill * file could be created. */ - public SpilledBufferOrEventSequence rollOverWithNewBuffer() throws IOException { - return rollOverInternal(true); + @Override + public BufferOrEventSequence rollOverWithoutReusingResources() throws IOException { + return rollOver(true); } - private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException { + private BufferOrEventSequence rollOver(boolean newBuffer) throws IOException { if (bytesWritten == 0) { return null; } @@ -219,10 +216,11 @@ public class BufferSpiller { * Cleans up the current spilling channel and file. * * <p>Does not clean up the SpilledBufferOrEventSequences generated by calls to - * {@link #rollOver()}. + * {@link #rollOver(boolean)}. * * @throws IOException Thrown if channel closing or file deletion fail. 
*/ + @Override public void close() throws IOException { currentChannel.close(); if (!currentSpillFile.delete()) { @@ -232,9 +230,11 @@ public class BufferSpiller { /** * Gets the number of bytes written in the current spill file. + * * @return the number of bytes written in the current spill file */ - public long getBytesWritten() { + @Override + public long getBytesBlocked() { return bytesWritten; } @@ -264,10 +264,10 @@ public class BufferSpiller { /** * This class represents a sequence of spilled buffers and events, created by the - * {@link BufferSpiller}. The sequence of buffers and events can be read back using the - * method {@link #getNext()}. + * {@link BufferSpiller}. */ - public static class SpilledBufferOrEventSequence { + @Deprecated + public static class SpilledBufferOrEventSequence implements BufferOrEventSequence { /** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte). */ private static final int HEADER_LENGTH = 9; @@ -308,10 +308,10 @@ public class BufferSpiller { } /** - * Initializes the sequence for reading. - * This method needs to be called before the first call to {@link #getNext()}. Otherwise - * the results of {@link #getNext()} are not predictable. + * This method needs to be called before the first call to {@link #getNext()}. + * Otherwise the results of {@link #getNext()} are not predictable. */ + @Override public void open() { if (!opened) { opened = true; @@ -320,13 +320,7 @@ public class BufferSpiller { } } - /** - * Gets the next BufferOrEvent from the spilled sequence, or {@code null}, if the - * sequence is exhausted. - * - * @return The next BufferOrEvent from the spilled sequence, or {@code null} (end of sequence). - * @throws IOException Thrown, if the reads failed, of if the byte stream is corrupt. 
- */ + @Override public BufferOrEvent getNext() throws IOException { if (buffer.remaining() < HEADER_LENGTH) { buffer.compact(); @@ -413,11 +407,7 @@ public class BufferSpiller { } } - /** - * Cleans up all file resources held by this spilled sequence. - * - * @throws IOException Thrown, if file channel closing or file deletion fail. - */ + @Override public void cleanup() throws IOException { fileChannel.close(); if (!file.delete()) { @@ -425,10 +415,8 @@ public class BufferSpiller { } } - /** - * Gets the size of this spilled sequence. - */ - public long size() throws IOException { + @Override + public long size() { return size; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java new file mode 100644 index 0000000..f91e8cc --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; + +/** + * The cached buffer blocker takes the buffers and events from a data stream and adds them to a memory queue. + * After a number of elements have been cached, the blocker can "roll over": It presents the cached + * elements as a readable sequence, and creates a new memory queue. + * + * <p>This buffer blocker can be used in credit-based flow control for better barrier alignment in exactly-once mode. + */ +@Internal +public class CachedBufferBlocker implements BufferBlocker { + + /** The page size, to estimate the total cached data size. */ + private final int pageSize; + + /** The number of bytes cached since the last roll over. */ + private long bytesBlocked; + + /** The current memory queue for caching the buffers or events. */ + private ArrayDeque<BufferOrEvent> currentBuffers; + + /** + * Creates a new buffer blocker, caching the buffers or events in memory queue. + * + * @param pageSize The page size used to estimate the cached size. + */ + public CachedBufferBlocker(int pageSize) { + this.pageSize = pageSize; + this.currentBuffers = new ArrayDeque<BufferOrEvent>(); + } + + @Override + public void add(BufferOrEvent boe) { + bytesBlocked += pageSize; + + currentBuffers.add(boe); + } + + /** + * It is never reusing resources and is defaulting to {@link #rollOverWithoutReusingResources()}. 
+ */ + @Override + public BufferOrEventSequence rollOverReusingResources() { + return rollOverWithoutReusingResources(); + } + + @Override + public BufferOrEventSequence rollOverWithoutReusingResources() { + if (bytesBlocked == 0) { + return null; + } + + CachedBufferOrEventSequence currentSequence = new CachedBufferOrEventSequence(currentBuffers, bytesBlocked); + currentBuffers = new ArrayDeque<BufferOrEvent>(); + bytesBlocked = 0L; + + return currentSequence; + } + + @Override + public void close() { + BufferOrEvent boe; + while ((boe = currentBuffers.poll()) != null) { + if (boe.isBuffer()) { + boe.getBuffer().recycleBuffer(); + } + } + } + + @Override + public long getBytesBlocked() { + return bytesBlocked; + } + + // ------------------------------------------------------------------------ + + /** + * This class represents a sequence of cached buffers and events, created by the + * {@link CachedBufferBlocker}. + */ + public static class CachedBufferOrEventSequence implements BufferOrEventSequence { + + /** The sequence of buffers and events to be consumed by {@link BarrierBuffer}.*/ + private final ArrayDeque<BufferOrEvent> queuedBuffers; + + /** The total size of the cached data. */ + private final long size; + + /** + * Creates a reader that reads a sequence of buffers and events. + * + * @param size The total size of cached data. 
+ */ + CachedBufferOrEventSequence(ArrayDeque<BufferOrEvent> buffers, long size) { + this.queuedBuffers = buffers; + this.size = size; + } + + @Override + public void open() {} + + @Override + @Nullable + public BufferOrEvent getNext() { + return queuedBuffers.poll(); + } + + @Override + public void cleanup() { + BufferOrEvent boe; + while ((boe = queuedBuffers.poll()) != null) { + if (boe.isBuffer()) { + boe.getBuffer().recycleBuffer(); + } + } + } + + @Override + public long size() { + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java new file mode 100644 index 0000000..cb56eee --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.io.IOException; + +/** + * Utility for creating {@link CheckpointBarrierHandler} based on checkpoint mode + * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}. + */ +@Internal +public class InputProcessorUtil { + + public static CheckpointBarrierHandler createCheckpointBarrierHandler( + StreamTask<?, ?> checkpointedTask, + CheckpointingMode checkpointMode, + IOManager ioManager, + InputGate inputGate, + Configuration taskManagerConfig) throws IOException { + + CheckpointBarrierHandler barrierHandler; + if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { + long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); + if (!(maxAlign == -1 || maxAlign > 0)) { + throw new IllegalConfigurationException( + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() + + " must be positive or -1 (infinite)"); + } + + if (taskManagerConfig.getBoolean(TaskManagerOptions.EXACTLY_ONCE_BLOCKING_DATA_ENABLED)) { + barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign); + } else { + barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign); + } + } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { + barrierHandler = new BarrierTracker(inputGate); + } else { + throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode); + } + 
+ if (checkpointedTask != null) { + barrierHandler.registerCheckpointEventHandler(checkpointedTask); + } + + return barrierHandler; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index dc3dc5c..a9c64b5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; @@ -127,25 +125,8 @@ public class StreamInputProcessor<IN> { InputGate inputGate = InputGateUtil.createInputGate(inputGates); - if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { - long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); - if (!(maxAlign == -1 || maxAlign > 0)) { - throw new IllegalConfigurationException( - TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() - + " must be positive or -1 (infinite)"); - } - this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign); - } - else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { - this.barrierHandler = new BarrierTracker(inputGate); - 
} - else { - throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode); - } - - if (checkpointedTask != null) { - this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); - } + this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( + checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig); this.lock = checkNotNull(lock); @@ -157,7 +138,7 @@ public class StreamInputProcessor<IN> { for (int i = 0; i < recordDeserializers.length; i++) { recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>( - ioManager.getSpillingDirectoriesPaths()); + ioManager.getSpillingDirectoriesPaths()); } this.numInputChannels = inputGate.getNumberOfInputChannels(); http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 494a82a..ab4f90d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; @@ -146,25 +144,8 @@ public class StreamTwoInputProcessor<IN1, 
IN2> { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); - if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { - long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); - if (!(maxAlign == -1 || maxAlign > 0)) { - throw new IllegalConfigurationException( - TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() - + " must be positive or -1 (infinite)"); - } - this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign); - } - else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { - this.barrierHandler = new BarrierTracker(inputGate); - } - else { - throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode); - } - - if (checkpointedTask != null) { - this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); - } + this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( + checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig); this.lock = checkNotNull(lock); @@ -179,7 +160,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { for (int i = 0; i < recordDeserializers.length; i++) { recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>( - ioManager.getSpillingDirectoriesPaths()); + ioManager.getSpillingDirectoriesPaths()); } // determine which unioned channels belong to input 1 and which belong to input 2 http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index c31d5ad..9f46ed7 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -115,7 +115,7 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 1000); + BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000); AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer.registerCheckpointEventHandler(toNotify); @@ -209,7 +209,7 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 500 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 500); + BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500); AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer.registerCheckpointEventHandler(toNotify); http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 39c41ef..6dd1e5e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -63,7 +63,7 @@ 
public class BarrierBufferMassiveRandomTest { new BufferPool[] { pool1, pool2 }, new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) }); - BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, ioMan); + BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, new BufferSpiller(ioMan, myIG.getPageSize())); for (int i = 0; i < 2000000; i++) { BufferOrEvent boe = barrierBuffer.getNextNonBlocked();