[GitHub] flink pull request #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5581 ---
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r171210203

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -182,7 +182,7 @@ private static void assertNextBufferOrEvent(
                 ResultSubpartitionView readView,
                 int expectedReadableBufferSize,
                 boolean expectedIsBuffer,
    -            Class expectedEventClass,
    +            @Nullable Class expectedEventClass,
    --- End diff --

    I added the recycling in case of failures but kept the `@Nullable` because otherwise this would need some more duplication with this cleanup
---
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r171181261

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -182,7 +182,7 @@ private static void assertNextBufferOrEvent(
                 ResultSubpartitionView readView,
                 int expectedReadableBufferSize,
                 boolean expectedIsBuffer,
    -            Class expectedEventClass,
    +            @Nullable Class expectedEventClass,
    --- End diff --

    I would prefer for this method to return `Buffer` and:

    ```
    void assertNextBuffer(...) {
        Buffer buffer = assertNextBufferOrEvent(...);
        assertTrue(buffer.isBuffer());
        buffer.recycleBuffer();
    }

    void assertNextEvent(...) {
        Buffer buffer = assertNextBufferOrEvent(...);
        assertFalse(buffer.isBuffer());
        assertThat(EventSerializer.fromBuffer(buffer, ...), ...);
        buffer.recycleBuffer();
    }
    ```

    btw, you are not recycling the buffer in case of failure
---
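The remark about not recycling the buffer on failure can be illustrated with a small, self-contained sketch. It assumes nothing about Flink's real classes: `FakeBuffer`, `checkAndRecycle`, and the two assert helpers are hypothetical stand-ins introduced only for this example. The point is that a `try`/`finally` around the assertions releases the buffer even when a check throws.

```java
import java.util.function.Consumer;

/** Sketch only: every name here is hypothetical and not part of Flink. */
final class RecycleOnFailureSketch {

    /** Runs the given checks and recycles the buffer even if a check throws. */
    static void checkAndRecycle(FakeBuffer buffer, Consumer<FakeBuffer> checks) {
        try {
            checks.accept(buffer);
        } finally {
            // Reached on success and on AssertionError alike.
            buffer.recycleBuffer();
        }
    }

    /** Expects a data buffer; recycles it in either outcome. */
    static void assertNextBuffer(FakeBuffer next) {
        checkAndRecycle(next, b -> {
            if (!b.isBuffer()) {
                throw new AssertionError("expected a data buffer but got an event");
            }
        });
    }

    /** Expects an event; recycles it in either outcome. */
    static void assertNextEvent(FakeBuffer next) {
        checkAndRecycle(next, b -> {
            if (b.isBuffer()) {
                throw new AssertionError("expected an event but got a data buffer");
            }
        });
    }

    public static void main(String[] args) {
        FakeBuffer event = new FakeBuffer(false);
        try {
            assertNextBuffer(event); // fails on purpose: this is an event
        } catch (AssertionError e) {
            System.out.println("failed as intended: " + e.getMessage());
        }
        System.out.println("recycled after failure: " + event.isRecycled());
    }
}

/** Hypothetical stand-in for a recyclable network buffer. */
final class FakeBuffer {
    private final boolean isBuffer;
    private boolean recycled;

    FakeBuffer(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    boolean isBuffer() {
        return isBuffer;
    }

    boolean isRecycled() {
        return recycled;
    }

    void recycleBuffer() {
        recycled = true;
    }
}
```

Whether the cleanup lives in one shared helper or in each public assert method is a design choice; the shared helper avoids repeating the `finally` block.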
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170993496

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws Exception {
             assertEquals(1, listener.getNumNotifications());
             assertFalse(reader.nextBufferIsEvent()); // buffer
    -        BufferAndBacklog read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertTrue(read.buffer().isBuffer());
    +        assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
             assertEquals(2, partition.getBuffersInBacklog());
    -        assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
    -        assertFalse(read.buffer().isRecycled());
    -        read.buffer().recycleBuffer();
    -        assertTrue(read.buffer().isRecycled());
    -        assertFalse(read.nextBufferIsEvent());
             assertFalse(reader.nextBufferIsEvent()); // buffer
    -        read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertTrue(read.buffer().isBuffer());
    +        assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
             assertEquals(1, partition.getBuffersInBacklog());
    -        assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
    -        assertFalse(read.buffer().isRecycled());
    -        read.buffer().recycleBuffer();
    -        assertTrue(read.buffer().isRecycled());
    -        assertTrue(read.nextBufferIsEvent());
             assertTrue(reader.nextBufferIsEvent()); // event
    -        read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertFalse(read.buffer().isBuffer());
    +        assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true);
    --- End diff --

    almost - it remains `@Nullable` in `SubpartitionTestBase#assertNextBufferOrEvent`
---
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170990471

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -138,11 +145,68 @@ static void assertNextBuffer(
                 ResultSubpartitionView readView,
                 int expectedReadableBufferSize,
                 boolean expectedIsMoreAvailable,
    -            int expectedBuffersInBacklog) throws IOException, InterruptedException {
    +            int expectedBuffersInBacklog,
    +            boolean expectedNextBufferIsEvent,
    +            boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +        assertNextBufferOrEvent(
    +            readView,
    +            expectedReadableBufferSize,
    +            true,
    +            null,
    +            expectedIsMoreAvailable,
    +            expectedBuffersInBacklog,
    +            expectedNextBufferIsEvent,
    +            expectedRecycledAfterRecycle);
    +    }
    +
    +    static void assertNextEvent(
    +            ResultSubpartitionView readView,
    +            int expectedReadableBufferSize,
    +            @Nullable Class expectedEventClass,
    +            boolean expectedIsMoreAvailable,
    +            int expectedBuffersInBacklog,
    +            boolean expectedNextBufferIsEvent,
    +            boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +        assertNextBufferOrEvent(
    +            readView,
    +            expectedReadableBufferSize,
    +            false,
    +            expectedEventClass,
    +            expectedIsMoreAvailable,
    +            expectedBuffersInBacklog,
    +            expectedNextBufferIsEvent,
    +            expectedRecycledAfterRecycle);
    +    }
    +
    +    private static void assertNextBufferOrEvent(
    +            ResultSubpartitionView readView,
    +            int expectedReadableBufferSize,
    +            boolean expectedIsBuffer,
    +            Class expectedEventClass,
    +            boolean expectedIsMoreAvailable,
    +            int expectedBuffersInBacklog,
    +            boolean expectedNextBufferIsEvent,
    +            boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +        checkArgument(expectedEventClass == null || !expectedIsBuffer);
    +
             ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
    -        assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
    -        assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
    -        assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
    +        assertNotNull(bufferAndBacklog);
    +
    +        assertEquals("buffer size", expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
    --- End diff --

    Unfortunately yes:
    - with the string:
    ```
    java.lang.AssertionError: buffer size
    Expected :1025
    Actual :1024
    ```
    - without the string:
    ```
    java.lang.AssertionError:
    Expected :1025
    Actual :1024
    ```

    It is even worse for boolean values as you may imagine. This way, you can immediately get to your test to fix the assumption and do not have to click into `SubpartitionTestBase` to identify what was actually wrong.
---
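To make the argument for labelled assertions concrete, here is a minimal sketch that does not depend on JUnit. The `assertEquals` helper below is a hypothetical stand-in written for this example; its message format only loosely imitates JUnit 4 and is an assumption, not the library's code. It shows how the label ends up at the front of the failure text, so the failing check can be identified without opening the shared helper.

```java
/** Sketch only: a tiny stand-in for a labelled equality assertion. */
final class AssertionMessageSketch {

    /** Throws an AssertionError whose text starts with the optional label. */
    static void assertEquals(String message, long expected, long actual) {
        if (expected != actual) {
            String prefix = (message == null || message.isEmpty()) ? "" : message + " ";
            throw new AssertionError(
                prefix + "expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    /** Returns the failure text, or null if the assertion passes. */
    static String failureText(String message, long expected, long actual) {
        try {
            assertEquals(message, expected, actual);
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // With a label, the failing check is named in the output.
        System.out.println(failureText("buffer size", 1025, 1024));
        // Without a label, only the two numbers remain.
        System.out.println(failureText(null, 1025, 1024));
    }
}
```

With several boolean flags checked in one helper, an unlabelled failure reduces to "expected true but was false", which is the case the comment calls even worse.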
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170921245

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -332,56 +290,41 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
             // only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
             assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
    +        // wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!)
             listener.awaitNotifications(2, 30_000);
             assertEquals(2, listener.getNumNotifications());
    +        // after consuming and releasing the next buffer, the bufferConsumer may be freed,
    +        // depending on the timing of the last write operation
    +        // -> retain once so that we can check below
    +        Buffer buffer = bufferConsumer.build();
    +        buffer.retainBuffer();
    +
             assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
    -        read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertTrue(read.buffer().isBuffer());
    +        assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
             assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
             assertEquals(1, partition.getBuffersInBacklog());
    -        assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
    -        read.buffer().recycleBuffer();
    -        // now the bufferConsumer may be freed, depending on the timing of the write operation
    -        // -> let's do this check at the end of the test (to save some time)
    -        assertTrue(read.nextBufferIsEvent());
    +
    +        bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
             assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
    -        read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertFalse(read.buffer().isBuffer());
    +        assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true);
    --- End diff --

    ditto
---
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170921214

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws Exception {
             assertEquals(1, listener.getNumNotifications());
             assertFalse(reader.nextBufferIsEvent()); // buffer
    -        BufferAndBacklog read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertTrue(read.buffer().isBuffer());
    +        assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
             assertEquals(2, partition.getBuffersInBacklog());
    -        assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
    -        assertFalse(read.buffer().isRecycled());
    -        read.buffer().recycleBuffer();
    -        assertTrue(read.buffer().isRecycled());
    -        assertFalse(read.nextBufferIsEvent());
             assertFalse(reader.nextBufferIsEvent()); // buffer
    -        read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertTrue(read.buffer().isBuffer());
    +        assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
             assertEquals(1, partition.getBuffersInBacklog());
    -        assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
    -        assertFalse(read.buffer().isRecycled());
    -        read.buffer().recycleBuffer();
    -        assertTrue(read.buffer().isRecycled());
    -        assertTrue(read.nextBufferIsEvent());
             assertTrue(reader.nextBufferIsEvent()); // event
    -        read = reader.getNextBuffer();
    -        assertNotNull(read);
    -        assertFalse(read.buffer().isBuffer());
    +        assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true);
    --- End diff --

    In line 194, instead of creating a dummy event, add some real event:
    `partition.add(EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1)));`
    That will allow you to drop the `@Nullable` field.
---
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170921349

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -138,11 +145,68 @@ static void assertNextBuffer(
                 ResultSubpartitionView readView,
                 int expectedReadableBufferSize,
                 boolean expectedIsMoreAvailable,
    -            int expectedBuffersInBacklog) throws IOException, InterruptedException {
    +            int expectedBuffersInBacklog,
    +            boolean expectedNextBufferIsEvent,
    +            boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +        assertNextBufferOrEvent(
    +            readView,
    +            expectedReadableBufferSize,
    +            true,
    +            null,
    +            expectedIsMoreAvailable,
    +            expectedBuffersInBacklog,
    +            expectedNextBufferIsEvent,
    +            expectedRecycledAfterRecycle);
    +    }
    +
    +    static void assertNextEvent(
    +            ResultSubpartitionView readView,
    +            int expectedReadableBufferSize,
    +            @Nullable Class expectedEventClass,
    --- End diff --

    Before-mentioned `@Nullable`
---
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170917769

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -138,11 +145,68 @@ static void assertNextBuffer(
                 ResultSubpartitionView readView,
                 int expectedReadableBufferSize,
                 boolean expectedIsMoreAvailable,
    -            int expectedBuffersInBacklog) throws IOException, InterruptedException {
    +            int expectedBuffersInBacklog,
    +            boolean expectedNextBufferIsEvent,
    +            boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +        assertNextBufferOrEvent(
    +            readView,
    +            expectedReadableBufferSize,
    +            true,
    +            null,
    +            expectedIsMoreAvailable,
    +            expectedBuffersInBacklog,
    +            expectedNextBufferIsEvent,
    +            expectedRecycledAfterRecycle);
    +    }
    +
    +    static void assertNextEvent(
    +            ResultSubpartitionView readView,
    +            int expectedReadableBufferSize,
    +            @Nullable Class expectedEventClass,
    +            boolean expectedIsMoreAvailable,
    +            int expectedBuffersInBacklog,
    +            boolean expectedNextBufferIsEvent,
    +            boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +        assertNextBufferOrEvent(
    +            readView,
    +            expectedReadableBufferSize,
    +            false,
    +            expectedEventClass,
    +            expectedIsMoreAvailable,
    +            expectedBuffersInBacklog,
    +            expectedNextBufferIsEvent,
    +            expectedRecycledAfterRecycle);
    +    }
    +
    +    private static void assertNextBufferOrEvent(
    +            ResultSubpartitionView readView,
    +            int expectedReadableBufferSize,
    +            boolean expectedIsBuffer,
    +            Class expectedEventClass,
    +            boolean expectedIsMoreAvailable,
    +            int expectedBuffersInBacklog,
    +            boolean expectedNextBufferIsEvent,
    +            boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +        checkArgument(expectedEventClass == null || !expectedIsBuffer);
    +
             ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
    -        assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
    -        assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
    -        assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
    +        assertNotNull(bufferAndBacklog);
    +
    +        assertEquals("buffer size", expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
    --- End diff --

    Do those string error messages add any value besides chatter in this method?
---
GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/5581

    [FLINK-8755][FLINK-8786][network] fix two bugs in spilled and spillable subpartition views

    ## What is the purpose of the change

    1) `SpilledSubpartitionView#getNextBuffer()` relies on the backlog to signal further data availability. However, if there are only events left in the buffer queue, their buffers are not included in the backlog counting and therefore, `isMoreAvailable` will be wrongly false here.
    2) When processing the last in-memory buffer in `SpillableSubpartitionView#getNextBuffer`, we always set the `isMoreAvailable` flag of the returned `BufferAndBacklog` to false irrespective of what may be in the spill writer.

    This PR fixes both issues and heavily extends the unit tests in this regard, hence the two were combined in a single PR.

    Please also note that this PR is built upon #5549, #5550, #5551, and #5572 to reduce possible merge conflicts - everything starting after FLINK-8694 is new.

    ## Brief change log

    - rename `RecordWriter#closeBufferConsumer()` to `closeBufferBuilder()` (internal method, we switched to buffer builders a while ago)
    - make `AwaitableBufferAvailablityListener` (used by tests only) thread-safe
    - fix `SpilledSubpartitionView#getNextBuffer()` to not only rely on the backlog
    - fix `SpillableSubpartitionView#getNextBuffer()` returning wrong `isMoreAvailable` when processing the last in-memory buffer
    - extended overall subpartition tests to also verify several other flags that were added in the past but not covered appropriately, e.g. `BufferAndBacklog#isMoreAvailable()` or `ResultSubpartitionView#isAvailable()`
    - some minor code and documentation improvements (more details in the individual commits)

    ## Verifying this change

    This change added tests and can be verified as follows:

    - added several checks to `PipelinedSubpartitionTest` and `SpillableSubpartitionTest` via helper methods in `SubpartitionTestBase`

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): **no**
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
    - The serializers: **no**
    - The runtime per-record code paths (performance sensitive): **no**
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
    - The S3 file system connector: **no**

    ## Documentation

    - Does this pull request introduce a new feature? (yes / no)
    - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8786

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5581.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5581

commit 40e18c85563e1ef45ce89709fa3aa7613439e12d
Author: Nico Kruber
Date: 2018-02-20T17:04:12Z

    [FLINK-8733][network] fix SpillableSubpartition#spillFinishedBufferConsumers() not counting spilled bytes

commit 2721cabce0dd2be4bb4da4097ff4e6c7749498c1
Author: Nico Kruber
Date: 2018-02-20T17:05:54Z

    [FLINK-8734][network] fix partition bytes counting and re-enable in tests

commit 375de6118a5d84d21b40b7c23438d09204ad664b
Author: Nico Kruber
Date: 2018-02-20T17:06:41Z

    [hotfix][network] remove PowerMockRunner from RecordWriterTest

commit d0d3c7b026d5af3a57c47892501ab0e74e7172b2
Author: Nico Kruber
Date: 2018-02-20T17:07:02Z

    [hotfix][network] various minor improvements

commit dbcfe73c41618c70e16884f2c723fc9a6a9dca4f
Author: Nico Kruber
Date: 2018-02-21T15:30:53Z

    [hotfix][network] initialize SingleInputGate#enqueuedInputChannelsWithData with the right size

commit 13f2e09240b9efb8163bb93dad52486fc2af65ac
Author: Nico Kruber
Date: 2018-02-21T16:09:31Z

    [FLINK-8736][network] fix memory segment offsets for slices of slices being wrong

commit f8363154ef2e03b99a471f627028ad50fc1271ab
Author: Nico Kruber
Date: 2018-02-23T17:07:31Z

    fixup! [hotfix][network] various minor improvements

commit 8cf861f06ddc9c79fc61407ebe426213d1740ef7
Author: Piotr Nowojski
Date: 2018-02-23T10:20:21Z

    [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate

    Previously if we SingleInputGate was re-eqnqueuing an input channel, isMoreAvailable might incorrectly return false. This might caused some dead locks.

commit c7cda5463e7bba1d2f3f62006f6e4a71246efccb
Author: Piotr Nowojski
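
The first bug in the pull request description above (availability derived only from the backlog) can be illustrated with a toy model. This is a sketch under stated assumptions and not the actual patch: the `Kind` enum, the plain `Deque`, and both `moreAvailable...` methods are hypothetical names introduced here. The only behaviour taken from the description is that events are not counted in the backlog.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Sketch only: a toy queue of buffers and events, not Flink's subpartition view. */
final class MoreAvailableSketch {

    /** Hypothetical queue entry type: a data buffer or an event. */
    enum Kind { BUFFER, EVENT }

    /** Counts only data buffers, since events are not part of the backlog. */
    static int backlog(Deque<Kind> queue) {
        int count = 0;
        for (Kind kind : queue) {
            if (kind == Kind.BUFFER) {
                count++;
            }
        }
        return count;
    }

    /** Problematic variant: reports "nothing left" when only events remain. */
    static boolean moreAvailableBacklogOnly(Deque<Kind> remaining) {
        return backlog(remaining) > 0;
    }

    /** Alternative for this toy model: any remaining entry counts as available. */
    static boolean moreAvailableQueueAware(Deque<Kind> remaining) {
        return !remaining.isEmpty();
    }

    /** Builds a queue from the given entries, in order. */
    static Deque<Kind> queueOf(Kind... kinds) {
        return new ArrayDeque<>(Arrays.asList(kinds));
    }

    public static void main(String[] args) {
        Deque<Kind> queue = queueOf(Kind.BUFFER, Kind.EVENT);
        queue.poll(); // consume the data buffer; only the event is left
        System.out.println("backlog only: " + moreAvailableBacklogOnly(queue));
        System.out.println("queue aware:  " + moreAvailableQueueAware(queue));
    }
}
```

After the data buffer is consumed, the backlog-only check sees a backlog of zero even though an event is still queued, which is the "wrongly false" `isMoreAvailable` described in item 1.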