http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 3faa614..4f3a5f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE; @@ -52,7 +53,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -75,23 +75,112 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { return new PipelinedSubpartition(0, parent); } + @Test(expected = IllegalStateException.class) + public void testAddTwoNonFinishedBuffer() throws Exception { + final ResultSubpartition subpartition = createSubpartition(); + AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); + ResultSubpartitionView readView = subpartition.createReadView(availablityListener); + availablityListener.resetNotificationCounters(); + + try { + subpartition.add(createBufferBuilder().createBufferConsumer()); + subpartition.add(createBufferBuilder().createBufferConsumer()); + assertNull(readView.getNextBuffer()); + } finally { + subpartition.release(); + } + } + + @Test + public void testAddEmptyNonFinishedBuffer() throws Exception { + final ResultSubpartition subpartition = createSubpartition(); + AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); + ResultSubpartitionView readView = subpartition.createReadView(availablityListener); + availablityListener.resetNotificationCounters(); + + try { + assertEquals(0, availablityListener.getNumNotifications()); + + BufferBuilder bufferBuilder = createBufferBuilder(); + subpartition.add(bufferBuilder.createBufferConsumer()); + + assertEquals(0, availablityListener.getNumNotifications()); + assertNull(readView.getNextBuffer()); + + bufferBuilder.finish(); + bufferBuilder = createBufferBuilder(); + subpartition.add(bufferBuilder.createBufferConsumer()); + + assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer. + assertNull(readView.getNextBuffer()); + assertEquals(1, subpartition.getBuffersInBacklog()); + } finally { + readView.releaseAllResources(); + subpartition.release(); + } + } + + @Test + public void testAddNonEmptyNotFinishedBuffer() throws Exception { + final ResultSubpartition subpartition = createSubpartition(); + AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); + ResultSubpartitionView readView = subpartition.createReadView(availablityListener); + availablityListener.resetNotificationCounters(); + + try { + assertEquals(0, availablityListener.getNumNotifications()); + + BufferBuilder bufferBuilder = createBufferBuilder(); + bufferBuilder.append(ByteBuffer.allocate(1024)); + subpartition.add(bufferBuilder.createBufferConsumer()); + + assertNextBuffer(readView, 1024, false, 1); + assertEquals(1, subpartition.getBuffersInBacklog()); + } finally { + readView.releaseAllResources(); + subpartition.release(); + } + } + + @Test + public void testMultipleEmptyBuffers() throws Exception { + final ResultSubpartition subpartition = createSubpartition(); + AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); + ResultSubpartitionView readView = subpartition.createReadView(availablityListener); + availablityListener.resetNotificationCounters(); + + try { + assertEquals(0, availablityListener.getNumNotifications()); + + subpartition.add(createFilledBufferConsumer(0)); + + assertEquals(1, availablityListener.getNumNotifications()); + subpartition.add(createFilledBufferConsumer(0)); + assertEquals(2, availablityListener.getNumNotifications()); + + subpartition.add(createFilledBufferConsumer(0)); + assertEquals(2, availablityListener.getNumNotifications()); + assertEquals(3, subpartition.getBuffersInBacklog()); + + subpartition.add(createFilledBufferConsumer(1024)); + assertEquals(2, availablityListener.getNumNotifications()); + + assertNextBuffer(readView, 1024, false, 0); + } finally { + readView.releaseAllResources(); + subpartition.release(); + } + } + @Test public void testIllegalReadViewRequest() throws Exception { final PipelinedSubpartition subpartition = createSubpartition(); // Successful request - assertNotNull(subpartition.createReadView(new BufferAvailabilityListener() { - @Override - public void notifyBuffersAvailable(long numBuffers) { - } - })); + assertNotNull(subpartition.createReadView(new NoOpBufferAvailablityListener())); try { - subpartition.createReadView(new BufferAvailabilityListener() { - @Override - public void notifyBuffersAvailable(long numBuffers) { - } - }); + subpartition.createReadView(new NoOpBufferAvailablityListener()); fail("Did not throw expected exception after duplicate notifyNonEmpty view request."); } catch (IllegalStateException expected) { @@ -110,7 +199,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertFalse(view.nextBufferIsEvent()); assertNull(view.getNextBuffer()); assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer() - verify(listener, times(1)).notifyBuffersAvailable(eq(0L)); + verify(listener, times(0)).notifyDataAvailable(); // Add data to the queue... subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); @@ -122,7 +211,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { // assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // ...should have resulted in a notification - verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); + verify(listener, times(1)).notifyDataAvailable(); // ...and one available result assertFalse(view.nextBufferIsEvent()); @@ -144,7 +233,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(1, subpartition.getBuffersInBacklog()); // TODO: re-enable? // assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); - verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); + verify(listener, times(2)).notifyDataAvailable(); assertFalse(view.nextBufferIsEvent()); read = view.getNextBuffer(); @@ -171,7 +260,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count) // TODO: re-enable? // assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); - verify(listener, times(5)).notifyBuffersAvailable(eq(1L)); + verify(listener, times(4)).notifyDataAvailable(); assertFalse(view.nextBufferIsEvent()); // the first buffer read = view.getNextBuffer(); @@ -199,7 +288,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(5, subpartition.getTotalNumberOfBuffers()); assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); - verify(listener, times(5)).notifyBuffersAvailable(eq(1L)); + verify(listener, times(4)).notifyDataAvailable(); } @Test @@ -357,7 +446,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { // create the read view first ResultSubpartitionView view = null; if (createView) { - view = partition.createReadView(numBuffers -> {}); + view = partition.createReadView(new NoOpBufferAvailablityListener()); } partition.release();
http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index ea06dd4..9dc7bed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -167,12 +167,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { // Create the read view ResultSubpartitionView readView = spy(partition - .createReadView(new BufferAvailabilityListener() { - @Override - public void notifyBuffersAvailable(long numBuffers) { - - } - })); + .createReadView(new NoOpBufferAvailablityListener())); // The released state check (of the parent) needs to be independent // of the released state of the view. @@ -223,7 +218,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); assertEquals(1, listener.getNumNotifications()); - assertEquals(5, listener.getNumNotifiedBuffers()); assertFalse(reader.nextBufferIsEvent()); // buffer BufferAndBacklog read = reader.getNextBuffer(); @@ -315,7 +309,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener); // Initial notification - assertEquals(1, listener.getNumNotifiedBuffers()); + assertEquals(1, listener.getNumNotifications()); assertFalse(bufferConsumer.isRecycled()); assertFalse(reader.nextBufferIsEvent()); @@ -325,7 +319,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(2, partition.getBuffersInBacklog()); assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); read.buffer().recycleBuffer(); - assertEquals(2, listener.getNumNotifiedBuffers()); + assertEquals(2, listener.getNumNotifications()); assertFalse(bufferConsumer.isRecycled()); assertFalse(read.nextBufferIsEvent()); @@ -338,8 +332,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { //TODO: re-enable this? // assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); - listener.awaitNotifications(5, 30_000); - assertEquals(5, listener.getNumNotifiedBuffers()); + listener.awaitNotifications(3, 30_000); + assertEquals(3, listener.getNumNotifications()); assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer) read = reader.getNextBuffer(); @@ -555,7 +549,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { if (createView) { // Create a read view partition.finish(); - partition.createReadView(numBuffers -> {}); + partition.createReadView(new NoOpBufferAvailablityListener()); } // one instance of the buffers is placed in the view's nextBuffer and not released @@ -668,7 +662,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { ResultSubpartitionView view = null; if (createView) { partition.finish(); - view = partition.createReadView(numBuffers -> {}); + view = partition.createReadView(new NoOpBufferAvailablityListener()); } if (spilled) { // note: in case we create a view, one buffer will already reside in the view and @@ -706,33 +700,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { // assertEquals((createView ? 4 : 0) + 2 * BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); } - private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener { - - private long numNotifiedBuffers; - private long numNotifications; - - @Override - public void notifyBuffersAvailable(long numBuffers) { - numNotifiedBuffers += numBuffers; - ++numNotifications; - } - - long getNumNotifiedBuffers() { - return numNotifiedBuffers; - } - - public long getNumNotifications() { - return numNotifications; - } - - void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) throws InterruptedException { - long deadline = System.currentTimeMillis() + timeoutMillis; - while (numNotifiedBuffers < awaitedNumNotifiedBuffers && System.currentTimeMillis() < deadline) { - Thread.sleep(1); - } - } - } - /** * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its * {@link #createBufferFileWriter(FileIOChannel.ID)} method. http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 9b8bd54..48846b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -24,6 +24,8 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.io.IOException; + import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -127,4 +129,15 @@ public abstract class SubpartitionTestBase extends TestLogger { // Verify that parent release is reflected at partition view assertTrue(view.isReleased()); } + + protected void assertNextBuffer( + ResultSubpartitionView readView, + int expectedReadableBufferSize, + boolean expectedIsMoreAvailable, + int expectedBuffersInBacklog) throws IOException, InterruptedException { + ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer(); + assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes()); + assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable()); + assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index cd75a7b..abadddf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.junit.Test; import java.io.IOException; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -127,8 +128,8 @@ public class InputChannelTest { } @Override - BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { - return null; + Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException { + return Optional.empty(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index ef30ee1..9de2bbe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.MutableObjectIterator; @@ -32,6 +33,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.Optional; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; @@ -64,12 +66,12 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e // The input iterator can produce an infinite stream. That's why we have to serialize each // record on demand and cannot do it upfront. - final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() { + final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() { private boolean hasData = inputIterator.next(reuse) != null; @Override - public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { + public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable { if (hasData) { serializer.clear(); BufferBuilder bufferBuilder = createBufferBuilder(bufferSize); @@ -79,11 +81,11 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e hasData = inputIterator.next(reuse) != null; // Call getCurrentBuffer to ensure size is set - return new InputChannel.BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0); + return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0)); } else { when(inputChannel.getInputChannel().isReleased()).thenReturn(true); - return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); + return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0)); } } }; http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index ab276cd..d5c2492 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -64,6 +64,7 @@ import scala.Tuple2; import static org.apache.flink.util.FutureUtil.waitForAll; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -361,9 +362,9 @@ public class LocalInputChannelTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager.createSubpartitionView( - any(ResultPartitionID.class), - anyInt(), - any(BufferAvailabilityListener.class))).thenReturn(reader); + any(ResultPartitionID.class), + anyInt(), + any(BufferAvailabilityListener.class))).thenReturn(reader); LocalInputChannel channel = new LocalInputChannel( gate, @@ -379,11 +380,7 @@ public class LocalInputChannelTest { when(reader.getNextBuffer()).thenReturn(null); when(reader.isReleased()).thenReturn(false); - try { - channel.getNextBuffer(); - fail("Did not throw expected IllegalStateException"); - } catch (IllegalStateException ignored) { - } + assertFalse(channel.getNextBuffer().isPresent()); // Null buffer and released when(reader.getNextBuffer()).thenReturn(null); @@ -394,6 +391,9 @@ public class LocalInputChannelTest { fail("Did not throw expected CancelTaskException"); } catch (CancelTaskException ignored) { } + + channel.releaseAllResources(); + assertFalse(channel.getNextBuffer().isPresent()); } // --------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 17425f2..0dd0875 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -126,7 +126,7 @@ public class SingleInputGateTest { final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class); when(iterator.getNextBuffer()).thenReturn( - new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), 0, false)); + new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false,0, false)); final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager.createSubpartitionView( http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java index 43ac7a1..f9060f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -28,6 +29,7 @@ import org.mockito.stubbing.Answer; import org.mockito.stubbing.OngoingStubbing; import java.io.IOException; +import java.util.Optional; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -44,7 +46,7 @@ public class TestInputChannel { private final SingleInputGate inputGate; // Abusing Mockito here... ;) - protected OngoingStubbing<InputChannel.BufferAndAvailability> stubbing; + protected OngoingStubbing<Optional<BufferAndAvailability>> stubbing; public TestInputChannel(SingleInputGate inputGate, int channelIndex) { checkArgument(channelIndex >= 0); @@ -55,9 +57,9 @@ public class TestInputChannel { public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException { if (stubbing == null) { - stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true, 0)); + stubbing = when(mock.getNextBuffer()).thenReturn(Optional.of(new BufferAndAvailability(buffer, true, 0))); } else { - stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true, 0)); + stubbing = stubbing.thenReturn(Optional.of(new BufferAndAvailability(buffer, true, 0))); } return this; @@ -71,13 +73,13 @@ public class TestInputChannel { } public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException { - final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() { + final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() { @Override - public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { + public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable { // Return true after finishing when(mock.isReleased()).thenReturn(true); - return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); + return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0)); } }; http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java index 2c6ee50..c3a4a32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java @@ -29,7 +29,7 @@ import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -62,7 +62,7 @@ public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvaila /** Random source for sleeps. */ private final Random random; - private final AtomicLong numBuffersAvailable = new AtomicLong(); + private final AtomicBoolean dataAvailableNotification = new AtomicBoolean(false); public TestSubpartitionConsumer( boolean isSlowConsumer, @@ -85,11 +85,9 @@ public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvaila throw new InterruptedException(); } - if (numBuffersAvailable.get() == 0) { - synchronized (numBuffersAvailable) { - while (numBuffersAvailable.get() == 0) { - numBuffersAvailable.wait(); - } + synchronized (dataAvailableNotification) { + while (!dataAvailableNotification.getAndSet(false)) { + dataAvailableNotification.wait(); } } @@ -100,8 +98,9 @@ public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvaila } if (bufferAndBacklog != null) { - numBuffersAvailable.decrementAndGet(); - + if (bufferAndBacklog.isMoreAvailable()) { + dataAvailableNotification.set(true); + } if (bufferAndBacklog.buffer().isBuffer()) { callback.onBuffer(bufferAndBacklog.buffer()); } else { @@ -128,12 +127,10 @@ public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvaila } @Override - public void notifyBuffersAvailable(long numBuffers) { - if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) { - synchronized (numBuffersAvailable) { - numBuffersAvailable.notifyAll(); - } - ; + public void notifyDataAvailable() { + synchronized (dataAvailableNotification) { + dataAvailableNotification.set(true); + dataAvailableNotification.notifyAll(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index c290c67..85c676c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -202,6 +202,10 @@ public class StreamConfig implements Serializable { return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT); } + public boolean isFlushAlwaysEnabled() { + return getBufferTimeout() == 0; + } + public void setStreamOperator(StreamOperator<?> operator) { if (operator != null) { config.setClass(USER_FUNCTION, operator.getClass()); http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 45bbd66..f1cc1dc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -161,10 +161,6 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo recordWriter.close(); } - public void clearBuffers() { - recordWriter.clearBuffers(); - } - @Override public Gauge<Long> getWatermarkGauge() { return watermarkGauge; http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java index 6775bc4..7c47fcf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java @@ -53,9 +53,11 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit this(writer, channelSelector, timeout, null); } - public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, - long timeout, String taskName) { - + public StreamRecordWriter( + ResultPartitionWriter writer, + ChannelSelector<T> channelSelector, + long timeout, + String taskName) { super(writer, channelSelector); checkArgument(timeout >= -1); @@ -71,7 +73,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit else { flushAlways = false; String threadName = taskName == null ? - DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName; + DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName; outputFlusher = new OutputFlusher(threadName, timeout); outputFlusher.start(); @@ -109,6 +111,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit * Closes the writer. This stops the flushing thread (if there is one). */ public void close() { + clearBuffers(); // make sure we terminate the thread in any case if (outputFlusher != null) { outputFlusher.terminate(); http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 4807c77..b99cf65 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -154,7 +154,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea for (RecordWriterOutput<?> output : this.streamOutputs) { if (output != null) { output.close(); - output.clearBuffers(); } } } @@ -236,16 +235,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea * <p>This method should never fail. */ public void releaseOutputs() { - try { - for (RecordWriterOutput<?> streamOutput : streamOutputs) { - streamOutput.close(); - } - } - finally { - // make sure that we release the buffers in any case - for (RecordWriterOutput<?> output : streamOutputs) { - output.clearBuffers(); - } + for (RecordWriterOutput<?> streamOutput : streamOutputs) { + streamOutput.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 43f2878..11254ef 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -37,6 +37,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; @@ -93,9 +94,9 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>(); inputChannels[channelIndex] = new TestInputChannel(inputGate, i); - final Answer<BufferAndAvailability> answer = new Answer<BufferAndAvailability>() { + final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() { @Override - public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { + public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable { ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex]; InputValue<Object> input; boolean moreAvailable; @@ -106,7 +107,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { if (input != null && input.isStreamEnd()) { when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn( true); - return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0); + return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0)); } else if (input != null && input.isStreamRecord()) { Object inputElement = input.getStreamRecord(); @@ -117,15 +118,12 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { bufferBuilder.finish(); // Call getCurrentBuffer to ensure size is set - return new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0); + return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0)); } else if (input != null && input.isEvent()) { AbstractEvent event = input.getEvent(); - return new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0); + return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0)); } else { - synchronized (inputQueue) { - inputQueue.wait(); - return answer(invocationOnMock); - } + return Optional.empty(); } } }; http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java deleted file mode 100644 index 480cfd9..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.writer.ChannelSelector; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; -import org.apache.flink.types.LongValue; - -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for the {@link StreamRecordWriter}. - */ -public class StreamRecordWriterTest { - - /** - * Verifies that exceptions during flush from the output flush thread are - * recognized in the writer. - */ - @Test - public void testPropagateAsyncFlushError() { - FailingWriter<LongValue> testWriter = null; - try { - ResultPartitionWriter mockResultPartitionWriter = getMockWriter(5); - - // test writer that flushes every 5ms and fails after 3 flushes - testWriter = new FailingWriter<LongValue>(mockResultPartitionWriter, - new RoundRobinChannelSelector<LongValue>(), 5, 3); - - try { - long deadline = System.currentTimeMillis() + 20000; // in max 20 seconds (conservative) - long l = 0L; - - while (System.currentTimeMillis() < deadline) { - testWriter.emit(new LongValue(l++)); - } - - fail("This should have failed with an exception"); - } - catch (IOException e) { - assertNotNull(e.getCause()); - assertTrue(e.getCause().getMessage().contains("Test Exception")); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - if (testWriter != null) { - testWriter.close(); - } - } - } - - private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception { - BufferProvider mockProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, 4096); - ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class); - when(mockWriter.getBufferProvider()).thenReturn(mockProvider); - when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions); - - return mockWriter; - } - - // ------------------------------------------------------------------------ - - private static class FailingWriter<T extends IOReadableWritable> extends StreamRecordWriter<T> { - - private int flushesBeforeException; - - private FailingWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, - long timeout, int flushesBeforeException) { - super(writer, channelSelector, timeout); - this.flushesBeforeException = flushesBeforeException; - } - - @Override - public void flush() throws IOException { - if (flushesBeforeException-- <= 0) { - throw new IOException("Test Exception"); - } - super.flush(); - } - } -}