http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 3faa614..4f3a5f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
@@ -52,7 +53,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -75,23 +75,112 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                return new PipelinedSubpartition(0, parent);
        }
 
+       @Test(expected = IllegalStateException.class)
+       public void testAddTwoNonFinishedBuffer() throws Exception {
+               final ResultSubpartition subpartition = createSubpartition();
+               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
+               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
+               availablityListener.resetNotificationCounters();
+
+               try {
+                       
subpartition.add(createBufferBuilder().createBufferConsumer());
+                       
subpartition.add(createBufferBuilder().createBufferConsumer());
+                       assertNull(readView.getNextBuffer());
+               } finally {
+                       subpartition.release();
+               }
+       }
+
+       @Test
+       public void testAddEmptyNonFinishedBuffer() throws Exception {
+               final ResultSubpartition subpartition = createSubpartition();
+               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
+               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
+               availablityListener.resetNotificationCounters();
+
+               try {
+                       assertEquals(0, 
availablityListener.getNumNotifications());
+
+                       BufferBuilder bufferBuilder = createBufferBuilder();
+                       subpartition.add(bufferBuilder.createBufferConsumer());
+
+                       assertEquals(0, 
availablityListener.getNumNotifications());
+                       assertNull(readView.getNextBuffer());
+
+                       bufferBuilder.finish();
+                       bufferBuilder = createBufferBuilder();
+                       subpartition.add(bufferBuilder.createBufferConsumer());
+
+                       assertEquals(1, 
availablityListener.getNumNotifications()); // notification from finishing 
previous buffer.
+                       assertNull(readView.getNextBuffer());
+                       assertEquals(1, subpartition.getBuffersInBacklog());
+               } finally {
+                       readView.releaseAllResources();
+                       subpartition.release();
+               }
+       }
+
+       @Test
+       public void testAddNonEmptyNotFinishedBuffer() throws Exception {
+               final ResultSubpartition subpartition = createSubpartition();
+               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
+               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
+               availablityListener.resetNotificationCounters();
+
+               try {
+                       assertEquals(0, 
availablityListener.getNumNotifications());
+
+                       BufferBuilder bufferBuilder = createBufferBuilder();
+                       bufferBuilder.append(ByteBuffer.allocate(1024));
+                       subpartition.add(bufferBuilder.createBufferConsumer());
+
+                       assertNextBuffer(readView, 1024, false, 1);
+                       assertEquals(1, subpartition.getBuffersInBacklog());
+               } finally {
+                       readView.releaseAllResources();
+                       subpartition.release();
+               }
+       }
+
+       @Test
+       public void testMultipleEmptyBuffers() throws Exception {
+               final ResultSubpartition subpartition = createSubpartition();
+               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
+               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
+               availablityListener.resetNotificationCounters();
+
+               try {
+                       assertEquals(0, 
availablityListener.getNumNotifications());
+
+                       subpartition.add(createFilledBufferConsumer(0));
+
+                       assertEquals(1, 
availablityListener.getNumNotifications());
+                       subpartition.add(createFilledBufferConsumer(0));
+                       assertEquals(2, 
availablityListener.getNumNotifications());
+
+                       subpartition.add(createFilledBufferConsumer(0));
+                       assertEquals(2, 
availablityListener.getNumNotifications());
+                       assertEquals(3, subpartition.getBuffersInBacklog());
+
+                       subpartition.add(createFilledBufferConsumer(1024));
+                       assertEquals(2, 
availablityListener.getNumNotifications());
+
+                       assertNextBuffer(readView, 1024, false, 0);
+               } finally {
+                       readView.releaseAllResources();
+                       subpartition.release();
+               }
+       }
+
        @Test
        public void testIllegalReadViewRequest() throws Exception {
                final PipelinedSubpartition subpartition = createSubpartition();
 
                // Successful request
-               assertNotNull(subpartition.createReadView(new 
BufferAvailabilityListener() {
-                       @Override
-                       public void notifyBuffersAvailable(long numBuffers) {
-                       }
-               }));
+               assertNotNull(subpartition.createReadView(new 
NoOpBufferAvailablityListener()));
 
                try {
-                       subpartition.createReadView(new 
BufferAvailabilityListener() {
-                               @Override
-                               public void notifyBuffersAvailable(long 
numBuffers) {
-                               }
-                       });
+                       subpartition.createReadView(new 
NoOpBufferAvailablityListener());
 
                        fail("Did not throw expected exception after duplicate 
notifyNonEmpty view request.");
                } catch (IllegalStateException expected) {
@@ -110,7 +199,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                assertFalse(view.nextBufferIsEvent());
                assertNull(view.getNextBuffer());
                assertFalse(view.nextBufferIsEvent()); // also after 
getNextBuffer()
-               verify(listener, times(1)).notifyBuffersAvailable(eq(0L));
+               verify(listener, times(0)).notifyDataAvailable();
 
                // Add data to the queue...
                subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
@@ -122,7 +211,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 //             assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
 
                // ...should have resulted in a notification
-               verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
+               verify(listener, times(1)).notifyDataAvailable();
 
                // ...and one available result
                assertFalse(view.nextBufferIsEvent());
@@ -144,7 +233,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                assertEquals(1, subpartition.getBuffersInBacklog());
                // TODO: re-enable?
 //             assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
-               verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
+               verify(listener, times(2)).notifyDataAvailable();
 
                assertFalse(view.nextBufferIsEvent());
                read = view.getNextBuffer();
@@ -171,7 +260,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                assertEquals(2, subpartition.getBuffersInBacklog()); // two 
buffers (events don't count)
                // TODO: re-enable?
 //             assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
-               verify(listener, times(5)).notifyBuffersAvailable(eq(1L));
+               verify(listener, times(4)).notifyDataAvailable();
 
                assertFalse(view.nextBufferIsEvent()); // the first buffer
                read = view.getNextBuffer();
@@ -199,7 +288,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
                assertEquals(5, subpartition.getTotalNumberOfBuffers());
                assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
-               verify(listener, times(5)).notifyBuffersAvailable(eq(1L));
+               verify(listener, times(4)).notifyDataAvailable();
        }
 
        @Test
@@ -357,7 +446,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                        // create the read view first
                        ResultSubpartitionView view = null;
                        if (createView) {
-                               view = partition.createReadView(numBuffers -> 
{});
+                               view = partition.createReadView(new 
NoOpBufferAvailablityListener());
                        }
 
                        partition.release();

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index ea06dd4..9dc7bed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -167,12 +167,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
                // Create the read view
                ResultSubpartitionView readView = spy(partition
-                       .createReadView(new BufferAvailabilityListener() {
-                               @Override
-                               public void notifyBuffersAvailable(long 
numBuffers) {
-
-                               }
-                       }));
+                       .createReadView(new NoOpBufferAvailablityListener()));
 
                // The released state check (of the parent) needs to be 
independent
                // of the released state of the view.
@@ -223,7 +218,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
 
                assertEquals(1, listener.getNumNotifications());
-               assertEquals(5, listener.getNumNotifiedBuffers());
 
                assertFalse(reader.nextBufferIsEvent()); // buffer
                BufferAndBacklog read = reader.getNextBuffer();
@@ -315,7 +309,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                SpillableSubpartitionView reader = (SpillableSubpartitionView) 
partition.createReadView(listener);
 
                // Initial notification
-               assertEquals(1, listener.getNumNotifiedBuffers());
+               assertEquals(1, listener.getNumNotifications());
                assertFalse(bufferConsumer.isRecycled());
 
                assertFalse(reader.nextBufferIsEvent());
@@ -325,7 +319,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                assertEquals(2, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
                read.buffer().recycleBuffer();
-               assertEquals(2, listener.getNumNotifiedBuffers());
+               assertEquals(2, listener.getNumNotifications());
                assertFalse(bufferConsumer.isRecycled());
                assertFalse(read.nextBufferIsEvent());
 
@@ -338,8 +332,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                //TODO: re-enable this?
 //             assertEquals(BUFFER_DATA_SIZE * 4 + 4, 
partition.getTotalNumberOfBytes());
 
-               listener.awaitNotifications(5, 30_000);
-               assertEquals(5, listener.getNumNotifiedBuffers());
+               listener.awaitNotifications(3, 30_000);
+               assertEquals(3, listener.getNumNotifications());
 
                assertFalse(reader.nextBufferIsEvent()); // second buffer 
(retained in SpillableSubpartition#nextBuffer)
                read = reader.getNextBuffer();
@@ -555,7 +549,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                        if (createView) {
                                // Create a read view
                                partition.finish();
-                               partition.createReadView(numBuffers -> {});
+                               partition.createReadView(new 
NoOpBufferAvailablityListener());
                        }
 
                        // one instance of the buffers is placed in the view's 
nextBuffer and not released
@@ -668,7 +662,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                        ResultSubpartitionView view = null;
                        if (createView) {
                                partition.finish();
-                               view = partition.createReadView(numBuffers -> 
{});
+                               view = partition.createReadView(new 
NoOpBufferAvailablityListener());
                        }
                        if (spilled) {
                                // note: in case we create a view, one buffer 
will already reside in the view and
@@ -706,33 +700,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 //             assertEquals((createView ? 4 : 0) + 2 * BUFFER_DATA_SIZE, 
partition.getTotalNumberOfBytes());
        }
 
-       private static class AwaitableBufferAvailablityListener implements 
BufferAvailabilityListener {
-
-               private long numNotifiedBuffers;
-               private long numNotifications;
-
-               @Override
-               public void notifyBuffersAvailable(long numBuffers) {
-                       numNotifiedBuffers += numBuffers;
-                       ++numNotifications;
-               }
-
-               long getNumNotifiedBuffers() {
-                       return numNotifiedBuffers;
-               }
-
-               public long getNumNotifications() {
-                       return numNotifications;
-               }
-
-               void awaitNotifications(long awaitedNumNotifiedBuffers, long 
timeoutMillis) throws InterruptedException {
-                       long deadline = System.currentTimeMillis() + 
timeoutMillis;
-                       while (numNotifiedBuffers < awaitedNumNotifiedBuffers 
&& System.currentTimeMillis() < deadline) {
-                               Thread.sleep(1);
-                       }
-               }
-       }
-
        /**
         * An {@link IOManagerAsync} that creates closed {@link 
BufferFileWriter} instances in its
         * {@link #createBufferFileWriter(FileIOChannel.ID)} method.

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 9b8bd54..48846b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -24,6 +24,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -127,4 +129,15 @@ public abstract class SubpartitionTestBase extends TestLogger {
                // Verify that parent release is reflected at partition view
                assertTrue(view.isReleased());
        }
+
+       protected void assertNextBuffer(
+                       ResultSubpartitionView readView,
+                       int expectedReadableBufferSize,
+                       boolean expectedIsMoreAvailable,
+                       int expectedBuffersInBacklog) throws IOException, 
InterruptedException {
+               ResultSubpartition.BufferAndBacklog bufferAndBacklog = 
readView.getNextBuffer();
+               assertEquals(expectedReadableBufferSize, 
bufferAndBacklog.buffer().readableBytes());
+               assertEquals(expectedIsMoreAvailable, 
bufferAndBacklog.isMoreAvailable());
+               assertEquals(expectedBuffersInBacklog, 
bufferAndBacklog.buffersInBacklog());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index cd75a7b..abadddf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -127,8 +128,8 @@ public class InputChannelTest {
                }
 
                @Override
-               BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
-                       return null;
+               Optional<BufferAndAvailability> getNextBuffer() throws 
IOException, InterruptedException {
+                       return Optional.empty();
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index ef30ee1..9de2bbe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
@@ -32,6 +33,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
@@ -64,12 +66,12 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
                // The input iterator can produce an infinite stream. That's 
why we have to serialize each
                // record on demand and cannot do it upfront.
-               final Answer<InputChannel.BufferAndAvailability> answer = new 
Answer<InputChannel.BufferAndAvailability>() {
+               final Answer<Optional<BufferAndAvailability>> answer = new 
Answer<Optional<BufferAndAvailability>>() {
 
                        private boolean hasData = inputIterator.next(reuse) != 
null;
 
                        @Override
-                       public InputChannel.BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                       public Optional<BufferAndAvailability> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                if (hasData) {
                                        serializer.clear();
                                        BufferBuilder bufferBuilder = 
createBufferBuilder(bufferSize);
@@ -79,11 +81,11 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
                                        hasData = inputIterator.next(reuse) != 
null;
 
                                        // Call getCurrentBuffer to ensure size 
is set
-                                       return new 
InputChannel.BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0);
+                                       return Optional.of(new 
BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0));
                                } else {
                                        
when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
 
-                                       return new 
InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
 false, 0);
+                                       return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
false, 0));
                                }
                        }
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index ab276cd..d5c2492 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -64,6 +64,7 @@ import scala.Tuple2;
 
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -361,9 +362,9 @@ public class LocalInputChannelTest {
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
                when(partitionManager.createSubpartitionView(
-                               any(ResultPartitionID.class),
-                               anyInt(),
-                               
any(BufferAvailabilityListener.class))).thenReturn(reader);
+                       any(ResultPartitionID.class),
+                       anyInt(),
+                       
any(BufferAvailabilityListener.class))).thenReturn(reader);
 
                LocalInputChannel channel = new LocalInputChannel(
                        gate,
@@ -379,11 +380,7 @@ public class LocalInputChannelTest {
                when(reader.getNextBuffer()).thenReturn(null);
                when(reader.isReleased()).thenReturn(false);
 
-               try {
-                       channel.getNextBuffer();
-                       fail("Did not throw expected IllegalStateException");
-               } catch (IllegalStateException ignored) {
-               }
+               assertFalse(channel.getNextBuffer().isPresent());
 
                // Null buffer and released
                when(reader.getNextBuffer()).thenReturn(null);
@@ -394,6 +391,9 @@ public class LocalInputChannelTest {
                        fail("Did not throw expected CancelTaskException");
                } catch (CancelTaskException ignored) {
                }
+
+               channel.releaseAllResources();
+               assertFalse(channel.getNextBuffer().isPresent());
        }
 
        // 
---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 17425f2..0dd0875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -126,7 +126,7 @@ public class SingleInputGateTest {
 
                final ResultSubpartitionView iterator = 
mock(ResultSubpartitionView.class);
                when(iterator.getNextBuffer()).thenReturn(
-                       new BufferAndBacklog(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
FreeingBufferRecycler.INSTANCE), 0, false));
+                       new BufferAndBacklog(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
FreeingBufferRecycler.INSTANCE), false,0, false));
 
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                when(partitionManager.createSubpartitionView(

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 43ac7a1..f9060f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -28,6 +29,7 @@ import org.mockito.stubbing.Answer;
 import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,7 +46,7 @@ public class TestInputChannel {
        private final SingleInputGate inputGate;
 
        // Abusing Mockito here... ;)
-       protected OngoingStubbing<InputChannel.BufferAndAvailability> stubbing;
+       protected OngoingStubbing<Optional<BufferAndAvailability>> stubbing;
 
        public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
                checkArgument(channelIndex >= 0);
@@ -55,9 +57,9 @@ public class TestInputChannel {
 
        public TestInputChannel read(Buffer buffer) throws IOException, 
InterruptedException {
                if (stubbing == null) {
-                       stubbing = when(mock.getNextBuffer()).thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true, 0));
+                       stubbing = 
when(mock.getNextBuffer()).thenReturn(Optional.of(new 
BufferAndAvailability(buffer, true, 0)));
                } else {
-                       stubbing = stubbing.thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true, 0));
+                       stubbing = stubbing.thenReturn(Optional.of(new 
BufferAndAvailability(buffer, true, 0)));
                }
 
                return this;
@@ -71,13 +73,13 @@ public class TestInputChannel {
        }
 
        public TestInputChannel readEndOfPartitionEvent() throws IOException, 
InterruptedException {
-               final Answer<InputChannel.BufferAndAvailability> answer = new 
Answer<InputChannel.BufferAndAvailability>() {
+               final Answer<Optional<BufferAndAvailability>> answer = new 
Answer<Optional<BufferAndAvailability>>() {
                        @Override
-                       public InputChannel.BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                       public Optional<BufferAndAvailability> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                // Return true after finishing
                                when(mock.isReleased()).thenReturn(true);
 
-                               return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0);
+                               return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0));
                        }
                };
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 2c6ee50..c3a4a32 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -29,7 +29,7 @@ import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -62,7 +62,7 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean>, BufferAvaila
        /** Random source for sleeps. */
        private final Random random;
 
-       private final AtomicLong numBuffersAvailable = new AtomicLong();
+       private final AtomicBoolean dataAvailableNotification = new AtomicBoolean(false);
 
        public TestSubpartitionConsumer(
                boolean isSlowConsumer,
@@ -85,11 +85,9 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean>, BufferAvaila
                                        throw new InterruptedException();
                                }
 
-                               if (numBuffersAvailable.get() == 0) {
-                                       synchronized (numBuffersAvailable) {
-                                               while 
(numBuffersAvailable.get() == 0) {
-                                                       
numBuffersAvailable.wait();
-                                               }
+                               synchronized (dataAvailableNotification) {
+                                       while (!dataAvailableNotification.getAndSet(false)) {
+                                               dataAvailableNotification.wait();
                                        }
                                }
 
@@ -100,8 +98,9 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean>, BufferAvaila
                                }
 
                                if (bufferAndBacklog != null) {
-                                       numBuffersAvailable.decrementAndGet();
-
+                                       if (bufferAndBacklog.isMoreAvailable()) {
+                                               dataAvailableNotification.set(true);
+                                       }
                                        if 
(bufferAndBacklog.buffer().isBuffer()) {
                                                
callback.onBuffer(bufferAndBacklog.buffer());
                                        } else {
@@ -128,12 +127,10 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean>, BufferAvaila
        }
 
        @Override
-       public void notifyBuffersAvailable(long numBuffers) {
-               if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) 
== 0) {
-                       synchronized (numBuffersAvailable) {
-                               numBuffersAvailable.notifyAll();
-                       }
-                       ;
+       public void notifyDataAvailable() {
+               synchronized (dataAvailableNotification) {
+                       dataAvailableNotification.set(true);
+                       dataAvailableNotification.notifyAll();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index c290c67..85c676c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -202,6 +202,10 @@ public class StreamConfig implements Serializable {
                return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
        }
 
+       public boolean isFlushAlwaysEnabled() {
+               return getBufferTimeout() == 0;
+       }
+
        public void setStreamOperator(StreamOperator<?> operator) {
                if (operator != null) {
                        config.setClass(USER_FUNCTION, operator.getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 45bbd66..f1cc1dc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -161,10 +161,6 @@ public class RecordWriterOutput<OUT> implements 
OperatorChain.WatermarkGaugeExpo
                recordWriter.close();
        }
 
-       public void clearBuffers() {
-               recordWriter.clearBuffers();
-       }
-
        @Override
        public Gauge<Long> getWatermarkGauge() {
                return watermarkGauge;

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 6775bc4..7c47fcf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -53,9 +53,11 @@ public class StreamRecordWriter<T extends 
IOReadableWritable> extends RecordWrit
                this(writer, channelSelector, timeout, null);
        }
 
-       public StreamRecordWriter(ResultPartitionWriter writer, 
ChannelSelector<T> channelSelector,
-                                                               long timeout, 
String taskName) {
-
+       public StreamRecordWriter(
+                       ResultPartitionWriter writer,
+                       ChannelSelector<T> channelSelector,
+                       long timeout,
+                       String taskName) {
                super(writer, channelSelector);
 
                checkArgument(timeout >= -1);
@@ -71,7 +73,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> 
extends RecordWrit
                else {
                        flushAlways = false;
                        String threadName = taskName == null ?
-                                                               
DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
+                               DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output 
Timeout Flusher - " + taskName;
 
                        outputFlusher = new OutputFlusher(threadName, timeout);
                        outputFlusher.start();
@@ -109,6 +111,7 @@ public class StreamRecordWriter<T extends 
IOReadableWritable> extends RecordWrit
         * Closes the writer. This stops the flushing thread (if there is one).
         */
        public void close() {
+               clearBuffers();
                // make sure we terminate the thread in any case
                if (outputFlusher != null) {
                        outputFlusher.terminate();

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 4807c77..b99cf65 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -154,7 +154,6 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                                for (RecordWriterOutput<?> output : 
this.streamOutputs) {
                                        if (output != null) {
                                                output.close();
-                                               output.clearBuffers();
                                        }
                                }
                        }
@@ -236,16 +235,8 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
         * <p>This method should never fail.
         */
        public void releaseOutputs() {
-               try {
-                       for (RecordWriterOutput<?> streamOutput : 
streamOutputs) {
-                               streamOutput.close();
-                       }
-               }
-               finally {
-                       // make sure that we release the buffers in any case
-                       for (RecordWriterOutput<?> output : streamOutputs) {
-                               output.clearBuffers();
-                       }
+               for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+                       streamOutput.close();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 43f2878..11254ef 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -37,6 +37,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
@@ -93,9 +94,9 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        inputQueues[channelIndex] = new 
ConcurrentLinkedQueue<InputValue<Object>>();
                        inputChannels[channelIndex] = new 
TestInputChannel(inputGate, i);
 
-                       final Answer<BufferAndAvailability> answer = new 
Answer<BufferAndAvailability>() {
+                       final Answer<Optional<BufferAndAvailability>> answer = 
new Answer<Optional<BufferAndAvailability>>() {
                                @Override
-                               public BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                               public Optional<BufferAndAvailability> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                        
ConcurrentLinkedQueue<InputValue<Object>> inputQueue = 
inputQueues[channelIndex];
                                        InputValue<Object> input;
                                        boolean moreAvailable;
@@ -106,7 +107,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                                        if (input != null && 
input.isStreamEnd()) {
                                                
when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
                                                        true);
-                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
moreAvailable, 0);
+                                               return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
moreAvailable, 0));
                                        } else if (input != null && 
input.isStreamRecord()) {
                                                Object inputElement = 
input.getStreamRecord();
 
@@ -117,15 +118,12 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                                                bufferBuilder.finish();
 
                                                // Call getCurrentBuffer to 
ensure size is set
-                                               return new 
BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0);
+                                               return Optional.of(new 
BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
                                        } else if (input != null && 
input.isEvent()) {
                                                AbstractEvent event = 
input.getEvent();
-                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0);
+                                               return Optional.of(new 
BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
                                        } else {
-                                               synchronized (inputQueue) {
-                                                       inputQueue.wait();
-                                                       return 
answer(invocationOnMock);
-                                               }
+                                               return Optional.empty();
                                        }
                                }
                        };

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
deleted file mode 100644
index 480cfd9..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import 
org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.types.LongValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the {@link StreamRecordWriter}.
- */
-public class StreamRecordWriterTest {
-
-       /**
-        * Verifies that exceptions during flush from the output flush thread 
are
-        * recognized in the writer.
-        */
-       @Test
-       public void testPropagateAsyncFlushError() {
-               FailingWriter<LongValue> testWriter = null;
-               try {
-                       ResultPartitionWriter mockResultPartitionWriter = 
getMockWriter(5);
-
-                       // test writer that flushes every 5ms and fails after 3 
flushes
-                       testWriter = new 
FailingWriter<LongValue>(mockResultPartitionWriter,
-                                       new 
RoundRobinChannelSelector<LongValue>(), 5, 3);
-
-                       try {
-                               long deadline = System.currentTimeMillis() + 
20000; // in max 20 seconds (conservative)
-                               long l = 0L;
-
-                               while (System.currentTimeMillis() < deadline) {
-                                       testWriter.emit(new LongValue(l++));
-                               }
-
-                               fail("This should have failed with an 
exception");
-                       }
-                       catch (IOException e) {
-                               assertNotNull(e.getCause());
-                               
assertTrue(e.getCause().getMessage().contains("Test Exception"));
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       if (testWriter != null) {
-                               testWriter.close();
-                       }
-               }
-       }
-
-       private static ResultPartitionWriter getMockWriter(int numPartitions) 
throws Exception {
-               BufferProvider mockProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, 4096);
-               ResultPartitionWriter mockWriter = 
mock(ResultPartitionWriter.class);
-               when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
-               
when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions);
-
-               return mockWriter;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static class FailingWriter<T extends IOReadableWritable> 
extends StreamRecordWriter<T> {
-
-               private int flushesBeforeException;
-
-               private FailingWriter(ResultPartitionWriter writer, 
ChannelSelector<T> channelSelector,
-                                                               long timeout, 
int flushesBeforeException) {
-                       super(writer, channelSelector, timeout);
-                       this.flushesBeforeException = flushesBeforeException;
-               }
-
-               @Override
-               public void flush() throws IOException {
-                       if (flushesBeforeException-- <= 0) {
-                               throw new IOException("Test Exception");
-                       }
-                       super.flush();
-               }
-       }
-}

Reply via email to