[FLINK-8589][runtime] Add polling method to InputGate

This is a preparation for changes in data notifications, which will not be
as strict as they are now.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98bd689a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98bd689a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98bd689a

Branch: refs/heads/master
Commit: 98bd689a2565ec5cf344541f37cd0b0db691c08f
Parents: 1310c72
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Feb 1 14:13:42 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:40 2018 +0100

----------------------------------------------------------------------
 .../api/reader/AbstractRecordReader.java        |  2 +-
 .../network/partition/consumer/InputGate.java   | 15 ++++-
 .../partition/consumer/SingleInputGate.java     | 25 ++++++--
 .../partition/consumer/UnionInputGate.java      | 53 ++++++++++++----
 .../partition/InputGateFairnessTest.java        | 11 ++--
 .../PartialConsumePipelinedResultTest.java      |  2 +-
 .../consumer/LocalInputChannelTest.java         | 13 ++--
 .../partition/consumer/SingleInputGateTest.java | 11 ++--
 .../partition/consumer/UnionInputGateTest.java  |  5 +-
 .../streaming/runtime/io/BarrierBuffer.java     | 66 +++++++++++---------
 .../streaming/runtime/io/BarrierTracker.java    | 22 ++++---
 .../io/BarrierBufferMassiveRandomTest.java      | 20 ++++--
 .../streaming/runtime/io/MockInputGate.java     | 22 ++++---
 13 files changed, 176 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index e3c8484..9cfc729 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -83,7 +83,7 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> extends Abstra
                                }
                        }
 
-                       final BufferOrEvent bufferOrEvent = 
inputGate.getNextBufferOrEvent();
+                       final BufferOrEvent bufferOrEvent = 
inputGate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new);
 
                        if (bufferOrEvent.isBuffer()) {
                                currentRecordDeserializer = 
recordDeserializers[bufferOrEvent.getChannelIndex()];

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 1f2182e..0413caa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.event.TaskEvent;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * An input gate consumes one or more partitions of a single produced 
intermediate result.
@@ -72,7 +73,19 @@ public interface InputGate {
 
        void requestPartitions() throws IOException, InterruptedException;
 
-       BufferOrEvent getNextBufferOrEvent() throws IOException, 
InterruptedException;
+       /**
+        * Blocking call waiting for next {@link BufferOrEvent}.
+        *
+        * @return {@code Optional.empty()} if {@link #isFinished()} returns 
true.
+        */
+       Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, 
InterruptedException;
+
+       /**
+        * Poll the {@link BufferOrEvent}.
+        *
+        * @return {@code Optional.empty()} if there is no data to return or if 
{@link #isFinished()} returns true.
+        */
+       Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, 
InterruptedException;
 
        void sendTaskEvent(TaskEvent event) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 1175c52..337b3c2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -52,6 +52,7 @@ import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Timer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -486,9 +487,18 @@ public class SingleInputGate implements InputGate {
        // 
------------------------------------------------------------------------
 
        @Override
-       public BufferOrEvent getNextBufferOrEvent() throws IOException, 
InterruptedException {
+       public Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException {
+               return getNextBufferOrEvent(true);
+       }
+
+       @Override
+       public Optional<BufferOrEvent> pollNextBufferOrEvent() throws 
IOException, InterruptedException {
+               return getNextBufferOrEvent(false);
+       }
+
+       private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) 
throws IOException, InterruptedException {
                if (hasReceivedAllEndOfPartitionEvents) {
-                       return null;
+                       return Optional.empty();
                }
 
                if (isReleased) {
@@ -505,7 +515,12 @@ public class SingleInputGate implements InputGate {
                                        throw new 
IllegalStateException("Released");
                                }
 
-                               inputChannelsWithData.wait();
+                               if (blocking) {
+                                       inputChannelsWithData.wait();
+                               }
+                               else {
+                                       return Optional.empty();
+                               }
                        }
 
                        currentChannel = inputChannelsWithData.remove();
@@ -528,7 +543,7 @@ public class SingleInputGate implements InputGate {
 
                final Buffer buffer = result.buffer();
                if (buffer.isBuffer()) {
-                       return new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable);
+                       return Optional.of(new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable));
                }
                else {
                        final AbstractEvent event = 
EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -545,7 +560,7 @@ public class SingleInputGate implements InputGate {
                                currentChannel.releaseAllResources();
                        }
 
-                       return new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable);
+                       return Optional.of(new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 87443d2..14c04bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -139,24 +140,17 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
        }
 
        @Override
-       public BufferOrEvent getNextBufferOrEvent() throws IOException, 
InterruptedException {
+       public Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException {
                if (inputGatesWithRemainingData.isEmpty()) {
-                       return null;
+                       return Optional.empty();
                }
 
                // Make sure to request the partitions, if they have not been 
requested before.
                requestPartitions();
 
-               final InputGate inputGate;
-               synchronized (inputGatesWithData) {
-                       while (inputGatesWithData.size() == 0) {
-                               inputGatesWithData.wait();
-                       }
-
-                       inputGate = inputGatesWithData.remove();
-               }
-
-               final BufferOrEvent bufferOrEvent = 
inputGate.getNextBufferOrEvent();
+               InputGateWithData inputGateWithData = waitAndGetNextInputGate();
+               InputGate inputGate = inputGateWithData.inputGate;
+               BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
 
                if (bufferOrEvent.moreAvailable()) {
                        // this buffer or event was now removed from the 
non-empty gates queue
@@ -180,7 +174,40 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
 
                bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
 
-               return bufferOrEvent;
+               return Optional.ofNullable(bufferOrEvent);
+       }
+
+       @Override
+       public Optional<BufferOrEvent> pollNextBufferOrEvent() throws 
IOException, InterruptedException {
+               throw new UnsupportedOperationException();
+       }
+
+       private InputGateWithData waitAndGetNextInputGate() throws IOException, 
InterruptedException {
+               while (true) {
+                       InputGate inputGate;
+                       synchronized (inputGatesWithData) {
+                               while (inputGatesWithData.size() == 0) {
+                                       inputGatesWithData.wait();
+                               }
+                               inputGate = inputGatesWithData.remove();
+                       }
+
+                       // In case of inputGatesWithData being inaccurate do 
not block on an empty inputGate, but just poll the data.
+                       Optional<BufferOrEvent> bufferOrEvent = 
inputGate.pollNextBufferOrEvent();
+                       if (bufferOrEvent.isPresent()) {
+                               return new InputGateWithData(inputGate, 
bufferOrEvent.get());
+                       }
+               }
+       }
+
+       private static class InputGateWithData {
+               private final InputGate inputGate;
+               private final BufferOrEvent bufferOrEvent;
+
+               public InputGateWithData(InputGate inputGate, BufferOrEvent 
bufferOrEvent) {
+                       this.inputGate = checkNotNull(inputGate);
+                       this.bufferOrEvent = checkNotNull(bufferOrEvent);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index c58d20a..45df56f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -46,13 +47,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
-import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -116,7 +117,7 @@ public class InputGateFairnessTest {
                        assertTrue(max == min || max == min+1);
                }
 
-               assertNull(gate.getNextBufferOrEvent());
+               assertFalse(gate.getNextBufferOrEvent().isPresent());
        }
 
        @Test
@@ -232,7 +233,7 @@ public class InputGateFairnessTest {
                        assertTrue(max == min || max == min+1);
                }
 
-               assertNull(gate.getNextBufferOrEvent());
+               assertFalse(gate.getNextBufferOrEvent().isPresent());
        }
 
        @Test
@@ -368,7 +369,7 @@ public class InputGateFairnessTest {
 
 
                @Override
-               public BufferOrEvent getNextBufferOrEvent() throws IOException, 
InterruptedException {
+               public Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException {
                        synchronized (channelsWithData) {
                                assertTrue("too many input channels", 
channelsWithData.size() <= getNumberOfInputChannels());
                                ensureUnique(channelsWithData);

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 666581c..76e6f2c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -141,7 +141,7 @@ public class PartialConsumePipelinedResultTest extends 
TestLogger {
                @Override
                public void invoke() throws Exception {
                        InputGate gate = getEnvironment().getInputGate(0);
-                       Buffer buffer = gate.getNextBufferOrEvent().getBuffer();
+                       Buffer buffer = 
gate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new).getBuffer();
                        if (buffer != null) {
                                buffer.recycleBuffer();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 1cdf5c3..ab276cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -52,6 +52,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Callable;
@@ -517,17 +518,17 @@ public class LocalInputChannelTest {
                        final int[] numberOfBuffersPerChannel = new 
int[numberOfInputChannels];
 
                        try {
-                               BufferOrEvent boe;
-                               while ((boe = inputGate.getNextBufferOrEvent()) 
!= null) {
-                                       if (boe.isBuffer()) {
-                                               boe.getBuffer().recycleBuffer();
+                               Optional<BufferOrEvent> boe;
+                               while ((boe = 
inputGate.getNextBufferOrEvent()).isPresent()) {
+                                       if (boe.get().isBuffer()) {
+                                               
boe.get().getBuffer().recycleBuffer();
 
                                                // Check that we don't receive 
too many buffers
-                                               if 
(++numberOfBuffersPerChannel[boe.getChannelIndex()]
+                                               if 
(++numberOfBuffersPerChannel[boe.get().getChannelIndex()]
                                                                > 
numberOfExpectedBuffersPerChannel) {
 
                                                        throw new 
IllegalStateException("Received more buffers than expected " +
-                                                                       "on 
channel " + boe.getChannelIndex() + ".");
+                                                                       "on 
channel " + boe.get().getChannelIndex() + ".");
                                                }
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2a2b364..17425f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -52,12 +52,12 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -115,7 +115,7 @@ public class SingleInputGateTest {
 
                // Return null when the input gate has received all 
end-of-partition events
                assertTrue(inputGate.isFinished());
-               assertNull(inputGate.getNextBufferOrEvent());
+               assertFalse(inputGate.getNextBufferOrEvent().isPresent());
        }
 
        @Test
@@ -448,8 +448,9 @@ public class SingleInputGateTest {
                boolean isBuffer,
                int channelIndex) throws IOException, InterruptedException {
 
-               final BufferOrEvent boe = inputGate.getNextBufferOrEvent();
-               assertEquals(isBuffer, boe.isBuffer());
-               assertEquals(channelIndex, boe.getChannelIndex());
+               final Optional<BufferOrEvent> bufferOrEvent = 
inputGate.getNextBufferOrEvent();
+               assertTrue(bufferOrEvent.isPresent());
+               assertEquals(isBuffer, bufferOrEvent.get().isBuffer());
+               assertEquals(channelIndex, 
bufferOrEvent.get().getChannelIndex());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 9884855..9b16471 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -23,10 +23,11 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -111,6 +112,6 @@ public class UnionInputGateTest {
 
                // Return null when the input gate has received all 
end-of-partition events
                assertTrue(union.isFinished());
-               assertNull(union.getNextBufferOrEvent());
+               assertFalse(union.getNextBufferOrEvent().isPresent());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index ecfd732..7ef9fef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -158,52 +159,55 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        public BufferOrEvent getNextNonBlocked() throws Exception {
                while (true) {
                        // process buffered BufferOrEvents before grabbing new 
ones
-                       BufferOrEvent next;
+                       Optional<BufferOrEvent> next;
                        if (currentBuffered == null) {
                                next = inputGate.getNextBufferOrEvent();
                        }
                        else {
-                               next = currentBuffered.getNext();
-                               if (next == null) {
+                               next = 
Optional.ofNullable(currentBuffered.getNext());
+                               if (!next.isPresent()) {
                                        completeBufferedSequence();
                                        return getNextNonBlocked();
                                }
                        }
 
-                       if (next != null) {
-                               if (isBlocked(next.getChannelIndex())) {
-                                       // if the channel is blocked we, we 
just store the BufferOrEvent
-                                       bufferSpiller.add(next);
-                                       checkSizeLimit();
-                               }
-                               else if (next.isBuffer()) {
-                                       return next;
-                               }
-                               else if (next.getEvent().getClass() == 
CheckpointBarrier.class) {
-                                       if (!endOfStream) {
-                                               // process barriers only if 
there is a chance of the checkpoint completing
-                                               
processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
-                                       }
-                               }
-                               else if (next.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                                       
processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
+                       if (!next.isPresent()) {
+                               if (!endOfStream) {
+                                       // end of input stream. stream 
continues with the buffered data
+                                       endOfStream = true;
+                                       releaseBlocksAndResetBarriers();
+                                       return getNextNonBlocked();
                                }
                                else {
-                                       if (next.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                               processEndOfPartition();
-                                       }
-                                       return next;
+                                       // final end of both input and buffered 
data
+                                       return null;
                                }
                        }
-                       else if (!endOfStream) {
-                               // end of input stream. stream continues with 
the buffered data
-                               endOfStream = true;
-                               releaseBlocksAndResetBarriers();
-                               return getNextNonBlocked();
+
+                       BufferOrEvent bufferOrEvent = next.get();
+
+                       if (isBlocked(bufferOrEvent.getChannelIndex())) {
+                               // if the channel is blocked we, we just store 
the BufferOrEvent
+                               bufferSpiller.add(bufferOrEvent);
+                               checkSizeLimit();
+                       }
+                       else if (bufferOrEvent.isBuffer()) {
+                               return bufferOrEvent;
+                       }
+                       else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
+                               if (!endOfStream) {
+                                       // process barriers only if there is a 
chance of the checkpoint completing
+                                       processBarrier((CheckpointBarrier) 
bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
+                               }
+                       }
+                       else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
+                               
processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
                        }
                        else {
-                               // final end of both input and buffered data
-                               return null;
+                               if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
+                                       processEndOfPartition();
+                               }
+                               return bufferOrEvent;
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 8178fbc..f929226 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
+import java.util.Optional;
 
 /**
  * The BarrierTracker keeps track of what checkpoint barriers have been 
received from
@@ -90,20 +91,25 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        @Override
        public BufferOrEvent getNextNonBlocked() throws Exception {
                while (true) {
-                       BufferOrEvent next = inputGate.getNextBufferOrEvent();
-                       if (next == null || next.isBuffer()) {
+                       Optional<BufferOrEvent> next = 
inputGate.getNextBufferOrEvent();
+                       if (!next.isPresent()) {
                                // buffer or input exhausted
-                               return next;
+                               return null;
                        }
-                       else if (next.getEvent().getClass() == 
CheckpointBarrier.class) {
-                               processBarrier((CheckpointBarrier) 
next.getEvent(), next.getChannelIndex());
+
+                       BufferOrEvent bufferOrEvent = next.get();
+                       if (bufferOrEvent.isBuffer()) {
+                               return bufferOrEvent;
+                       }
+                       else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
+                               processBarrier((CheckpointBarrier) 
bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                        }
-                       else if (next.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                               
processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent(), 
next.getChannelIndex());
+                       else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
+                               
processCheckpointAbortBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                        }
                        else {
                                // some other event
-                               return next;
+                               return bufferOrEvent;
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 96d79bb..39c41ef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Random;
 
 import static org.junit.Assert.fail;
@@ -159,21 +160,30 @@ public class BarrierBufferMassiveRandomTest {
                public void requestPartitions() {}
 
                @Override
-               public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+               public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
                        currentChannel = (currentChannel + 1) % numChannels;
 
                        if (barrierGens[currentChannel].isNextBarrier()) {
-                               return new BufferOrEvent(
-                                               new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()),
-                                                       currentChannel);
+                               return Optional.of(
+                                       new BufferOrEvent(
+                                               new CheckpointBarrier(
+                                                       ++currentBarriers[currentChannel],
+                                                       System.currentTimeMillis(),
+                                                       CheckpointOptions.forCheckpointWithDefaultLocation()),
+                                               currentChannel));
                        } else {
                                Buffer buffer = bufferPools[currentChannel].requestBuffer();
                                buffer.getMemorySegment().putLong(0, c++);
-                               return new BufferOrEvent(buffer, currentChannel);
+                               return Optional.of(new BufferOrEvent(buffer, currentChannel));
                        }
                }
 
                @Override
+               public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+                       return getNextBufferOrEvent();
+               }
+
+               @Override
                public void sendTaskEvent(TaskEvent event) {}
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 77c938a..e62b709 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 
 import java.util.ArrayDeque;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 
 /**
@@ -37,16 +38,16 @@ public class MockInputGate implements InputGate {
 
        private final int numChannels;
 
-       private final Queue<BufferOrEvent> boes;
+       private final Queue<BufferOrEvent> bufferOrEvents;
 
        private final boolean[] closed;
 
        private int closedChannels;
 
-       public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) {
+       public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) {
                this.pageSize = pageSize;
                this.numChannels = numChannels;
-               this.boes = new ArrayDeque<BufferOrEvent>(boes);
+               this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
                this.closed = new boolean[numChannels];
        }
 
@@ -62,14 +63,14 @@ public class MockInputGate implements InputGate {
 
        @Override
        public boolean isFinished() {
-               return boes.isEmpty();
+               return bufferOrEvents.isEmpty();
        }
 
        @Override
-       public BufferOrEvent getNextBufferOrEvent() {
-               BufferOrEvent next = boes.poll();
+       public Optional<BufferOrEvent> getNextBufferOrEvent() {
+               BufferOrEvent next = bufferOrEvents.poll();
                if (next == null) {
-                       return null;
+                       return Optional.empty();
                }
 
                int channelIdx = next.getChannelIndex();
@@ -81,7 +82,12 @@ public class MockInputGate implements InputGate {
                        closed[channelIdx] = true;
                        closedChannels++;
                }
-               return next;
+               return Optional.of(next);
+       }
+
+       @Override
+       public Optional<BufferOrEvent> pollNextBufferOrEvent() {
+               return getNextBufferOrEvent();
        }
 
        @Override

Reply via email to