[GitHub] flink pull request #6417: [FLINK-9913][runtime] Improve output serialization...

2018-07-25 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/6417

[FLINK-9913][runtime] Improve output serialization only once in RecordWriter

## What is the purpose of the change

*This pull request improves `RecordWriter` to serialize each output record only 
once for multiple target channels, rather than serializing it as many times as 
the number of selected channels.*

## Brief change log

  - *Only one `RecordSerializer` is created for all the output channels in 
`RecordWriter`*
  - *Restructure the processes of `emit`, `broadcastEmit`, and `randomEmit` in 
`RecordWriter`*
  - *Restructure the interface methods in `RecordSerializer`*
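
The core idea — serialize once, then copy the bytes to every selected channel — can be sketched as follows (hypothetical names; the real `RecordWriter`/`RecordSerializer` APIs differ):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Rough sketch of "serialize once, copy to many channels" (hypothetical
// names; not the actual RecordWriter code).
public class SerializeOnceSketch {
    static List<List<byte[]>> channels = new ArrayList<>();

    static void emit(String record, int[] targetChannels) {
        // Serialize exactly once ...
        byte[] serialized = record.getBytes(StandardCharsets.UTF_8);
        // ... then copy the serialized bytes into every selected channel.
        for (int channel : targetChannels) {
            channels.get(channel).add(serialized.clone());
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            channels.add(new ArrayList<>());
        }
        emit("record-1", new int[] {0, 2});
        if (channels.get(0).size() != 1 || channels.get(1).size() != 0
                || channels.get(2).size() != 1) {
            throw new IllegalStateException("unexpected channel contents");
        }
    }
}
```

The point of the change is that the expensive step (serialization) happens once per record instead of once per selected channel; only the cheap byte copy scales with the channel count.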

## Verifying this change

This change is already covered by existing tests, such as 
*SpanningRecordSerializationTest*, etc.

New tests are also added in `RecordWriterTest` to verify:

  - *The serialization results produced by `RecordWriter#emit` with 
`BroadcastPartitioner` are correct*
  - *The serialization results produced by calling `RecordWriter#broadcastEmit` 
directly are correct*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-9913

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6417.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6417


commit 109ddb37abafcea28478b90cda10b965e0c399d5
Author: Zhijiang 
Date:   2018-07-25T05:45:23Z

[FLINK-9913][runtime] Improve output serialization only once in RecordWriter




---


[GitHub] flink issue #5381: [FLINK-8523][network] Stop assigning floating buffers for...

2018-07-11 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
Yes, I think it can make better use of floating buffers.

As last confirmed with Piotr, this PR will not be merged into 
release-1.5, because there were still some unconfirmed issues. So I 
put this PR on hold and did not update it for the time being.

What do you think of this PR/issue? I can continue with it if necessary.


---


[GitHub] flink pull request #6254: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6254#discussion_r201293862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -594,22 +626,22 @@ public String getMessage() {
}
 
/**
-* Adds an exclusive buffer (back) into the queue and recycles 
one floating buffer if the
+* Adds an exclusive buffer (back) into the queue and removes 
one floating buffer if the
 * number of available buffers in queue is more than the 
required amount.
 *
 * @param buffer The exclusive buffer to add
 * @param numRequiredBuffers The number of required buffers
 *
-* @return How many buffers were added to the queue
+* @return How many buffers were added to the queue (0 
or 1) and the
+* floating buffer which was removed and should be 
released (outside!)
 */
-   int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
+   Tuple2<Integer, Buffer> addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
if (getAvailableBufferSize() > numRequiredBuffers) {
--- End diff --

Yes, this is not critical. I just wanted to share the thought with you! :)


---


[GitHub] flink pull request #6254: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6254#discussion_r201293052
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -479,6 +508,9 @@ void onSenderBacklog(int backlog) throws IOException {
 
numRequiredBuffers = backlog + initialCredit;
while (bufferQueue.getAvailableBufferSize() < 
numRequiredBuffers && !isWaitingForFloatingBuffers) {
+   // TODO: this will take a lock in the 
LocalBufferPool as well and needs to be done
+   // outside the synchronized block (which is a 
bit difficult trying to acquire the
+   // lock only once!
Buffer buffer = 
inputGate.getBufferPool().requestBuffer();
--- End diff --

Agree with your suggestion.


---


[GitHub] flink issue #6272: [FLINK-9755][network] forward exceptions in RemoteInputCh...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6272
  
LGTM!


---


[GitHub] flink pull request #6272: [FLINK-9755][network] forward exceptions in Remote...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6272#discussion_r201292263
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
return false;
}
 
-   boolean needMoreBuffers = false;
-   synchronized (bufferQueue) {
-   checkState(isWaitingForFloatingBuffers, "This channel 
should be waiting for floating buffers.");
+   boolean recycleBuffer = true;
+   try {
+   boolean needMoreBuffers = false;
+   synchronized (bufferQueue) {
+   checkState(isWaitingForFloatingBuffers,
+   "This channel should be waiting for 
floating buffers.");
+
+   // Important: double check the isReleased state 
inside synchronized block, so there is no
+   // race condition when notifyBufferAvailable 
and releaseAllResources running in parallel.
+   if (isReleased.get() ||
+   bufferQueue.getAvailableBufferSize() >= 
numRequiredBuffers) {
+   isWaitingForFloatingBuffers = false;
+   buffer.recycleBuffer();
+   return false;
+   }
 
-   // Important: double check the isReleased state inside 
synchronized block, so there is no
-   // race condition when notifyBufferAvailable and 
releaseAllResources running in parallel.
-   if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
-   isWaitingForFloatingBuffers = false;
-   buffer.recycleBuffer();
-   return false;
-   }
+   // note: this call may fail, for better 
cleanup, increase the counter first
+   if (unannouncedCredit.getAndAdd(1) == 0) {
+   notifyCreditAvailable();
--- End diff --

👍


---


[GitHub] flink issue #6272: [FLINK-9755][network] forward exceptions in RemoteInputCh...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6272
  
Thanks for fixing this potential bug. 
It makes sense to handle exceptions during `notifyBufferAvailable` on the 
listener side. Just some thoughts from my side above. :)


---


[GitHub] flink pull request #6272: [FLINK-9755][network] forward exceptions in Remote...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6272#discussion_r201247182
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -687,7 +690,65 @@ public void testFairDistributionFloatingBuffers() 
throws Exception {
} catch (Throwable t) {
thrown = t;
} finally {
-   cleanup(networkBufferPool, null, thrown, channel1, 
channel2, channel3);
+   cleanup(networkBufferPool, null, null, thrown, 
channel1, channel2, channel3);
+   }
+   }
+
+   /**
+* Tests that failures are propagated correctly if
+* {@link RemoteInputChannel#notifyBufferAvailable(Buffer)} throws an 
exception. Also tests that
+* a second listener will be notified in this case.
+*/
+   @Test
+   public void testFailureInNotifyBufferAvailable() throws Exception {
+   // Setup
+   final int numExclusiveBuffers = 0;
+   final int numFloatingBuffers = 1;
+   final int numTotalBuffers = numExclusiveBuffers + 
numFloatingBuffers;
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(
+   numTotalBuffers, 32);
+
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel successfulRemoteIC = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), 
successfulRemoteIC);
+
+// inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
--- End diff --

Remove this commented-out code?


---


[GitHub] flink pull request #6272: [FLINK-9755][network] forward exceptions in Remote...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6272#discussion_r201242662
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
return false;
}
 
-   boolean needMoreBuffers = false;
-   synchronized (bufferQueue) {
-   checkState(isWaitingForFloatingBuffers, "This channel 
should be waiting for floating buffers.");
+   boolean recycleBuffer = true;
+   try {
+   boolean needMoreBuffers = false;
+   synchronized (bufferQueue) {
+   checkState(isWaitingForFloatingBuffers,
+   "This channel should be waiting for 
floating buffers.");
+
+   // Important: double check the isReleased state 
inside synchronized block, so there is no
+   // race condition when notifyBufferAvailable 
and releaseAllResources running in parallel.
+   if (isReleased.get() ||
+   bufferQueue.getAvailableBufferSize() >= 
numRequiredBuffers) {
+   isWaitingForFloatingBuffers = false;
+   buffer.recycleBuffer();
+   return false;
+   }
 
-   // Important: double check the isReleased state inside 
synchronized block, so there is no
-   // race condition when notifyBufferAvailable and 
releaseAllResources running in parallel.
-   if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
-   isWaitingForFloatingBuffers = false;
-   buffer.recycleBuffer();
-   return false;
-   }
+   // note: this call may fail, for better 
cleanup, increase the counter first
+   if (unannouncedCredit.getAndAdd(1) == 0) {
+   notifyCreditAvailable();
--- End diff --

From the failure-handling point of view, `notifyCreditAvailable` should be 
called before `bufferQueue.addFloatingBuffer`. But from another point of view, 
it seems stricter logically to confirm the buffer is ready before announcing 
the credit; otherwise the channel may receive new data before this floating 
buffer is queued, although that can hardly happen with the current 
implementation.

Another concern is that `notifyCreditAvailable` itself is thread safe, but for 
easier failure handling we place it inside the `synchronized` block. Currently 
`notifyCreditAvailable` is very lightweight, so the cost can be ignored.

Maybe the above two concerns are unnecessary, and I can accept the current 
modifications.
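
The first concern above — making the floating buffer available before announcing the credit — can be illustrated with a tiny model (hypothetical names loosely following the diff; not the actual `RemoteInputChannel` code):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the ordering discussed above (hypothetical, simplified): queue
// the floating buffer first, then announce the credit, so the sender can
// never be granted credit for a buffer that is not yet queued.
public class CreditOrderingSketch {
    final Deque<byte[]> floatingBuffers = new ArrayDeque<>();
    final AtomicInteger unannouncedCredit = new AtomicInteger();
    boolean creditNotified;

    void onFloatingBuffer(byte[] buffer) {
        // 1. Make the buffer available first ...
        floatingBuffers.add(buffer);
        // 2. ... then announce the credit to the sender.
        if (unannouncedCredit.getAndAdd(1) == 0) {
            creditNotified = true; // stands in for notifyCreditAvailable()
        }
    }

    public static void main(String[] args) {
        CreditOrderingSketch channel = new CreditOrderingSketch();
        channel.onFloatingBuffer(new byte[32]);
        if (channel.floatingBuffers.size() != 1 || !channel.creditNotified
                || channel.unannouncedCredit.get() != 1) {
            throw new IllegalStateException("unexpected state");
        }
    }
}
```

The trade-off the comment describes is exactly the reversal of steps 1 and 2: announcing first makes failure cleanup easier, while queuing first guarantees the buffer exists when the credit is consumed.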


---


[GitHub] flink issue #6271: [FLINK-9766][network][tests] fix cleanup in RemoteInputCh...

2018-07-10 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6271
  
LGTM!


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > 
currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = 
registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = 
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   
registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the 
recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized 
block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have 
been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   
listener.notifyBufferDestroyed();
--- End diff --

👍


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287730
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > 
currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = 
registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = 
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   
registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the 
recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized 
block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have 
been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   
listener.notifyBufferDestroyed();
}
-   catch (Throwable ignored) {
-   
availableMemorySegments.add(segment);
-   
availableMemorySegments.notify();
--- End diff --

👍


---


[GitHub] flink issue #6257: [FLINK-9676][network] clarify contracts of BufferListener...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6257
  
Thanks for fixing this bug; we also solved this problem in the same way.

This solution seems more lightweight than the approach in 
[6254](https://github.com/apache/flink/pull/6254), and I also think the lock 
adjustment in `6254` is a useful reference.


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200252253
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > 
currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = 
registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = 
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   
registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the 
recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized 
block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have 
been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   
listener.notifyBufferDestroyed();
}
-   catch (Throwable ignored) {
-   
availableMemorySegments.add(segment);
-   
availableMemorySegments.notify();
--- End diff --

I am wondering whether we should rethrow this exception at the end of the 
handling below.

For example: during `RemoteInputChannel#notifyBufferAvailable`, if the 
`isWaitingForFloatingBuffers` flag is inconsistent, we should throw this 
exception to trigger failover; otherwise we cannot find the potential bug.
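
A minimal sketch of the rethrow idea (hypothetical names; not the actual `LocalBufferPool` code): the cleanup still happens as before, but the caught failure is rethrown afterwards so an inconsistent listener state surfaces as a task failure instead of being silently swallowed.

```java
// Sketch of "clean up, then rethrow" instead of swallowing the listener
// failure (hypothetical names; not the actual LocalBufferPool code).
public class RethrowSketch {
    interface Listener {
        boolean notifyBufferAvailable(byte[] buffer) throws Exception;
    }

    static void recycle(byte[] segment, Listener listener) {
        Throwable failure = null;
        try {
            listener.notifyBufferAvailable(segment);
        } catch (Throwable t) {
            failure = t;
        }
        if (failure != null) {
            // ... perform the cleanup (under the lock) as before ...
            // then rethrow so the inconsistency triggers failover:
            throw new RuntimeException("buffer notification failed", failure);
        }
    }

    public static void main(String[] args) {
        boolean thrown = false;
        try {
            recycle(new byte[32], buffer -> {
                throw new IllegalStateException("inconsistent waiting flag");
            });
        } catch (RuntimeException e) {
            thrown = true;
        }
        if (!thrown) {
            throw new IllegalStateException("expected rethrow");
        }
    }
}
```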


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200251500
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > 
currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = 
registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = 
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   
registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the 
recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized 
block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have 
been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   
listener.notifyBufferDestroyed();
--- End diff --

Currently `notifyBufferDestroyed` does nothing, so we should be careful if this 
method is implemented in the future, similar to `notifyBufferAvailable`.


---


[GitHub] flink pull request #6254: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6254#discussion_r200245464
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -594,22 +626,22 @@ public String getMessage() {
}
 
/**
-* Adds an exclusive buffer (back) into the queue and recycles 
one floating buffer if the
+* Adds an exclusive buffer (back) into the queue and removes 
one floating buffer if the
 * number of available buffers in queue is more than the 
required amount.
 *
 * @param buffer The exclusive buffer to add
 * @param numRequiredBuffers The number of required buffers
 *
-* @return How many buffers were added to the queue
+* @return How many buffers were added to the queue (0 
or 1) and the
+* floating buffer which was removed and should be 
released (outside!)
 */
-   int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
+   Tuple2<Integer, Buffer> addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
if (getAvailableBufferSize() > numRequiredBuffers) {
--- End diff --

To be more strict, should `getAvailableBufferSize() > numRequiredBuffers` be 
`getAvailableBufferSize() == numRequiredBuffers + 1`? 
Otherwise it could be read as if the number of available buffers may exceed 
the required amount by more than one, while we only return one floating buffer 
each time. Actually it can only exceed by one in our design. Maybe this is out 
of scope for this JIRA, so you can ignore it.
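
The invariant in question can be checked with a tiny model (a hypothetical simplification, not the real `RemoteInputChannel` buffer queue): if every add releases at most one floating buffer once the queue exceeds the requirement, then at the moment of the check the available size equals `numRequiredBuffers + 1` exactly.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Tiny model of the buffer-queue invariant discussed above (hypothetical
// names). Each addExclusive() returns at most one floating buffer, so the
// available size never exceeds numRequiredBuffers by more than one.
public class BufferQueueModel {
    final Deque<String> floating = new ArrayDeque<>();
    final Deque<String> exclusive = new ArrayDeque<>();

    int available() {
        return floating.size() + exclusive.size();
    }

    /** Returns the floating buffer to release, or null. */
    String addExclusive(String buffer, int numRequiredBuffers) {
        exclusive.add(buffer);
        if (available() > numRequiredBuffers) {
            // At this point available() == numRequiredBuffers + 1 exactly,
            // assuming the invariant held before the add.
            return floating.poll();
        }
        return null;
    }

    public static void main(String[] args) {
        BufferQueueModel q = new BufferQueueModel();
        q.floating.add("f1");
        q.floating.add("f2");
        // required = 2, available = 2 -> adding one exclusive gives 3 == 2 + 1
        String released = q.addExclusive("e1", 2);
        if (!"f1".equals(released) || q.available() != 2) {
            throw new IllegalStateException("invariant violated");
        }
    }
}
```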


---


[GitHub] flink pull request #6254: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6254#discussion_r200247127
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -479,6 +508,9 @@ void onSenderBacklog(int backlog) throws IOException {
 
numRequiredBuffers = backlog + initialCredit;
while (bufferQueue.getAvailableBufferSize() < 
numRequiredBuffers && !isWaitingForFloatingBuffers) {
+   // TODO: this will take a lock in the 
LocalBufferPool as well and needs to be done
+   // outside the synchronized block (which is a 
bit difficult trying to acquire the
+   // lock only once!
Buffer buffer = 
inputGate.getBufferPool().requestBuffer();
--- End diff --

In my previous implementation, I added a new method to the `BufferProvider` 
interface to request a batch of buffers at a time. That way the lock in 
`LocalBufferPool` only needs to be taken once, which may be more efficient from 
the locking side. The only concern is that the floating buffer distribution may 
then not be fair across all the channels. So we want to implement two 
strategies — a fair strategy that requests one buffer at a time, and a greedy 
strategy that requests all required buffers at a time — and then compare them 
to check performance. Maybe they behave differently in different scenarios. I 
am planning to submit this JIRA soon. What do you think?
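
A rough sketch of the batched-request idea (the `requestBuffers` method below is hypothetical; the actual `BufferProvider` interface does not offer it in this form): the pool's lock is taken once for n buffers instead of once per buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of requesting a batch of buffers under a single lock acquisition
// (hypothetical pool; not the actual LocalBufferPool API).
public class BatchRequestSketch {
    private final Queue<byte[]> availableSegments = new ArrayDeque<>();

    public BatchRequestSketch(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            availableSegments.add(new byte[32]);
        }
    }

    /** Greedy strategy: take up to n buffers while holding the lock once. */
    public List<byte[]> requestBuffers(int n) {
        List<byte[]> result = new ArrayList<>(n);
        synchronized (availableSegments) {
            while (result.size() < n && !availableSegments.isEmpty()) {
                result.add(availableSegments.poll());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        BatchRequestSketch pool = new BatchRequestSketch(4);
        // Ask for 6; only 4 are available, so we get 4.
        if (pool.requestBuffers(6).size() != 4) {
            throw new IllegalStateException("expected 4 buffers");
        }
    }
}
```

The fairness concern from the comment shows up here directly: a greedy caller can drain the whole pool in one acquisition, starving other channels until buffers are recycled.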


---


[GitHub] flink issue #6238: [FLINK-9636][network] fix inconsistency with failed buffe...

2018-07-03 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6238
  
👍


---


[GitHub] flink pull request #6238: [FLINK-9636][network] fix inconsistency with faile...

2018-07-02 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6238#discussion_r199676237
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -147,7 +151,12 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   redistributeBuffers();
+   try {
+   redistributeBuffers();
+   } catch (Throwable t) {
+   this.numTotalRequiredBuffers -= 
numRequiredBuffers;
+   ExceptionUtils.rethrowIOException(t);
+   }
}
 
final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
--- End diff --

The following `availableMemorySegments.poll(2, TimeUnit.SECONDS)` may cause an 
`InterruptedException`, and in the catch part `recycleMemorySegments(segments)` 
will do `numTotalRequiredBuffers -= segments.size();`.

I think we should do `recycleMemorySegments(numRequiredBuffers, segments)` and 
then call `numTotalRequiredBuffers -= numRequiredBuffers;` inside it, otherwise 
`numTotalRequiredBuffers` is leaked.
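
The suggested fix can be sketched as follows (a hypothetical, heavily simplified model of the `NetworkBufferPool` bookkeeping): pass the originally reserved count into the recycle path and decrement by that amount, not by the number of segments actually polled.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the bookkeeping fix suggested above (hypothetical, simplified):
// decrement numTotalRequiredBuffers by the reserved amount, not by the
// number of segments actually polled before the interruption.
public class BufferAccounting {
    int numTotalRequiredBuffers;

    void recycleMemorySegments(int numReserved, List<byte[]> segments) {
        // Using numReserved avoids leaking the difference when fewer
        // segments than reserved were polled (e.g. on InterruptedException).
        numTotalRequiredBuffers -= numReserved;
        segments.clear(); // return the polled segments themselves
    }

    public static void main(String[] args) {
        BufferAccounting pool = new BufferAccounting();
        int numRequired = 5;
        pool.numTotalRequiredBuffers += numRequired;
        // Simulate an interruption after only 2 of 5 segments were polled.
        List<byte[]> polled = new ArrayList<>();
        polled.add(new byte[32]);
        polled.add(new byte[32]);
        pool.recycleMemorySegments(numRequired, polled);
        if (pool.numTotalRequiredBuffers != 0) {
            throw new IllegalStateException("counter leaked");
        }
    }
}
```

Decrementing by `segments.size()` instead would leave the counter at 3 in this scenario, which is exactly the leak described in the comment.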


---


[GitHub] flink issue #6238: [FLINK-9636][network] fix inconsistency with failed buffe...

2018-07-02 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6238
  
Thanks for fixing this bug. :)
I think there is another potential bug in polling segments from the queue 
during the task-cancelling process, and I pointed it out in the code above.


---


[GitHub] flink issue #5923: [FLINK-9253][network] make the maximum floating buffers c...

2018-04-27 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5923
  
Thanks for improving it, looks good from my point. :)


---


[GitHub] flink issue #5916: [hotfix][tests] remove redundant rebalance in SuccessAfte...

2018-04-26 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5916
  
LGTM :)


---


[GitHub] flink issue #5915: [FLINK-9243][tests] fix flaky SuccessAfterNetworkBuffersF...

2018-04-26 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5915
  
The point is that there is already a `rebalance()` in line 131, so there is no 
need to `rebalance()` again in line 137?


---


[GitHub] flink issue #5747: [FLINK-9057][network] fix an NPE when cleaning up before ...

2018-03-22 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5747
  
Thanks for fixing this problem. The `notifyReaderCreated` should be called 
after both views are created correctly, otherwise it will cause an inconsistency.

LGTM 👍 


---


[GitHub] flink issue #5708: [FLINK-8984][network] Drop taskmanager.exactly-once.block...

2018-03-16 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5708
  
Thanks Piotr, I agree with it.


---


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-03-01 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r171575083
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -269,15 +269,21 @@
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
-   .withDescription("Number of network buffers to use for 
each outgoing/incoming channel (subpartition/input channel).");
+   .withDescription("Number of network buffers to use for 
each outgoing/incoming channel (subpartition/input channel)." +
+   "In credit-based flow control mode, this 
indicates how many credits are exclusive in each input channel. It should be" +
+   " configured at least 2 for good performance. 1 
buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
+   " for parallel serialization.");
 
/**
 * Number of extra network buffers to use for each outgoing/incoming 
gate (result partition/input gate).
 */
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =

key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8)
-   .withDescription("Number of extra network buffers to 
use for each outgoing/incoming gate (result partition/input gate).");
+   .withDescription("Number of extra network buffers to 
use for each outgoing/incoming gate (result partition/input gate)." +
+   " In credit-based flow control mode, this 
indicates how many floating credits are shared among all the input channels." +
+   " The floating buffers are distributed based on 
backlog (real-time output buffers in the subpartition) feedback, and can" +
+   " help relieve back-pressure caused by 
unbalanced data distribution among the subpartitions.");
--- End diff --

Yeah, I already added it.


---


[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...

2018-02-27 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4529
  
Thanks for telling me the plan. I will do that if necessary. :)


---


[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...

2018-02-23 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4529
  
Hey @NicoK, will this PR be covered in Flink 1.5? We have sometimes 
experienced Netty direct memory running out in production, caused by 
`extractFrame`, so we are looking forward to this improvement. :)


---


[GitHub] flink issue #5317: [FLINK-8458] Add the switch for keeping both the old mode...

2018-02-22 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5317
  
@pnowojski, thanks for your messages; I have updated the commit as 
you suggested.


---


[GitHub] flink issue #5558: [FLINK-8747][bugfix] The tag of waiting for floating buff...

2018-02-22 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5558
  
Thanks for your reviews! I have addressed the above comments in a separate 
commit.
Regarding the tests, I think the same as you. :)


---


[GitHub] flink pull request #5558: [FLINK-8747][bugfix] The tag of waiting for floati...

2018-02-22 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5558#discussion_r170148337
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -396,32 +399,49 @@ public void 
testAvailableBuffersLessThanRequiredBuffers() throws Exception {
18, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
+   assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
// Decrease the backlog
-   inputChannel.onSenderBacklog(15);
+   inputChannel.onSenderBacklog(13);
 
// Only the number of required buffers is changed by 
(backlog + numExclusiveBuffers)
verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
assertEquals("There should be 15 buffers available in 
the channel",
15, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 17 buffers required in 
the channel",
-   17, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 15 buffers required in 
the channel",
+   15, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
+   assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
-   // Recycle one exclusive buffer
-   exclusiveBuffer.recycleBuffer();
+   // Recycle one more floating buffer
+   floatingBufferQueue.poll().recycleBuffer();
 
-   // The exclusive buffer is returned to the channel 
directly
+   // Return the floating buffer to the buffer pool and 
the channel is not waiting for more floating buffers
verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
+   assertEquals("There should be 15 buffers available in 
the channel",
+   15, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 15 buffers required in 
the channel",
+   15, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 1 buffers available in 
local pool",
+   1, 
bufferPool.getNumberOfAvailableMemorySegments());
+   assertFalse(inputChannel.isWaitingForFloatingBuffers());
+
+   // Increase the backlog again
+   inputChannel.onSenderBacklog(15);
+
+   // The floating buffer is requested from the buffer 
pool and the channel is registered as listener again.
+   verify(bufferPool, times(17)).requestBuffer();
+   verify(bufferPool, 
times(2)).addBufferListener(inputChannel);
assertEquals("There should be 16 buffers available in 
the channel",
16, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 17 buffers required in 
the channel",
17, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffers available in 
local pool",
+   assertEquals("There should be 0 buffer available in 
local pool",
--- End diff --

I also fixed other points with `buffers`.


---


[GitHub] flink pull request #5558: [FLINK-8747][bugfix] The tag of waiting for floati...

2018-02-22 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5558#discussion_r170148132
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -337,6 +337,11 @@ public int getSenderBacklog() {
return numRequiredBuffers - initialCredit;
}
 
+   @VisibleForTesting
+   public boolean isWaitingForFloatingBuffers() {
--- End diff --

yes


---


[GitHub] flink issue #5558: [FLINK-8747][bugfix] The tag of waiting for floating buff...

2018-02-22 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5558
  
@NicoK , as I described in the purpose of the change, there are two 
scenarios for this issue, and I only modified the existing 
`RemoteInputChannelTest#testAvailableBuffersLessThanRequiredBuffers` to cover 
one of them. Do you think it is necessary to add a new unit test for the other 
scenario, in which recycled exclusive buffers make the available buffers no 
less than the required buffers?


---


[GitHub] flink pull request #5558: [FLINK-8747][bugfix] The tag of waiting for floati...

2018-02-22 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5558

[FLINK-8747][bugfix] The tag of waiting for floating buffers in 
RemoteInputChannel should be updated properly

## What is the purpose of the change

*In credit-based flow control mode, when the number of available buffers is 
less than the number of required buffers (backlog + initialCredit), the 
`RemoteInputChannel` will request floating buffers from the `BufferProvider`. 
If it cannot get enough floating buffers, the `RemoteInputChannel` registers 
itself as a listener on the `BufferProvider` and sets the tag 
`isWaitingForFloatingBuffers` to true to avoid repeated registration.*

 *When a floating buffer is recycled to the `BufferProvider`, it notifies 
the listener of the available buffer. But the listener may not need floating 
buffers at that moment, if its available buffers are no longer less than the 
required buffers; in that case the floating buffer is returned to the 
`BufferProvider` directly. Most importantly, the tag 
`isWaitingForFloatingBuffers` should also be reset to false then, otherwise the 
`RemoteInputChannel` will never request floating buffers again once the 
available buffers drop below the required buffers.*

*There are two scenarios that cause the above issue:*

- Recycled exclusive buffers increase the total available buffers to be equal 
to or more than the required buffers.
- A decreased sender backlog makes the available buffers equal to the 
required buffers.

## Brief change log

  - *Sets the tag `isWaitingForFloatingBuffers` to false in 
`notifyBufferAvailable` if the current number of available buffers is not less 
than the required buffers*
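The fix described above can be sketched as follows. This is an illustrative model only, not the actual Flink `RemoteInputChannel` (which carries more state and synchronization); the field and method names are assumptions taken from the PR description.

```java
import java.util.ArrayDeque;

// Illustrative sketch of the FLINK-8747 fix; not the real Flink code.
class RemoteInputChannelSketch {
    private final int initialCredit;
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private int numRequiredBuffers;
    private boolean isWaitingForFloatingBuffers;

    RemoteInputChannelSketch(int initialCredit) {
        this.initialCredit = initialCredit;
        this.numRequiredBuffers = initialCredit;
    }

    /** required buffers = backlog + initialCredit. */
    void onSenderBacklog(int backlog) {
        numRequiredBuffers = backlog + initialCredit;
        // Requesting floating buffers from the BufferProvider is elided;
        // if not enough were obtained, register as listener and set the tag.
        if (availableBuffers.size() < numRequiredBuffers) {
            isWaitingForFloatingBuffers = true;
        }
    }

    /** Called when a recycled floating buffer is offered to this listener. */
    boolean notifyBufferAvailable(Object buffer) {
        if (availableBuffers.size() >= numRequiredBuffers) {
            // The fix: reset the tag so a future shortage re-registers the
            // listener; otherwise floating buffers are never requested again.
            isWaitingForFloatingBuffers = false;
            return false; // buffer goes back to the BufferProvider
        }
        availableBuffers.add(buffer);
        if (availableBuffers.size() >= numRequiredBuffers) {
            isWaitingForFloatingBuffers = false;
        }
        return true;
    }

    boolean isWaitingForFloatingBuffers() {
        return isWaitingForFloatingBuffers;
    }
}
```

Without the reset in the first branch, the sketch would keep `isWaitingForFloatingBuffers` as true forever after a declined buffer, which is exactly the bug this PR addresses.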

## Verifying this change

This change added tests and can be verified as follows:

  - *Modify 
`RemoteInputChannelTest#testAvailableBuffersLessThanRequiredBuffers` to cover 
this case*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8747

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5558.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5558


commit 10ad6db14d072a5208c45c4be08624ebd7e8ea13
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-22T14:41:38Z

[FLINK-8747][bugfix] The tag of waiting for floating buffers in 
RemoteInputChannel should be updated properly




---


[GitHub] flink issue #5317: [FLINK-8458] Add the switch for keeping both the old mode...

2018-02-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5317
  
@NicoK , I found that most of the code in this PR has already been merged 
into master by commit `0093bcbe771f296baf3857ef15fe7ec9b22bbc34` in your 
`FLINK-8425`. Maybe I only need to add the `config.md` changes in this PR.


---


[GitHub] flink issue #5317: [FLINK-8458] Add the switch for keeping both the old mode...

2018-02-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5317
  
Yes, I also found that and have been rebasing on the latest code. 
Because I am on the Spring Festival vacation, it can be ready later today 
or on Thursday.


---


[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-19 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , I have changed `EXACTLY_ONCE_BLOCKING_DATA_ENABLED` to 
true and squashed the commits.


---


[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-13 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
Thanks for rebasing the conflicts.

Yes, the default value can be changed to true after credit-based flow 
control is fully merged. If any changes are needed on my side, please let me 
know. :)


---


[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-12 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , I have submitted the updates for the above comments.


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167582971
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
--- End diff --

sure


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167582882
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
  */
-public class BarrierBufferTest {
-
-   private static final Random RND = new Random();
-
-   private static final int PAGE_SIZE = 512;
-
-   private static int sizeCounter = 0;
+public class BarrierBufferTest extends BarrierBufferTestBase {
--- End diff --

make sense


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167582763
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
+* @return The readable sequence of buffers and events, or 'null', if 
nothing was added.
+*/
+   BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
--- End diff --

sure


---


[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-09 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , thanks for the suggestions; I totally agree with that. 
That abstraction indeed makes the code simpler. I will update the code 
ASAP.


---


[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-07 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , I have submitted a separate commit to address above comments.


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166175837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

I think we can change the current `CheckpointBarrierHandler` interface into 
an abstract class and then add a `createBarrierHandler` method to extract the 
common parts of `StreamInputProcessor` and `StreamTwoInputProcessor`. Or we 
could define a new class for the common method. I prefer the first way. 
What do you think?
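The proposed refactoring might look roughly like this sketch; the class and handler names here are assumptions based on this comment, not the final Flink API.

```java
// Illustrative sketch of the proposed factory-on-abstract-class refactoring;
// handler names and selection logic are assumptions, not the real Flink API.
abstract class BarrierHandlerBase {

    abstract void processBufferOrEvent(Object bufferOrEvent);

    /** Common creation logic shared by one- and two-input processors. */
    static BarrierHandlerBase createBarrierHandler(
            String checkpointingMode, boolean creditBasedEnabled) {
        if ("EXACTLY_ONCE".equals(checkpointingMode)) {
            return creditBasedEnabled
                ? new CreditBasedHandler()  // buffers blocked data in memory
                : new SpillingHandler();    // spills blocked data to disk
        }
        return new TrackerHandler();        // at-least-once: only tracks barriers
    }

    static class CreditBasedHandler extends BarrierHandlerBase {
        void processBufferOrEvent(Object boe) { /* block or forward */ }
    }

    static class SpillingHandler extends BarrierHandlerBase {
        void processBufferOrEvent(Object boe) { /* spill to disk */ }
    }

    static class TrackerHandler extends BarrierHandlerBase {
        void processBufferOrEvent(Object boe) { /* track barriers only */ }
    }
}
```

Both processors would then call the single factory instead of duplicating the selection logic inline.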


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166174743
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
--- End diff --

I think we cannot directly mix all the blocked buffers for different 
checkpoint ids into one `ArrayDeque`. It also needs the `BufferOrEventSequence` 
which indicates the blocked buffers for a specific checkpoint id; otherwise we 
cannot know when the blocked buffers are exhausted after releasing a specific 
checkpoint id. 

If we want to use only one `ArrayDeque` for blocking all buffers, we may 
need to insert extra hints of the checkpoint id into this queue to indicate 
when to stop reading blocked buffers from it.

For example:
channel1: [cp1,cp2,b1,cp3,b2,b3]
channel2: [cp2]

1. When reading cp1 first from channel1, [cp2,b1,cp3,b2,b3] are blocked as 
separate sequence1.
2. When reading cp2 from channel2, the cp1 is released and begins to read 
sequence1.
3. When reading cp2 from seq1, the following buffers will be blocked in new 
seq2.
4. When reading cp3 from seq1
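The hint-based single-queue alternative described above could be sketched as follows. This is an illustration under assumed names only, not Flink code: a checkpoint-id hint marks where blocking for a later checkpoint begins, and draining stops at the next hint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of a single blocked-data queue with checkpoint-id
// hints; names are assumptions, not the actual Flink implementation.
class HintedBlockQueue {

    /** Marker recording where blocking for a given checkpoint begins. */
    static final class Hint {
        final long checkpointId;
        Hint(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final ArrayDeque<Object> queue = new ArrayDeque<>();

    void beginBlocking(long checkpointId) {
        queue.add(new Hint(checkpointId));
    }

    void addBlocked(Object bufferOrEvent) {
        queue.add(bufferOrEvent);
    }

    /**
     * Drains the elements blocked for the head checkpoint, stopping at the
     * next hint so that later checkpoints' data stays queued.
     */
    List<Object> releaseHead() {
        Object head = queue.poll();
        if (!(head instanceof Hint)) {
            throw new IllegalStateException("queue must start with a hint");
        }
        List<Object> released = new ArrayList<>();
        while (!queue.isEmpty() && !(queue.peek() instanceof Hint)) {
            released.add(queue.poll());
        }
        return released;
    }
}
```

The hint plays the role of the `BufferOrEventSequence` boundary: without it, a single queue could not tell where the blocked data of one checkpoint ends and the next begins.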

[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , thanks for the reviews!

I understand your concerns; I should deduplicate some common utils in 
these tests. I will do that tomorrow together with the other comments!


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165998584
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

yes, i will consider a proper way


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165997853
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
--- End diff --

The current implementation keeps the same logic as `BarrierBuffer`. I am 
wondering whether it would make sense to keep only one 
`ArrayDeque` holding all the blocked buffers for different 
checkpoint ids, especially for the uncommon case mentioned on line 496 in 
`BarrierBuffer`. I will double-check that logic and reply to you later.


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983714
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
--- End diff --

sure


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983496
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -184,6 +184,18 @@
key("taskmanager.network.detailed-metrics")
.defaultValue(false);
 
+   /**
+* Config parameter defining whether to spill data for channels with 
barrier or not in exactly-once
+* mode based on credit-based flow control.
+*
+* @deprecated Will be removed for Flink 1.6 when the old code will be 
dropped in favour of
+* credit-based flow control.
+*/
+   @Deprecated
public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
+   key("taskmanager.exactly-once.blocking.data.enabled")
+   .defaultValue(false);
--- End diff --

yes, the default value should be true, but I think it should be changed 
after `FLINK-7456` is merged, to make credit-based flow control work.


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983607
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
--- End diff --

the checkstyle failures are fixed


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-01 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5400

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once

## What is the purpose of the change

*Currently in exactly-once mode, the BarrierBuffer would block inputs with 
barriers until all inputs have received the barrier for a given checkpoint. To 
avoid back-pressuring the input streams which may cause distributed deadlocks, 
the BarrierBuffer has to spill the data to disk files to recycle the buffers 
for blocked channels.*

*Based on credit-based flow control, every channel has exclusive buffers, 
so there is no need to spill data to avoid deadlock. Then we implement a new 
CheckpointBarrierHandler that only buffers the data for blocked channels, for 
better performance.*

*And this new CheckpointBarrierHandler can also be switched on or off via 
configuration, in order to roll back to the original mode for unexpected risks.*
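The buffering-instead-of-spilling idea can be sketched roughly as follows. Note that the class and method names here are hypothetical, not the PR's actual `CreditBasedBufferBlocker` API, and a `String` stands in for a network `Buffer`:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Sketch: buffers arriving on blocked channels are cached in memory rather
// than spilled to disk, because credit-based flow control guarantees the
// sender cannot overrun the receiver's exclusive buffers.
class BufferBlockerSketch {
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final ArrayDeque<String> cachedBuffers = new ArrayDeque<>();

    void blockChannel(int channelIndex) {
        blockedChannels.add(channelIndex);
    }

    /** Returns true if the buffer was cached because its channel is blocked. */
    boolean add(int channelIndex, String buffer) {
        if (blockedChannels.contains(channelIndex)) {
            cachedBuffers.add(buffer); // kept in memory instead of spilling to disk
            return true;
        }
        return false; // channel not blocked: caller can process the buffer directly
    }

    /** Unblocks all channels after alignment and returns cached buffers in arrival order. */
    ArrayDeque<String> releaseBlocks() {
        blockedChannels.clear();
        ArrayDeque<String> released = new ArrayDeque<>(cachedBuffers);
        cachedBuffers.clear();
        return released;
    }
}
```

This is only a sketch of the control flow under the stated assumptions, not the actual implementation, which must also handle checkpoint cancellation and multiple concurrent alignments.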

## Brief change log

  - *Implement the new `CreditBasedBarrierBuffer` and 
`CreditBasedBufferBlocker` for buffering data in blocked channels in 
exactly-once mode.*
  - *Define the parameter `taskmanager.exactly-once.blocking.data.enabled` 
for enabling the new handler or not.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests for the logic of `CreditBasedBarrierBuffer`*
  - *Added tests for the logic of `CreditBasedBufferBlocker`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8547

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5400


commit 4d08e5d58c732e8f835016b48edc4494f8cb26fe
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-02T07:45:49Z

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once




---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164993308
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, 
InterruptedException {
}
}
 
+   @Override
+   public void blockInputChannel(int channelIndex) {
+   InputChannel inputChannel = 
indexToInputChannelMap.get(channelIndex);
+   if (inputChannel == null) {
+   throw new IllegalStateException("Could not find input 
channel from the channel index " + channelIndex);
--- End diff --

I referred to other similar usages of `IllegalStateException`. This condition 
can exist when `channelIndex` is correct but the mapping construction is wrong.

I think `IllegalArgumentException` also makes sense. 


---


[GitHub] flink issue #5381: [FLINK-8523][network] Stop assigning floating buffers for...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
Thanks for reviews and suggestions! :)

I will add some unit tests first to verify the related logic. For the IT case, 
I will consider its necessity and feasibility.


---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164990652
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ---
@@ -138,6 +148,24 @@ public void requestPartitions() throws IOException, 
InterruptedException {
}
}
 
+   @Override
+   public void blockInputChannel(int channelIndex) {
+   InputGate inputGate = indexToInputGateMap.get(channelIndex);
+   if (inputGate == null) {
+   throw new IllegalStateException("Could not find input 
gate from the channel index " + channelIndex);
+   }
+
+   int indexOffset = inputGateToIndexOffsetMap.get(inputGate);
+   inputGate.blockInputChannel(channelIndex - indexOffset);
--- End diff --

alright


---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164988630
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -315,6 +322,7 @@ public void returnExclusiveSegments(List 
segments) throws IOExcep
public void setInputChannel(IntermediateResultPartitionID partitionId, 
InputChannel inputChannel) {
synchronized (requestLock) {
if (inputChannels.put(checkNotNull(partitionId), 
checkNotNull(inputChannel)) == null
+   && 
indexToInputChannelMap.put(inputChannel.getChannelIndex(), inputChannel) == null
--- End diff --

alright.


---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164988413
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -133,8 +134,13 @@
 * Input channels. There is a one input channel for each consumed 
intermediate result partition.
 * We store this in a map for runtime updates of single channels.
 */
+   @GuardedBy("requestLock")
private final Map<IntermediateResultPartitionID, InputChannel> 
inputChannels;
 
+   /** A mapping from internal channel index in this gate to input 
channel. */
+   @GuardedBy("requestLock")
+   private final Map<Integer, InputChannel> indexToInputChannelMap;
--- End diff --

ok


---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164988217
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -360,8 +360,9 @@ public boolean notifyBufferAvailable(Buffer buffer) {
 
// Important: double check the isReleased state inside 
synchronized block, so there is no
// race condition when notifyBufferAvailable and 
releaseAllResources running in parallel.
-   if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+   if (isReleased.get() || isBlocked() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
--- End diff --

That is the exact issue I pointed out above. 

I think it will not cause a credit leak currently. But as a further 
improvement, we can compare the currently available credits with the backlog 
after unblocking the channel. If they are not enough, we can trigger a request 
for floating buffers at once. In the current implementation, it has to wait for 
the next `onSenderBacklog` to trigger this process.
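The check described in this comment could be sketched as a pure predicate. The method and parameter names below are hypothetical; the real logic lives inside `RemoteInputChannel`:

```java
// Sketch: after a channel is unblocked, compare the buffers currently held
// with the required amount (backlog + initial credit, as maintained by
// onSenderBacklog) and request floating buffers immediately instead of
// waiting for the next backlog announcement.
class FloatingBufferCheck {
    static boolean shouldRequestFloatingBuffers(
            int availableBuffers,
            int numRequiredBuffers,
            boolean isWaitingForFloatingBuffers) {
        // Only request if we are short on buffers and no request is pending.
        return availableBuffers < numRequiredBuffers && !isWaitingForFloatingBuffers;
    }
}
```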


---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164987333
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 ---
@@ -165,6 +168,14 @@ protected void notifyChannelNonEmpty() {
 */
abstract void releaseAllResources() throws IOException;
 
+   protected boolean isBlocked() {
--- End diff --

yes, we can define the abstract method here and implement it in the specific 
`RemoteInputChannel` with synchronization.


---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-31 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164986882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 ---
@@ -66,6 +66,9 @@
/** The current backoff (in ms) */
private int currentBackoff;
 
+   /** Flag indicating whether this channel is currently blocked or not. */
+   private volatile boolean isBlocked = false;
--- End diff --

Yes, we have not improved the logic based on the blocking state in 
`LocalInputChannel` yet, and there is the possibility to do that in the future.


---


[GitHub] flink issue #5381: [FLINK-8523][network] Stop assigning floating buffers for...

2018-01-30 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
There is one issue I am thinking about. 
After the input channel is unblocked, we can check whether this input 
channel needs to request floating buffers from the pool (if `availableNum < 
numRequiredBuffers` and `!isWaitingForFloatingBuffers`). Otherwise the request 
will only be triggered by the next `onSenderBacklog`.
What do you think is better?


---


[GitHub] flink issue #5381: [FLINK-8523][network] Stop assigning floating buffers for...

2018-01-30 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
@pnowojski I have submitted the whole process in one commit.

After you verify that the implementation is feasible, I will submit a 
separate commit adding unit tests for this process.


---


[GitHub] flink pull request #5381: [FLINK-8523][network] Stop assigning floating buff...

2018-01-29 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5381

[FLINK-8523][network] Stop assigning floating buffers for blocked input 
channels in exactly-once mode

## What is the purpose of the change

In exactly-once mode, an input channel is set to the blocked state when a 
barrier is read from it. The blocked state is released after barrier alignment 
completes or the checkpoint is cancelled.

In credit-based network flow control, we should avoid assigning floating 
buffers for blocked input channels because the buffers after barrier will not 
be processed by operator until alignment.

This way, we can make full use of the floating buffers and speed up barrier 
alignment to some extent.
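The core rule can be sketched as a small filter. All names here are hypothetical and the `Channel` holder is a stand-in for the real `InputChannel` state:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: when floating buffers are distributed based on backlog feedback,
// channels blocked during barrier alignment are skipped, since the data
// buffered after their barrier will not be consumed until alignment finishes.
class FloatingBufferAssignment {
    static class Channel {
        final int index;
        final boolean blocked;
        final int backlog;

        Channel(int index, boolean blocked, int backlog) {
            this.index = index;
            this.blocked = blocked;
            this.backlog = backlog;
        }
    }

    /** Returns the indices of channels eligible to receive floating buffers. */
    static List<Integer> eligibleChannels(List<Channel> channels) {
        List<Integer> eligible = new ArrayList<>();
        for (Channel c : channels) {
            if (!c.blocked && c.backlog > 0) { // skip blocked channels entirely
                eligible.add(c.index);
            }
        }
        return eligible;
    }
}
```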

## Brief change log

  - *Add `blockInputChannel` and `releaseBlockedInputChannels` in 
`InputGate` interface*
  - *`UnionInputGate` constructs the mapping from channel index to 
`InputGate`*
  - *`SingleInputGate` constructs the mapping from channel index to 
`InputChannel`*
  - *`BarrierBuffer` determines the logic of blocking input channel or 
releasing it*
  - *Avoid assigning floating buffers for blocked input channels*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests that validates the floating buffers are not assigned 
for blocked input channels*
  - *Added unit test that validates the `BarrierBuffer` blocks or releases 
the `InputChannel` correctly*
  - *Added unit test that validates the `UnionInputGate` and 
`SingleInputGate` constructs the mapping correctly*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8523

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5381.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5381


commit 8c7ef5a22c348c3a6eedb04708b00093ae666e37
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-01-30T07:14:20Z

[FLINK-8523][network] Stop assigning floating buffers for blocked input 
channels in exactly-once mode




---


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-01-22 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r163155607
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be set to at least 2 for good 
performance: 1 buffer is for receiving in-flight data in the subpartition and 1 
buffer is for parallel serialization. 
+
+- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra 
network buffers to use for each outgoing/incoming gate (result partition/input 
gate). In credit-based flow control mode, it indicates how many floating 
credits are shared among all the input channels. The floating buffers are 
distributed based on backlog (real-time output buffers in the subpartition) 
feedback, so they can help relieve back-pressure caused by 
imbalanced data distribution among the subpartitions.
+
--- End diff --

thanks for your polish, alpinegizmo.

I will apply the above fixes.
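For illustration, the two options discussed above might be set in `flink-conf.yaml` like this (the values are examples only, not recommendations):

```yaml
# Exclusive buffers per channel; at least 2 for good performance
taskmanager.network.memory.buffers-per-channel: 2
# Extra floating buffers shared by all input channels of a gate
taskmanager.network.memory.floating-buffers-per-gate: 8
```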


---


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-01-22 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r163155437
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be configured at least 2 for good 
performance. 1 buffer is for receving in-flight data in the subpartition and 1 
buffer is for parallel serialization. 
+
--- End diff --

It is also used in the current old mode, and there is no need to change the 
default value in most cases there.

Considering the new credit-based mode, a value greater than 2 can bring 
benefits if the bottleneck is caused by slow downstream processing. The greater 
the value, the lower the probability of blocking the upstream and causing 
back-pressure. But we should also consider the total available buffer 
resources when setting this parameter.


---


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-01-19 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5317

[FLINK-8458] Add the switch for keeping both the old mode and the new 
credit-based mode

## What is the purpose of the change

*After the whole feature of credit-based flow control is done, we should 
add a config parameter to switch the new credit-based mode on or off. This way, 
we can roll back to the old network mode for any unexpected risks.*

*The parameter is defined as 
`taskmanager.network.credit-based-flow-control.enabled` and the default value 
is true. This switch may be removed after the next release.*

*This PR is based on #4552, whose commit is also included for passing 
Travis.*
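As a sketch, rolling back to the old network mode would then look like this in `flink-conf.yaml`:

```yaml
# Default is true (credit-based mode); set to false to roll back to the old mode
taskmanager.network.credit-based-flow-control.enabled: false
```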

## Brief change log

  - *Abstract the `NetworkClientHandler` interface for different 
implementations in two modes*
  - *Abstract the `NetworkSequenceViewReader` interface for different 
implementations in two modes*
  - *Define the `taskmanager.network.credit-based-flow-control.enabled` in 
`TaskManagerOptions`*

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8458

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5317.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5317


commit a3b41f51173adaa382b42877142664d2b09101f7
Author: Zhijiang <wangzhijiang999@...>
Date:   2017-09-30T06:36:19Z

[FLINK-7456][network] Implement Netty sender incoming pipeline for 
credit-based

commit a8154989f9c93e71e3051d7184c0f02316f1a3c7
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-01-17T06:15:04Z

[FLINK-8458] Add the switch for keeping both the old mode and the new 
credit-based mode




---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162266243
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
--- End diff --

Should I cherry-pick FLINK-8225 into this PR?


---


[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
1. Thanks for your FLINK-8425.
2. I thought the tests for `ResultSubpartition#nextBufferIsEvent` had 
already been covered before. The test for `BufferAndBacklog#nextBufferIsEvent()` 
was not included before, and thanks for providing the patch for it.
3. I will create a separate JIRA for the switch and also include the commit 
in this PR. And I will check the Travis failure.
4. I will add the comment for the document you mentioned later.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259781
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ---
@@ -164,11 +165,13 @@ private boolean dispose() {
private void handInChannel(Channel channel) {
synchronized (connectLock) {
try {
-   PartitionRequestClientHandler 
requestHandler = channel.pipeline()
-   
.get(PartitionRequestClientHandler.class);
+   NetworkClientHandler clientHandler = 
channel.pipeline().get(PartitionRequestClientHandler.class);
+   if (clientHandler == null) {
+   clientHandler = 
channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
+   }
--- End diff --

good idea!


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259728
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 ---
@@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, 
NettyMessage msg) throws
LOG.debug("Read channel on {}: {}.", 
ctx.channel().localAddress(), request);
 
try {
-   SequenceNumberingViewReader reader = 
new SequenceNumberingViewReader(
-   request.receiverId,
-   request.credit,
-   outboundQueue);
+   NetworkSequenceViewReader reader;
+   if (request.credit > 0) {
+   reader = new 
CreditBasedSequenceNumberingViewReader(
+   request.receiverId,
+   request.credit,
+   outboundQueue);
+   } else {
+   reader = new 
SequenceNumberingViewReader(
+   request.receiverId,
+   outboundQueue);
+   }
--- End diff --

Yes, I actually took a hacky way to realize it for simplicity. :)
I will consider whether it is feasible to get the config here. If so, I will 
take the config-based way; otherwise I may add a comment clarifying this.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259766
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -125,11 +126,11 @@ private void enqueueAvailableReader(final 
SequenceNumberingViewReader reader) th
 * @return readers which are enqueued available for transferring data
 */
@VisibleForTesting
-   ArrayDeque getAvailableReaders() {
+   ArrayDeque getAvailableReaders() {
return availableReaders;
}
 
-   void notifyReaderCreated(final SequenceNumberingViewReader reader) {
+   public void notifyReaderCreated(final NetworkSequenceViewReader reader) 
{
--- End diff --

yes, that was my carelessness


---


[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

2018-01-16 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK , I have submitted the switch for keeping the old mode and the new 
credit-based mode.


---


[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK , thanks for your reviews! 
I have submitted all the patches you provided offline to address the above 
issues.

1. Remove `FLINK-8425` from this PR.
2. Do you think I should add more tests for `nextBufferIsEvent`? I have 
already verified that in previous related tests.
3. Regarding the switch, I had some difficulty leaving messages for you 
offline. We can confirm that further.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667346
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
80L);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

yes, the same reason as above


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667234
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws 
Exception {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

yes, I will set 9 for it.


---


[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK , I have submitted all the modifications based on the patches you 
provided.
The tests for `nextBufferIsEvent` will be added in a new commit tomorrow.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161450876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -199,6 +199,19 @@ public boolean isReleased() {
}
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
--- End diff --

agree with integration


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849525
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -250,10 +304,12 @@ private void handleException(Channel channel, 
Throwable cause) throws IOExceptio
 
private void releaseAllResources() throws IOException {
SequenceNumberingViewReader reader;
-   while ((reader = nonEmptyReader.poll()) != null) {
+   while ((reader = availableReaders.poll()) != null) {
--- End diff --

yes, it should release all readers here.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +94,37 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   @VisibleForTesting
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   @VisibleForTesting
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

agree


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-09 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160371761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

Thanks for the clarification. I misunderstood this tag from the comments; I 
will modify it together with the other review changes.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-09 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160333139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

The same with `triggerEnqueueAvailableReader`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-09 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160333068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

This method is not only used for testing; it may also be called via the `addCredit` method.
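For context, here is a minimal standalone sketch of the check-then-enqueue pattern under discussion. All class and member names (`Reader`, `addCredit`, `triggerEnqueueAvailableReader`) only mirror the PR for readability; this is an illustrative model, not the actual Flink classes.

```java
import java.util.ArrayDeque;

public class EnqueueSketch {

    static class Reader {
        int credits;
        int backlog;
        boolean registeredAvailable; // already sitting in the queue?

        Reader(int backlog) {
            this.backlog = backlog;
        }

        boolean isRegisteredAvailable() {
            return registeredAvailable;
        }

        // Available only if there is both data and credit to send it with.
        boolean isAvailable() {
            return backlog > 0 && credits > 0;
        }
    }

    private final ArrayDeque<Reader> availableReaders = new ArrayDeque<>();

    // Called when a credit announcement arrives from the consumer.
    void addCredit(Reader reader, int credit) {
        reader.credits += credit;
        triggerEnqueueAvailableReader(reader);
    }

    // Enqueue only if the reader is not queued yet and has become available.
    void triggerEnqueueAvailableReader(Reader reader) {
        if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
            reader.registeredAvailable = true;
            availableReaders.add(reader);
        }
    }

    int queueSize() {
        return availableReaders.size();
    }

    public static void main(String[] args) {
        EnqueueSketch queue = new EnqueueSketch();
        Reader reader = new Reader(5); // 5 buffers backlogged, no credit yet
        queue.addCredit(reader, 2);    // credit arrives -> reader enqueued
        queue.addCredit(reader, 1);    // already registered -> not enqueued twice
        System.out.println(queue.queueSize()); // prints 1
    }
}
```

The guard against double-enqueueing is what makes it safe for both the producer path and the credit path to call this method.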


---


[GitHub] flink issue #4509: [FLINK-7406][network] Implement Netty receiver incoming p...

2018-01-08 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4509
  
Already closed it.


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2018-01-08 Thread zhijiangW
Github user zhijiangW closed the pull request at:

https://github.com/apache/flink/pull/4509


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2018-01-08 Thread zhijiangW
Github user zhijiangW closed the pull request at:

https://github.com/apache/flink/pull/4533


---


[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...

2018-01-08 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
Sure, I will close it.


---


[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

2017-12-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
@NicoK , I have submitted the `hotfix` commit to address the above comments.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157709904
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Yes, I fully agree with your assessment of the current status of the spillable/spilled subpartitions and subpartition views.

I also think that `PipelinedSubpartition` is the most performance-critical path, while `SpillableSubpartition` should not be very sensitive. I think we have already reached a consensus on how to handle `SpillableSubpartition`, and I will do that later. :)


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157706951
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

Sorry, my wording above was not correct. I mean that we do not need the `decreaseBuffersInBacklog` method in `ResultSubpartition` after changing the type of `parent` to `SpillableSubpartition` in `SpilledSubpartitionView`.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157694294
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
try {
subpartition.finish();
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(4, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

Sure.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157693477
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws 
Exception {
partition.add(buffer);
partition.add(buffer);
 
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+   assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
 
+   // now the buffer may be freed, depending on the timing of the 
write operation
+   // -> let's do this check at the end of the test (to save some 
time)
+   // still same statistics
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
partition.finish();
 
+   // + one EndOfPartitionEvent
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

Sure.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157691096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible, because we do not know how many events are in the `ArrayDeque`, and events should not be counted toward the backlog length.

For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` to return a `BufferAndBacklog` wrapper structure instead of a plain `Buffer`. Do we also need to extend `BufferAndAvailability` to carry the backlog as well? This way `PipelinedSubpartition` benefits by avoiding the `volatile`, but for `SpillableSubpartition` the `volatile` may still be needed, because `getNextBuffer` and the backlog decrement happen in different places for `SpillableSubpartitionView`/`SpilledSubpartitionView`.
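A toy model of the wrapper idea discussed here: `getNextBuffer()` hands out the buffer together with the backlog snapshot taken under the same lock, so the consumer never sees a stale backlog. `String` stands in for Flink's network buffer type, and names starting with `event` model event buffers; none of this is the real Flink API.

```java
import java.util.ArrayDeque;

public class BacklogSketch {

    // The proposed wrapper: the fetched buffer plus the backlog remaining
    // after the fetch, both captured atomically under one lock.
    static class BufferAndBacklog {
        final String buffer;
        final int buffersInBacklog;

        BufferAndBacklog(String buffer, int buffersInBacklog) {
            this.buffer = buffer;
            this.buffersInBacklog = buffersInBacklog;
        }
    }

    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private int buffersInBacklog; // only touched under synchronized (buffers)

    // Events (e.g. an end-of-partition event) are queued but, as noted in the
    // thread, must not be counted as backlog — so ArrayDeque#size() won't do.
    void add(String buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            if (!buffer.startsWith("event")) {
                buffersInBacklog++;
            }
        }
    }

    // The decrement happens inside the same lock that hands out the buffer,
    // so the returned backlog snapshot is always consistent.
    BufferAndBacklog getNextBuffer() {
        synchronized (buffers) {
            String next = buffers.poll();
            if (next != null && !next.startsWith("event")) {
                buffersInBacklog--;
            }
            return new BufferAndBacklog(next, buffersInBacklog);
        }
    }

    public static void main(String[] args) {
        BacklogSketch sub = new BacklogSketch();
        sub.add("data-1");
        sub.add("event-eop");
        sub.add("data-2");

        BufferAndBacklog first = sub.getNextBuffer();
        // data-1 is consumed; event-eop does not count, so backlog is 1
        System.out.println(first.buffer + " / backlog=" + first.buffersInBacklog);
    }
}
```

With this shape, a pipelined subpartition needs no `volatile` counter at all, since backlog reads ride along with the buffer under the same lock.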



---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157686388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

The current `parent` in `SpilledSubpartitionView` is a `ResultSubpartition`, not a `SpillableSubpartition`. After replacing `ResultSubpartition` with `SpillableSubpartition`, we can make these methods package-private as you suggest. I will do that.


---


[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

2017-12-14 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
@NicoK, I have submitted two `[hotfix]` commits for the above issues.

One is for the `Nullable` annotation and the tests of the backlog statistics; the other makes the backlog updates thread-safe.

For updating the backlog, I think it should be done in `PipelinedSubpartition` and `SpillableSubpartition` separately so that it stays inside the synchronized region, although this seems somewhat redundant. But I notice that the `isReleased()` method in `ResultSubpartition` is also implemented this way.



---


[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

2017-12-12 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
@NicoK , thanks for the suggestions.

I understand your point about wrapping the buffer and the backlog together in a new structure returned by `getNextBuffer()`, and it really makes sense for `PipelinedSubpartition`. But for `SpillableSubpartition`, once it begins to write buffers to disk, we cannot derive the total backlog from the queue. We can only keep the backlog precise by decrementing it in `getNextBuffer()` and incrementing it in `add(Buffer)`.

So I think we can put the `decreaseStatistics` call under the lock, which covers all the subpartition implementations.


---


[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

2017-12-07 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
Or we could make the backlog an `AtomicInteger` to keep the current process; otherwise we may need to call `decreaseStatistics` in several different places inside the `synchronized(buffers)` region. What do you think?


---


[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

2017-12-07 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
@NicoK @pnowojski , regarding the backlog thread-safety issue: the current implementation is not thread-safe, and we should restore my previous implementation that calls `decreaseStatistics` inside `getNextBuffer`, so that it runs inside the `synchronized` region. What do you think?


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-07 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r155458048
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, 
InterruptedException {
listener.notifyBuffersAvailable(1);
}
 
+   if (current.isBuffer()) {
--- End diff --

I think `decreaseStatistics` should be called inside `getNextBufferInternal`; otherwise the backlog value is not updated thread-safely. The previous implementation kept the `decreaseStatistics` call inside the synchronized part.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-07 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r155454935
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and 
also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
--- End diff --

I think it makes sense.


---

