[GitHub] flink pull request #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in ...

2018-03-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5581


---


2018-02-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r171210203
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -182,7 +182,7 @@ private static void assertNextBufferOrEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsBuffer,
-   Class expectedEventClass,
+   @Nullable Class expectedEventClass,
--- End diff --

I added the recycling in case of failures but kept the `@Nullable`, because otherwise this cleanup would need even more duplication.


---


2018-02-28 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r171181261
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -182,7 +182,7 @@ private static void assertNextBufferOrEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsBuffer,
-   Class expectedEventClass,
+   @Nullable Class expectedEventClass,
--- End diff --

I would prefer this method to return the `Buffer`, and then:
```
void assertNextBuffer(...) {
  Buffer buffer = assertNextBufferOrEvent(...);
  assertTrue(buffer.isBuffer());
  buffer.recycleBuffer();
}

void assertNextEvent(...) {
  Buffer buffer = assertNextBufferOrEvent(...);
  assertFalse(buffer.isBuffer());
  assertThat(EventSerializer.fromBuffer(buffer, ...), ...);
  buffer.recycleBuffer();
}
```

btw, you are not recycling the buffer in case of a failure.
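A minimal sketch of how the reviewer's split could also recycle the buffer when a check fails. It assumes a simplified stand-in for Flink's `Buffer`: `SketchBuffer` and `RecyclingAssertions` are hypothetical names, not Flink's API. The `try`/`finally` guarantees that `recycleBuffer()` runs whether or not the assertion throws:

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a network buffer; only tracks its kind and recycling state. */
final class SketchBuffer {
    private final boolean isBuffer;
    private boolean recycled;

    SketchBuffer(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    boolean isBuffer() {
        return isBuffer;
    }

    void recycleBuffer() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

final class RecyclingAssertions {
    /** Fetches the next buffer, checks that it carries data, and recycles it in every case. */
    static void assertNextBuffer(Supplier<SketchBuffer> reader) {
        SketchBuffer buffer = reader.get();
        if (buffer == null) {
            throw new AssertionError("no next buffer available");
        }
        try {
            if (!buffer.isBuffer()) {
                throw new AssertionError("expected a data buffer but got an event");
            }
        } finally {
            // Runs on success and on failure, so a failing check cannot leak the buffer.
            buffer.recycleBuffer();
        }
    }
}
```

An `assertNextEvent` variant would follow the same shape: fetch, check inside `try`, recycle in `finally`.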


---


2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170993496
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws Exception {
assertEquals(1, listener.getNumNotifications());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   BufferAndBacklog read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
assertEquals(2, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertFalse(read.nextBufferIsEvent());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertTrue(read.nextBufferIsEvent());
 
assertTrue(reader.nextBufferIsEvent()); // event
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertFalse(read.buffer().isBuffer());
+   assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true);
--- End diff --

Almost: it remains `@Nullable` in `SubpartitionTestBase#assertNextBufferOrEvent`.


---


2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170990471
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -138,11 +145,68 @@ static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
-   int expectedBuffersInBacklog) throws IOException, InterruptedException {
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   true,
+   null,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   static void assertNextEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   @Nullable Class expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   false,
+   expectedEventClass,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   private static void assertNextBufferOrEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   boolean expectedIsBuffer,
+   Class expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+   checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
-   assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
-   assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
-   assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
+   assertNotNull(bufferAndBacklog);
+
+   assertEquals("buffer size", expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
--- End diff --

Unfortunately yes:
- with the string:
```
java.lang.AssertionError: buffer size 
Expected :1025
Actual   :1024
```
- without the string:
```
java.lang.AssertionError: 
Expected :1025
Actual   :1024
```
It is even worse for boolean values, as you may imagine. With the message, you can go straight to your test to fix the assumption instead of having to click into `SubpartitionTestBase` to identify what was actually wrong.



---


2018-02-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170921245
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -332,56 +290,41 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
// only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
 
+   // wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!)
listener.awaitNotifications(2, 30_000);
assertEquals(2, listener.getNumNotifications());
 
+   // after consuming and releasing the next buffer, the bufferConsumer may be freed,
+   // depending on the timing of the last write operation
+   // -> retain once so that we can check below
+   Buffer buffer = bufferConsumer.build();
+   buffer.retainBuffer();
+
assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
assertEquals(1, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-   read.buffer().recycleBuffer();
-   // now the bufferConsumer may be freed, depending on the timing of the write operation
-   // -> let's do this check at the end of the test (to save some time)
-   assertTrue(read.nextBufferIsEvent());
+
+   bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
 
assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertFalse(read.buffer().isBuffer());
+   assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true);
--- End diff --

ditto
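The added lines in the diff above retain the buffer once before the reader consumes it, so the test can still check it after the reader recycles its own reference; `bufferConsumer.close()` then releases the last one. A minimal reference-counting model of that idea, under simplified assumptions (`RefCountedSegment` is hypothetical and not Flink's buffer implementation):

```java
/** Hypothetical reference-counted buffer: freed only when the last reference is released. */
final class RefCountedSegment {
    private int refCount = 1; // the creator holds the first reference

    synchronized RefCountedSegment retainBuffer() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount++;
        return this;
    }

    synchronized void recycleBuffer() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer recycled too often");
        }
        refCount--;
    }

    synchronized boolean isRecycled() {
        return refCount == 0;
    }
}
```

Without the extra `retainBuffer()`, the reader's `recycleBuffer()` could drop the count to zero, and whether the memory is already freed at the time of the check would depend on timing.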


---


2018-02-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170921214
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws Exception {
assertEquals(1, listener.getNumNotifications());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   BufferAndBacklog read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
assertEquals(2, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertFalse(read.nextBufferIsEvent());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertTrue(read.nextBufferIsEvent());
 
assertTrue(reader.nextBufferIsEvent()); // event
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertFalse(read.buffer().isBuffer());
+   assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true);
--- End diff --

In line 194, instead of creating a dummy event, add a real event:
`partition.add(EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1)));`

That will allow you to drop the `@Nullable` parameter.
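Once every test enqueues a real event, each caller can pass a concrete class and the helper no longer needs a null branch. A sketch under simplified assumptions: `SketchEvent`, `SketchCancelMarker`, and `EventAssertions` are hypothetical stand-ins for Flink's event classes and the test helper, not their actual signatures:

```java
import java.util.Objects;

/** Hypothetical event hierarchy standing in for Flink's event classes. */
abstract class SketchEvent {}

final class SketchCancelMarker extends SketchEvent {
    final long checkpointId;

    SketchCancelMarker(long checkpointId) {
        this.checkpointId = checkpointId;
    }
}

final class EventAssertions {
    /** The expected class is mandatory, so there is no nullable parameter to special-case. */
    static void assertNextEvent(SketchEvent actual, Class<? extends SketchEvent> expectedEventClass) {
        Objects.requireNonNull(expectedEventClass, "expectedEventClass");
        if (!expectedEventClass.isInstance(actual)) {
            String actualName = actual == null ? "null" : actual.getClass().getName();
            throw new AssertionError("event class expected:<" + expectedEventClass.getName()
                + "> but was:<" + actualName + ">");
        }
    }
}
```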



---


2018-02-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170921349
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -138,11 +145,68 @@ static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
-   int expectedBuffersInBacklog) throws IOException, InterruptedException {
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   true,
+   null,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   static void assertNextEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   @Nullable Class expectedEventClass,
--- End diff --

The aforementioned `@Nullable`.


---


2018-02-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170917769
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -138,11 +145,68 @@ static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
-   int expectedBuffersInBacklog) throws IOException, InterruptedException {
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   true,
+   null,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   static void assertNextEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   @Nullable Class expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   false,
+   expectedEventClass,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   private static void assertNextBufferOrEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   boolean expectedIsBuffer,
+   Class expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+   checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
-   assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
-   assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
-   assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
+   assertNotNull(bufferAndBacklog);
+
+   assertEquals("buffer size", expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
--- End diff --

Do those string error messages add any value besides chatter in this method?


---


2018-02-26 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5581

[FLINK-8755][FLINK-8786][network] fix two bugs in spilled and spillable subpartition views

## What is the purpose of the change

1) `SpilledSubpartitionView#getNextBuffer()` relies on the backlog to signal further data availability. However, if only events are left in the buffer queue, their buffers are not included in the backlog count and, therefore, `isMoreAvailable` is wrongly `false` here.
2) When processing the last in-memory buffer in `SpillableSubpartitionView#getNextBuffer`, we always set the `isMoreAvailable` flag of the returned `BufferAndBacklog` to `false`, irrespective of what may still be in the spill writer.
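Both bugs come down to how `isMoreAvailable` is derived. The model below is a simplified, hypothetical sketch, not Flink's code: a queue holds data buffers and events, the backlog counts only data buffers (as described for the first bug), and a `spillWriterHasData` flag stands in for whatever may remain in the spill writer (the second bug):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a subpartition queue; true = data buffer, false = event. */
final class MoreAvailableModel {
    private final Deque<Boolean> queue = new ArrayDeque<>();

    void addBuffer() {
        queue.add(true);
    }

    void addEvent() {
        queue.add(false);
    }

    /** Counts data buffers only; events are never part of the backlog. */
    int backlog() {
        int count = 0;
        for (boolean isBuffer : queue) {
            if (isBuffer) {
                count++;
            }
        }
        return count;
    }

    /** Bug 1: with only events left, the backlog is 0 and this wrongly reports "nothing more". */
    boolean isMoreAvailableBacklogOnly() {
        return backlog() > 0;
    }

    /** Fix for bug 1: any remaining entry, buffer or event, means there is more to read. */
    boolean isMoreAvailable() {
        return !queue.isEmpty();
    }

    /** Bug 2: the flag was fixed to false for the last in-memory buffer. */
    static boolean afterLastInMemoryBufferBuggy(boolean spillWriterHasData) {
        return false; // ignores spillWriterHasData
    }

    /** Fix for bug 2: let the spill writer's state decide. */
    static boolean afterLastInMemoryBuffer(boolean spillWriterHasData) {
        return spillWriterHasData;
    }
}
```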

This PR fixes both issues and substantially extends the unit tests in this regard, which is why the two fixes were combined into a single PR. Please also note that this PR is built on top of #5549, #5550, #5551, and #5572 to reduce possible merge conflicts; everything after FLINK-8694 is new.

## Brief change log

- rename `RecordWriter#closeBufferConsumer()` to `closeBufferBuilder()` (internal method; we switched to buffer builders a while ago)
- make `AwaitableBufferAvailablityListener` (used by tests only) thread-safe
- fix `SpilledSubpartitionView#getNextBuffer()` to not rely solely on the backlog
- fix `SpillableSubpartitionView#getNextBuffer()` returning the wrong `isMoreAvailable` when processing the last in-memory buffer
- extend the overall subpartition tests to also verify several other flags that were added in the past but not covered appropriately, e.g. `BufferAndBacklog#isMoreAvailable()` or `ResultSubpartitionView#isAvailable()`
- some minor code and documentation improvements
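One way a notification-counting test listener can be made thread-safe, sketched under assumptions: the method names mirror calls used in the tests quoted in this thread (`getNumNotifications()`, `awaitNotifications(2, 30_000)`) and the `notifyDataAvailable()` callback, but `CountingAvailabilityListener` and its body are hypothetical, not the implementation in this PR. All state is guarded by the object's monitor, and `wait`/`notifyAll` let the test thread block until enough notifications arrive:

```java
/** Hypothetical thread-safe listener: producers and the awaiting test thread share one monitor. */
final class CountingAvailabilityListener {
    private long numNotifications;

    /** Called from producing threads. */
    synchronized void notifyDataAvailable() {
        numNotifications++;
        notifyAll(); // wake up any thread blocked in awaitNotifications
    }

    synchronized long getNumNotifications() {
        return numNotifications;
    }

    /** Blocks until at least {@code expected} notifications arrived or the timeout elapsed. */
    synchronized void awaitNotifications(long expected, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (numNotifications < expected) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return; // timed out; the caller asserts on getNumNotifications()
            }
            wait(remaining); // the loop re-checks the condition after spurious wake-ups
        }
    }
}
```

Because `remaining` is checked to be positive in whole milliseconds before calling `wait`, the sketch never calls `wait(0)`, which would block indefinitely.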

(more details in the individual commits)

## Verifying this change

This change added tests and can be verified as follows:
- added several checks to `PipelinedSubpartitionTest` and `SpillableSubpartitionTest` via helper methods in `SubpartitionTestBase`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8786

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5581.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5581


commit 40e18c85563e1ef45ce89709fa3aa7613439e12d
Author: Nico Kruber 
Date:   2018-02-20T17:04:12Z

[FLINK-8733][network] fix SpillableSubpartition#spillFinishedBufferConsumers() not counting spilled bytes

commit 2721cabce0dd2be4bb4da4097ff4e6c7749498c1
Author: Nico Kruber 
Date:   2018-02-20T17:05:54Z

[FLINK-8734][network] fix partition bytes counting and re-enable in tests

commit 375de6118a5d84d21b40b7c23438d09204ad664b
Author: Nico Kruber 
Date:   2018-02-20T17:06:41Z

[hotfix][network] remove PowerMockRunner from RecordWriterTest

commit d0d3c7b026d5af3a57c47892501ab0e74e7172b2
Author: Nico Kruber 
Date:   2018-02-20T17:07:02Z

[hotfix][network] various minor improvements

commit dbcfe73c41618c70e16884f2c723fc9a6a9dca4f
Author: Nico Kruber 
Date:   2018-02-21T15:30:53Z

[hotfix][network] initialize SingleInputGate#enqueuedInputChannelsWithData with the right size

commit 13f2e09240b9efb8163bb93dad52486fc2af65ac
Author: Nico Kruber 
Date:   2018-02-21T16:09:31Z

[FLINK-8736][network] fix memory segment offsets for slices of slices being wrong

commit f8363154ef2e03b99a471f627028ad50fc1271ab
Author: Nico Kruber 
Date:   2018-02-23T17:07:31Z

fixup! [hotfix][network] various minor improvements

commit 8cf861f06ddc9c79fc61407ebe426213d1740ef7
Author: Piotr Nowojski 
Date:   2018-02-23T10:20:21Z

[FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate

Previously, if SingleInputGate was re-enqueuing an input channel, isMoreAvailable might incorrectly return false. This might have caused some deadlocks.

commit c7cda5463e7bba1d2f3f62006f6e4a71246efccb
Author: Piotr Nowojski