[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-29 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561200#comment-16561200
 ] 

zhijiang commented on FLINK-8523:
-

Hey [~pnowojski], [~NicoK], glad to see we are coming back to this issue.

I think I fully understand your concerns. There are actually two separate 
issues to settle:

1. Should intermediate buffers be spilled before barrier alignment completes?

If we spill the buffers that follow the barrier on an already blocked channel, 
as before, we free more floating buffers that the other, unblocked channels can 
use. From this point of view it seems to benefit barrier alignment. The only 
concern is the additional IO cost of spilling and replaying the intermediate 
buffers. If alignment is very fast, only a few intermediate buffers need to be 
spilled and they may still be in the OS cache, so the cost is negligible. But if 
a lot of data is spilled in an IO-sensitive environment, it can greatly hurt 
throughput (TPS).

If we do not spill, as in the current code, the only concern is that we cannot 
make full use of the floating buffers before alignment completes, which may 
delay barrier alignment in some scenarios.

So, based on the above analysis, both approaches have pros and cons, and their 
behavior may differ across scenarios. In non-credit-based mode we have to spill 
the data to avoid deadlock, but now we have the chance to avoid spilling and do 
better. It also seems preferable for a streaming job not to involve any disk IO 
in the runtime network stack. From this point of view I prefer not spilling. 
Maybe we need more tests, feedback, or thought before the final decision.

2. Avoid requesting floating buffers for blocked channels

I think we can reach an agreement on this one. Whatever the conclusion of the 
first issue, doing this is reasonable and gives a definite benefit. This JIRA 
focuses on this issue.
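A minimal sketch of point 2, under simplifying assumptions. Floating buffers are modeled as a counter in a shared pool, and channels waiting for one are served in FIFO order. A channel blocked on alignment is skipped but stays queued, so it is served once its block is released. `FloatingBufferPool`, `WaitingChannel` and their methods are hypothetical names for illustration. They are not Flink's actual `BufferPool` or `RemoteInputChannel` API.

```java
import java.util.ArrayDeque;
import java.util.Iterator;

/** Hypothetical receiver-side channel, reduced to what this sketch needs. */
final class WaitingChannel {
    private volatile boolean blocked;  // true while aligning on a checkpoint barrier
    private int floatingBuffers;       // floating buffers assigned so far

    void setBlocked(boolean blocked) { this.blocked = blocked; }
    boolean isBlocked() { return blocked; }
    void assignFloatingBuffer() { floatingBuffers++; }
    int floatingBuffers() { return floatingBuffers; }
}

/** Hypothetical pool of floating buffers shared by all channels of a gate. */
final class FloatingBufferPool {
    private int available;
    private final ArrayDeque<WaitingChannel> waiting = new ArrayDeque<>();

    FloatingBufferPool(int available) { this.available = available; }

    /** A channel asks for one more floating buffer. */
    void request(WaitingChannel channel) { waiting.add(channel); }

    /** A floating buffer was recycled back to the pool. */
    void recycle() { available++; }

    /** Hands out buffers in FIFO order, skipping channels blocked on alignment. */
    void distribute() {
        Iterator<WaitingChannel> it = waiting.iterator();
        while (available > 0 && it.hasNext()) {
            WaitingChannel channel = it.next();
            if (channel.isBlocked()) {
                continue;  // data after the barrier cannot be processed yet; keep it queued
            }
            it.remove();
            channel.assignFloatingBuffer();
            available--;
        }
    }

    int available() { return available; }
}
```

With two free buffers and three waiting channels, the first of which is blocked, the two unblocked channels get one buffer each. The blocked one is served only after it is unblocked and a buffer is recycled.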

 

BTW, we previously made another improvement to speed up barrier alignment: 
reading unblocked channels with priority instead of in the current order (FIFO, 
based on network arrival). It indeed improves barrier alignment a lot, because 
the task no longer picks up intermediate buffers that cannot be used before 
alignment completes. But this selection may also change the original 
backpressure behavior and affect performance in some scenarios, so it is also a 
trade-off.
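The "unblocked channels first" selection can be pictured with a small sketch. It assumes that each channel with data is reported once, together with its current blocked state. `PrioritySelector`, `notifyAvailable`, `next` and `releaseAll` are hypothetical names. Falling back to a blocked channel when nothing else is ready is one possible policy, not necessarily the one that was implemented.

```java
import java.util.ArrayDeque;

/** Hypothetical selector over channels that currently have data (illustration only). */
final class PrioritySelector {
    // Channel indexes with data, each kept in arrival (FIFO) order.
    private final ArrayDeque<Integer> unblocked = new ArrayDeque<>();
    private final ArrayDeque<Integer> blocked = new ArrayDeque<>();

    /** Records that a channel has data available. */
    void notifyAvailable(int channelIndex, boolean isBlocked) {
        (isBlocked ? blocked : unblocked).add(channelIndex);
    }

    /** Unblocked channels win; a blocked channel is only returned when nothing else is ready. */
    Integer next() {
        Integer candidate = unblocked.poll();
        return candidate != null ? candidate : blocked.poll(); // null if no channel has data
    }

    /** After alignment, previously blocked channels become regular candidates again. */
    void releaseAll() {
        while (!blocked.isEmpty()) {
            unblocked.add(blocked.poll());
        }
    }
}
```

Under pure FIFO, a blocked channel that reported data first would be read first. Here it is deferred, which is exactly the change in backpressure behavior mentioned above.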

> Stop assigning floating buffers for blocked input channels in exactly-once 
> mode
> ---
>
> Key: FLINK-8523
> URL: https://issues.apache.org/jira/browse/FLINK-8523
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
>
> In exactly-once mode, an input channel is set to the blocked state when a 
> barrier is read from it. The blocked state is released after barrier 
> alignment completes or is cancelled.
>  
> In credit-based network flow control, we should avoid assigning floating 
> buffers to blocked input channels, because the buffers after the barrier will 
> not be processed by the operator until alignment completes.
> By doing so, we can make full use of floating buffers and speed up barrier 
> alignment to some extent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-27 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559809#comment-16559809
 ] 

Piotr Nowojski commented on FLINK-8523:
---

I think we shouldn't spill at all. Whenever the sender serializes a checkpoint 
barrier, it could either stop requesting floating buffers or return them to the 
receiver. The receiver, on the other hand, could simply not assign floating 
buffers to channels blocked on alignment. Data following the checkpoint barrier 
on the sender would then just sit in the sender's buffers, waiting for more 
credits.
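Here is a sender-side sketch of the proposal, under stated assumptions. Buffers and the barrier are modeled as strings in one subpartition queue, and the barrier consumes one credit like any other buffer. Once the barrier has been sent, the subpartition announces a backlog of 0, so the receiver has no reason to assign floating buffers to it. Everything queued behind the barrier waits for credit. `SenderSubpartition`, `announcedBacklog` and `alignmentFinished` are hypothetical. How the sender learns that alignment has ended is left open in the discussion.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sender-side subpartition (illustration only). */
final class SenderSubpartition {
    static final String BARRIER = "BARRIER";  // stand-in for a serialized checkpoint barrier

    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private int credit;
    private boolean barrierSent;  // true between sending the barrier and the end of alignment

    void add(String buffer) { queue.add(buffer); }

    void addCredit(int n) { credit += n; }

    /** Backlog reported to the receiver; 0 after the barrier, so no floating buffers are requested. */
    int announcedBacklog() { return barrierSent ? 0 : queue.size(); }

    /** Sends as many queued buffers as the available credit allows. */
    List<String> send() {
        List<String> sent = new ArrayList<>();
        while (credit > 0 && !queue.isEmpty()) {
            String buffer = queue.poll();
            credit--;  // in this model the barrier consumes credit like any buffer
            sent.add(buffer);
            if (BARRIER.equals(buffer)) {
                barrierSent = true;
            }
        }
        return sent;
    }

    /** Hypothetical signal that the receiver finished aligning this checkpoint. */
    void alignmentFinished() { barrierSent = false; }
}
```

No data is written to disk anywhere in this model. The post-barrier buffers stay in sender memory, and backpressure holds them there.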

I know this seems like a natural extension towards "as it was before", but 
spilling to disk in a network stack seems like a very bad thing to me, and I 
haven't seen or heard of a similar solution anywhere else.

Besides being conceptually wrong to me, it could seriously affect performance, 
especially if IO is already saturated by RocksDB or checkpointing. In such 
situations even writing or reading a single byte can block the system for a 
very long time. Reading bytes from disk is simply slower and doesn't scale 
compared to sending them over the network. To make matters worse, those spilled 
buffers would be tiny. Combined with multiple channels and multiple tasks, we 
would be issuing quite a lot of small random reads and writes. Thus, compared to 
the old solution, where we spilled tons of data from a handful of streams, the 
new one, which spills less data, might not turn out to be significantly faster.



[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558825#comment-16558825
 ] 

Nico Kruber commented on FLINK-8523:


Sorry for coming back so late, but we recently discovered that it is actually 
very important not to hold on to the received (floating and exclusive) buffers 
during alignment either, because then you would not use all available resources 
to do the alignment for the remaining channels. This may make alignments worse 
than without flow control.

This ticket only prevents assigning new floating buffers to blocked channels, 
but its effect is somewhat limited. What we would really need is to also free 
any floating buffers that we already hold. Since we also have exclusive buffers 
in the {{BarrierBuffer}} and need to keep the order, [~StephanEwen] and I were 
thinking about 1) blocking the channel so that it does not advertise more 
credit, and 2) spilling all of the already received buffers to disk (as before), 
thereby freeing all used floating buffers.
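The two steps can be sketched as follows, with clearly stated assumptions. Received buffers are plain byte arrays, and an in-memory `ByteArrayOutputStream` stands in for the spill file on disk. "Freeing" a buffer here only means it leaves the channel's in-memory queue; a real stack would recycle it to its pool. `SpillingChannel` and its methods are hypothetical. Records are length-prefixed and written in FIFO order, so the original order is kept on replay.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical channel that spills its received buffers while blocked (illustration only). */
final class SpillingChannel {
    private final ArrayDeque<byte[]> received = new ArrayDeque<>();              // buffers held in memory
    private final ByteArrayOutputStream spillFile = new ByteArrayOutputStream(); // stand-in for a disk file
    private final DataOutputStream spillOut = new DataOutputStream(spillFile);
    private int spilledCount;
    private boolean blocked;  // while true, no new credit would be announced

    /** A buffer arrives from the network; while blocked it goes straight to the spill file. */
    void onBuffer(byte[] data) {
        if (blocked) {
            spill(data);
        } else {
            received.add(data);
        }
    }

    /** Step 1: block the channel. Step 2: spill everything already received. Returns buffers freed. */
    int blockAndSpill() {
        blocked = true;
        int freed = 0;
        while (!received.isEmpty()) {
            spill(received.poll());  // FIFO, so the original order is preserved
            freed++;
        }
        return freed;
    }

    /** Unblocks the channel and replays the spilled buffers in their original order. */
    List<byte[]> unblockAndReplay() {
        blocked = false;
        List<byte[]> replayed = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(spillFile.toByteArray()))) {
            for (int i = 0; i < spilledCount; i++) {
                byte[] data = new byte[in.readInt()];
                in.readFully(data);
                replayed.add(data);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        spillFile.reset();
        spilledCount = 0;
        return replayed;
    }

    int buffersInMemory() { return received.size(); }

    private void spill(byte[] data) {
        try {
            spillOut.writeInt(data.length);  // length prefix so records can be read back
            spillOut.write(data);
            spillOut.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        spilledCount++;
    }
}
```

Buffers that were already in flight when the channel blocked are written straight to the spill file. This keeps memory usage flat but pays the IO cost discussed in the later comments.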

This ticket can be seen as a step towards that goal, but I'd really like to see 
the whole thing together (possibly as follow-up tasks, or by changing the ticket).



[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558810#comment-16558810
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop 
assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205575969
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -133,8 +134,13 @@
      * Input channels. There is a one input channel for each consumed intermediate result partition.
      * We store this in a map for runtime updates of single channels.
      */
+    @GuardedBy("requestLock")
     private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
 
+    /** A mapping from internal channel index in this gate to input channel. */
+    @GuardedBy("requestLock")
+    private final Map<Integer, InputChannel> indexToInputChannelMap;
 
 Review comment:
   and maybe call it `inputChannelsByChannelIndex` instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558811#comment-16558811
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop 
assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205577375
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
         }
     }
 
+    @Override
+    public void blockInputChannel(int channelIndex) {
+        InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
+        if (inputChannel == null) {
+            throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
+        }
+
+        inputChannel.setBlocked(true);
+    }
+
+    @Override
+    public void releaseBlockedInputChannels() {
+        for (InputChannel inputChannel : inputChannels.values()) {
+            inputChannel.setBlocked(false);
+        }
 
 Review comment:
   ~Do we need to make sure that there's no concurrent `blockInputChannel` call 
trying to block a channel for an alignment for a later checkpoint or can we 
assume here that we are still processing one alignment (doing the release) and 
therefore cannot concurrently block?~
   I guess we are safe here - `BarrierBuffer` actually does it the same way.




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558813#comment-16558813
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop 
assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205576747
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
         }
     }
 
+    @Override
+    public void blockInputChannel(int channelIndex) {
+        InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
+        if (inputChannel == null) {
+            throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
 
 Review comment:
   Both arguments sound right. How about 
`checkArgument(0 <= channelIndex && channelIndex < numberOfInputChannels)` and 
`checkState(inputChannel != null)`?
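A self-contained sketch of the suggested checks. The `checkArgument` and `checkState` helpers below are local stand-ins that mirror Flink's `org.apache.flink.util.Preconditions` methods of the same name, which throw `IllegalArgumentException` and `IllegalStateException` respectively. `CheckedGate`, `StubChannel` and `register` are hypothetical. The split reflects both arguments in the thread: an out-of-range index is the caller's fault, while a valid index with no registered channel means the gate's own mapping is broken.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical channel with just a blocked flag (illustration only). */
final class StubChannel {
    volatile boolean blocked;
}

/** Hypothetical gate showing the checkArgument / checkState split. */
final class CheckedGate {
    private final int numberOfInputChannels;
    private final Map<Integer, StubChannel> inputChannelsByChannelIndex = new HashMap<>();

    CheckedGate(int numberOfInputChannels) {
        this.numberOfInputChannels = numberOfInputChannels;
    }

    void register(int channelIndex, StubChannel channel) {
        inputChannelsByChannelIndex.put(channelIndex, channel);
    }

    void blockInputChannel(int channelIndex) {
        // Wrong input from the caller.
        checkArgument(0 <= channelIndex && channelIndex < numberOfInputChannels,
                "Invalid channel index " + channelIndex);
        StubChannel inputChannel = inputChannelsByChannelIndex.get(channelIndex);
        // Valid index, but the gate's internal mapping is inconsistent.
        checkState(inputChannel != null, "Could not find input channel for channel index " + channelIndex);
        inputChannel.blocked = true;
    }

    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```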




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558814#comment-16558814
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop 
assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205575797
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -133,8 +134,13 @@
      * Input channels. There is a one input channel for each consumed intermediate result partition.
      * We store this in a map for runtime updates of single channels.
      */
+    @GuardedBy("requestLock")
     private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
 
+    /** A mapping from internal channel index in this gate to input channel. */
+    @GuardedBy("requestLock")
+    private final Map<Integer, InputChannel> indexToInputChannelMap;
 
 Review comment:
   Actually, this could be a simple array, couldn't it? If you look at 
`org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#create` 
you'll see that `0 <= channelIndex < inputChannels.length`
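Following that suggestion, here is a minimal sketch that replaces the `Map` with a plain array indexed by channel index. It assumes every channel index lies in `[0, numberOfInputChannels)` and is used exactly once. `ArrayIndexedGate` and `IndexedChannel` are hypothetical. The method names only echo `blockInputChannel` and `releaseBlockedInputChannels` from the PR.

```java
/** Hypothetical channel holding only the fields needed for this sketch. */
final class IndexedChannel {
    final int channelIndex;
    volatile boolean blocked;

    IndexedChannel(int channelIndex) { this.channelIndex = channelIndex; }
}

/** Sketch: since 0 <= channelIndex < number of channels, an array can replace Map<Integer, ...>. */
final class ArrayIndexedGate {
    private final IndexedChannel[] inputChannelsByChannelIndex;

    ArrayIndexedGate(IndexedChannel... channels) {
        inputChannelsByChannelIndex = new IndexedChannel[channels.length];
        for (IndexedChannel c : channels) {
            // Each index must be in range and used exactly once, so the array ends up fully populated.
            if (c.channelIndex < 0 || c.channelIndex >= channels.length
                    || inputChannelsByChannelIndex[c.channelIndex] != null) {
                throw new IllegalArgumentException("Bad or duplicate channel index " + c.channelIndex);
            }
            inputChannelsByChannelIndex[c.channelIndex] = c;
        }
    }

    void blockInputChannel(int channelIndex) {
        // Direct array access replaces the map lookup; an out-of-range index
        // fails fast with ArrayIndexOutOfBoundsException.
        inputChannelsByChannelIndex[channelIndex].blocked = true;
    }

    void releaseBlockedInputChannels() {
        for (IndexedChannel c : inputChannelsByChannelIndex) {
            c.blocked = false;
        }
    }

    boolean isBlocked(int channelIndex) { return inputChannelsByChannelIndex[channelIndex].blocked; }
}
```

Validating the indexes once at construction is what makes the lookup safe afterwards. It also removes the need for a null check on every `blockInputChannel` call.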




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558812#comment-16558812
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop 
assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205579424
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ##
 @@ -82,30 +82,40 @@
      */
     private final Map<InputGate, Integer> inputGateToIndexOffsetMap;
 
+    /** A mapping from logical channel index (internal channel index in input gate plus gate's offset) to input gate. */
+    private final Map<Integer, InputGate> indexToInputGateMap;
 
 Review comment:
   Similar to the `SingleInputGate`: use an array, rename member
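For the `UnionInputGate`, the same idea might look like the sketch below. A logical channel index is the gate's offset plus the channel's internal index. An array with one slot per channel across all gates therefore maps each logical index to its gate, and subtracting the offset recovers the internal index. `UnionIndex`, `SubGate`, `gateFor` and `internalIndex` are hypothetical names. The offset map plays the role of `inputGateToIndexOffsetMap`.

```java
import java.util.IdentityHashMap;
import java.util.Map;

/** Hypothetical gate that only knows its channel count (illustration only). */
final class SubGate {
    final int numberOfChannels;

    SubGate(int numberOfChannels) { this.numberOfChannels = numberOfChannels; }
}

/** Sketch of an array-based logical-index -> gate mapping for a union of gates. */
final class UnionIndex {
    private final SubGate[] inputGatesByChannelIndex;                   // logical index -> gate
    private final Map<SubGate, Integer> inputGateToIndexOffset = new IdentityHashMap<>();

    UnionIndex(SubGate... gates) {
        int total = 0;
        for (SubGate g : gates) {
            inputGateToIndexOffset.put(g, total);  // first logical index owned by this gate
            total += g.numberOfChannels;
        }
        inputGatesByChannelIndex = new SubGate[total];
        for (SubGate g : gates) {
            int offset = inputGateToIndexOffset.get(g);
            for (int i = 0; i < g.numberOfChannels; i++) {
                inputGatesByChannelIndex[offset + i] = g;
            }
        }
    }

    SubGate gateFor(int logicalIndex) { return inputGatesByChannelIndex[logicalIndex]; }

    /** Internal index inside the owning gate: logical index minus that gate's offset. */
    int internalIndex(int logicalIndex) {
        return logicalIndex - inputGateToIndexOffset.get(gateFor(logicalIndex));
    }
}
```

A union-level `blockInputChannel(logicalIndex)` could then presumably delegate to `gateFor(logicalIndex)`, passing `internalIndex(logicalIndex)`.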




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558793#comment-16558793
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop 
assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205572919
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 ##
 @@ -66,6 +66,9 @@
/** The current backoff (in ms) */
private int currentBackoff;
 
+   /** Flag indicating whether this channel is currently blocked or not. */
+   private volatile boolean isBlocked = false;
 
 Review comment:
   Does it make sense to have it in `LocalInputChannel`, then? I guess we 
should not add dead code.




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558791#comment-16558791
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop 
assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205572300
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -360,8 +360,9 @@ public boolean notifyBufferAvailable(Buffer buffer) {
 
             // Important: double check the isReleased state inside synchronized block, so there is no
             // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
-            if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+            if (isReleased.get() || isBlocked() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
 
 Review comment:
   Indeed, the "blocked" credits should be in `numRequiredBuffers` which was 
updated with the last buffer that we received. Any update on that will come 
with the next buffer. We just need to make sure that we acknowledge the newly 
freed credit to the sender (the backlog may be 0).
   
   The request for floating buffers should remain fair among all channels 
though and therefore we cannot request all floating buffers to satisfy that 
need at once!
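A simplified, counter-based model of the patched condition and of the review points, under these assumptions:

- Buffers are counted rather than held.
- `numRequiredBuffers` is refreshed as backlog plus initial credit whenever a buffer arrives.
- Each `notifyBufferAvailable` call accepts at most one floating buffer, so the pool can keep distributing fairly among channels.

`CreditChannel` and its methods are hypothetical and do not match `RemoteInputChannel`'s real signatures. Recycling an exclusive buffer adds credit to announce even when the backlog is 0.

```java
/** Hypothetical, counter-based model of a remote input channel's credit bookkeeping. */
final class CreditChannel {
    private final int initialCredit;  // number of exclusive buffers
    private int available;            // free buffers ready to receive data
    private int numRequiredBuffers;   // backlog + initialCredit, refreshed on every received buffer
    private int unannouncedCredit;    // credit not yet announced to the sender
    private boolean released;
    private volatile boolean blocked;

    CreditChannel(int initialCredit) {
        this.initialCredit = initialCredit;
        this.available = initialCredit;
        this.numRequiredBuffers = initialCredit;
    }

    /** A buffer arrived, consuming one free buffer and carrying the sender's backlog. */
    void onBufferReceived(int senderBacklog) {
        available--;
        numRequiredBuffers = senderBacklog + initialCredit;
    }

    /** Offered one floating buffer by the pool; returning false lets the pool try the next channel. */
    boolean notifyBufferAvailable() {
        if (released || blocked || available >= numRequiredBuffers) {
            return false;
        }
        available++;
        unannouncedCredit++;
        return true;
    }

    /** An exclusive buffer was consumed and recycled: announce its credit even if the backlog is 0. */
    void recycleExclusiveBuffer() {
        available++;
        unannouncedCredit++;
    }

    void setBlocked(boolean blocked) { this.blocked = blocked; }

    void release() { released = true; }

    /** Returns the credit to send to the sender and resets the counter. */
    int takeUnannouncedCredit() {
        int credit = unannouncedCredit;
        unannouncedCredit = 0;
        return credit;
    }
}
```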




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539793#comment-16539793
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
Yes, I think it can make better use of floating buffers.

As last confirmed with Piotr, this PR will not be merged into 
release-1.5 because some issues were still unresolved. So I put this PR on 
hold and have not updated it for now.

What do you think about this PR? I can continue working on it if necessary.




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539691#comment-16539691
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5381
  
@zhijiangW is this PR still useful? If so, has there been any progress?




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346492#comment-16346492
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164993308
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
}
}
 
+   @Override
+   public void blockInputChannel(int channelIndex) {
+   InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
+   if (inputChannel == null) {
+   throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
--- End diff --

I followed other similar usages of `IllegalStateException`. This condition can also occur when `channelIndex` is correct but the mapping was constructed incorrectly.

I think `IllegalArgumentException` also makes sense.
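One way to reconcile both views is to use each exception for a distinct failure mode. `IllegalArgumentException` fits when the caller passes an index the gate can never own. `IllegalStateException` fits when the index is valid but the internal mapping is missing. Below is a minimal sketch, assuming the gate knows its channel count. `ChannelRegistry`, `register` and `lookup` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical lookup that separates the two failure modes discussed above:
 * a bad argument from the caller versus a broken internal mapping.
 */
class ChannelRegistry<C> {
    private final int numberOfChannels;
    private final Map<Integer, C> channelIndexToChannel = new HashMap<>();

    ChannelRegistry(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    void register(int channelIndex, C channel) {
        channelIndexToChannel.put(channelIndex, channel);
    }

    C lookup(int channelIndex) {
        // The caller passed an index this gate can never own: argument error.
        if (channelIndex < 0 || channelIndex >= numberOfChannels) {
            throw new IllegalArgumentException("Invalid channel index " + channelIndex);
        }
        C channel = channelIndexToChannel.get(channelIndex);
        // The index is valid but the mapping was never built for it: internal state error.
        if (channel == null) {
            throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
        }
        return channel;
    }
}
```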


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346485#comment-16346485
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
Thanks for the reviews and suggestions! :)

I will first add some unit tests to verify the related logic. For an ITCase, I will consider whether it is necessary and feasible.


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346480#comment-16346480
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164990652
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -138,6 +148,24 @@ public void requestPartitions() throws IOException, InterruptedException {
}
}
 
+   @Override
+   public void blockInputChannel(int channelIndex) {
+   InputGate inputGate = indexToInputGateMap.get(channelIndex);
+   if (inputGate == null) {
+   throw new IllegalStateException("Could not find input gate from the channel index " + channelIndex);
+   }
+
+   int indexOffset = inputGateToIndexOffsetMap.get(inputGate);
+   inputGate.blockInputChannel(channelIndex - indexOffset);
--- End diff --

alright


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346467#comment-16346467
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164988217
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -360,8 +360,9 @@ public boolean notifyBufferAvailable(Buffer buffer) {
 
// Important: double check the isReleased state inside synchronized block, so there is no
// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
-   if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+   if (isReleased.get() || isBlocked() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
--- End diff --

That is exactly the issue I pointed out above.

I do not think it causes a credit leak currently. But as a further improvement, we could compare the currently available credits with the backlog after unblocking the channel. If they are not enough, we can request floating buffers immediately. In the current implementation, this has to wait for the next `onSenderBacklog` to trigger the process.
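The improvement described above re-checks credits as soon as the channel is unblocked, rather than waiting for the next `onSenderBacklog`. It could look roughly like the following. This is a simplified, single-threaded sketch. It assumes the number of required buffers is the sender backlog plus the exclusive buffers. `UnblockAwareChannel`, `maybeRequestFloatingBuffers` and `floatingRequests` are hypothetical names, and setting a flag stands in for registering as a buffer-pool listener.

```java
/**
 * Hypothetical, simplified credit bookkeeping for one remote input channel.
 * Assumes required buffers = sender backlog + exclusive buffers; names are illustrative only.
 */
class UnblockAwareChannel {
    private final int numExclusiveBuffers;
    private int availableBuffers;
    private int senderBacklog;
    private boolean blocked;
    private boolean waitingForFloatingBuffers;
    private int floatingRequests; // how many times we registered for floating buffers

    UnblockAwareChannel(int numExclusiveBuffers) {
        this.numExclusiveBuffers = numExclusiveBuffers;
        this.availableBuffers = numExclusiveBuffers;
    }

    void setBlocked(boolean blocked) {
        this.blocked = blocked;
        if (!blocked) {
            // Proposed improvement: re-check right away instead of waiting
            // for the next onSenderBacklog() call.
            maybeRequestFloatingBuffers();
        }
    }

    void onSenderBacklog(int backlog) {
        senderBacklog = backlog;
        maybeRequestFloatingBuffers();
    }

    private void maybeRequestFloatingBuffers() {
        int numRequiredBuffers = senderBacklog + numExclusiveBuffers;
        if (!blocked && !waitingForFloatingBuffers && availableBuffers < numRequiredBuffers) {
            waitingForFloatingBuffers = true; // stands in for registering a pool listener
            floatingRequests++;
        }
    }

    boolean isWaitingForFloatingBuffers() {
        return waitingForFloatingBuffers;
    }

    int floatingRequests() {
        return floatingRequests;
    }
}
```

The `!waitingForFloatingBuffers` guard keeps an unblock and a later backlog announcement from registering the channel twice.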


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346469#comment-16346469
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164988630
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -315,6 +322,7 @@ public void returnExclusiveSegments(List<MemorySegment> segments) throws IOExcep
public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
synchronized (requestLock) {
if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
+   && indexToInputChannelMap.put(inputChannel.getChannelIndex(), inputChannel) == null
--- End diff --

alright.


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346468#comment-16346468
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164988413
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -133,8 +134,13 @@
 * Input channels. There is a one input channel for each consumed intermediate result partition.
 * We store this in a map for runtime updates of single channels.
 */
+   @GuardedBy("requestLock")
private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
 
+   /** A mapping from internal channel index in this gate to input channel. */
+   @GuardedBy("requestLock")
+   private final Map<Integer, InputChannel> indexToInputChannelMap;
--- End diff --

ok


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346463#comment-16346463
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164987333
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
@@ -165,6 +168,14 @@ protected void notifyChannelNonEmpty() {
 */
abstract void releaseAllResources() throws IOException;
 
+   protected boolean isBlocked() {
--- End diff --

Yes, we can define the abstract method here and implement it in `RemoteInputChannel` using synchronization.


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346461#comment-16346461
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164986882
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
@@ -66,6 +66,9 @@
/** The current backoff (in ms) */
private int currentBackoff;
 
+   /** Flag indicating whether this channel is currently blocked or not. */
+   private volatile boolean isBlocked = false;
--- End diff --

Yes, we have not yet applied the blocking-state logic to `LocalInputChannel`. That could be done in the future.


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346432#comment-16346432
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164979541
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
@@ -138,6 +148,24 @@ public void requestPartitions() throws IOException, InterruptedException {
}
}
 
+   @Override
+   public void blockInputChannel(int channelIndex) {
+   InputGate inputGate = indexToInputGateMap.get(channelIndex);
+   if (inputGate == null) {
+   throw new IllegalStateException("Could not find input gate from the channel index " + channelIndex);
+   }
+
+   int indexOffset = inputGateToIndexOffsetMap.get(inputGate);
+   inputGate.blockInputChannel(channelIndex - indexOffset);
--- End diff --

Wrap this offset handling in a private method:
```
inputGate.blockInputChannel(externalToInternalChannelIndex(channelIndex, inputGate));
```
and change the similar call in the `getNextBufferOrEvent()` method to:
```
bufferOrEvent.setChannelIndex(internalToExternalChannelIndex(bufferOrEvent.getChannelIndex(), inputGate));
```
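The two helpers suggested above could be sketched as follows. The sketch assumes, as in the diff, that each member gate owns a contiguous range of union-level channel indexes starting at its offset. `UnionIndexMapper` and `addGate` are hypothetical names. The gate type is a generic placeholder rather than Flink's `InputGate`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the index translation a union gate performs: each member gate
 * owns a contiguous range of external channel indexes starting at its offset.
 * G stands in for a member gate; only its channel count matters here.
 */
class UnionIndexMapper<G> {
    private final Map<G, Integer> gateToIndexOffset = new LinkedHashMap<>();
    private int totalChannels;

    void addGate(G gate, int numberOfChannels) {
        gateToIndexOffset.put(gate, totalChannels);
        totalChannels += numberOfChannels;
    }

    /** Union-level index to the index inside the given member gate. */
    int externalToInternalChannelIndex(int externalIndex, G gate) {
        return externalIndex - offsetOf(gate);
    }

    /** Index inside the given member gate to the union-level index. */
    int internalToExternalChannelIndex(int internalIndex, G gate) {
        return internalIndex + offsetOf(gate);
    }

    private int offsetOf(G gate) {
        Integer offset = gateToIndexOffset.get(gate);
        if (offset == null) {
            throw new IllegalArgumentException("Unknown input gate " + gate);
        }
        return offset;
    }
}
```

Keeping both directions in one place means `blockInputChannel` and `getNextBufferOrEvent` cannot drift apart in how they apply the offset.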


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346428#comment-16346428
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164974663
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
@@ -66,6 +66,9 @@
/** The current backoff (in ms) */
private int currentBackoff;
 
+   /** Flag indicating whether this channel is currently blocked or not. */
+   private volatile boolean isBlocked = false;
--- End diff --

So, for now, blocking is not yet implemented/handled for `LocalInputChannel`s?


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346429#comment-16346429
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164974094
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -360,8 +360,9 @@ public boolean notifyBufferAvailable(Buffer buffer) {
 
// Important: double check the isReleased state inside synchronized block, so there is no
// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
-   if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+   if (isReleased.get() || isBlocked() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
--- End diff --

Shouldn't we accumulate/remember the count of "blocked" credits and restore/reassign them upon unblocking the channel? Doesn't the current code "leak" the credit? Or is there some other code that will restore the credit balance (for both exclusive and floating buffers)?
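Declining a buffer does not have to lose it. The simplified model of a shared floating-buffer pool below illustrates this. A blocked channel rejects the offered buffer, which stays free and goes to the next waiting channel. A credit is only counted for buffers a channel actually keeps. This is a hypothetical, single-threaded sketch, not Flink's buffer pool implementation. `FloatingPool`, `SimpleChannel` and `distribute` are illustrative names. The model also shows that a declining channel drops out of the queue and would have to re-register after unblocking, which is the concern raised here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical model of a shared floating-buffer pool serving several channels.
 * A blocked channel declines an offered buffer; the buffer stays free and is
 * offered to the next waiting channel instead of being lost.
 */
class FloatingPool {
    interface Listener {
        /** Returns true if the listener kept the buffer. */
        boolean notifyBufferAvailable();
    }

    private int freeBuffers;
    private final Deque<Listener> waiting = new ArrayDeque<>();

    FloatingPool(int freeBuffers) {
        this.freeBuffers = freeBuffers;
    }

    void addListener(Listener listener) {
        waiting.add(listener);
    }

    /** Offers free buffers to waiting listeners in FIFO order. */
    void distribute() {
        while (freeBuffers > 0 && !waiting.isEmpty()) {
            Listener listener = waiting.poll();
            if (listener.notifyBufferAvailable()) {
                freeBuffers--; // buffer now owned by the channel, which may announce one credit
            }
            // A declining listener is dropped from the queue; the buffer stays free.
        }
    }

    int freeBuffers() {
        return freeBuffers;
    }
}

class SimpleChannel implements FloatingPool.Listener {
    boolean blocked;
    int credits; // credits announced for buffers this channel actually kept

    @Override
    public boolean notifyBufferAvailable() {
        if (blocked) {
            return false;
        }
        credits++;
        return true;
    }
}
```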


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346431#comment-16346431
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164975471
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
@@ -165,6 +168,14 @@ protected void notifyChannelNonEmpty() {
 */
abstract void releaseAllResources() throws IOException;
 
+   protected boolean isBlocked() {
--- End diff --

If you move the implementation of the `isBlocked`/`setBlocked` methods to `RemoteInputChannel`, the synchronisation can be optimised: instead of using another synchronisation mechanism in the form of `volatile`, `RemoteInputChannel` could use the already existing `synchronized (bufferQueue)` for this purpose.

As it is now, this `volatile` `isBlocked` adds overhead that could be avoided.
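Here is a sketch of the suggested optimisation. The blocked flag becomes a plain field guarded by the `bufferQueue` monitor that `notifyBufferAvailable` already holds, so no separate `volatile` read is needed. Buffers are modelled as plain `Object`s, and `LockSharingChannel` is a hypothetical name. This is not the actual `RemoteInputChannel` code.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical sketch: keep the blocked flag under the same monitor that already
 * guards the buffer queue, so the check in notifyBufferAvailable() needs no extra
 * volatile read. Buffers are modelled as Objects.
 */
class LockSharingChannel {
    private final ArrayDeque<Object> bufferQueue = new ArrayDeque<>();
    private boolean isBlocked;      // guarded by bufferQueue
    private int numRequiredBuffers; // guarded by bufferQueue

    void setBlocked(boolean blocked) {
        synchronized (bufferQueue) {
            isBlocked = blocked;
        }
    }

    void setNumRequiredBuffers(int numRequiredBuffers) {
        synchronized (bufferQueue) {
            this.numRequiredBuffers = numRequiredBuffers;
        }
    }

    /** Returns true if the buffer was kept, false if it should go back to the pool. */
    boolean notifyBufferAvailable(Object buffer) {
        synchronized (bufferQueue) {
            // The blocked flag is read under the lock this method takes anyway.
            if (isBlocked || bufferQueue.size() >= numRequiredBuffers) {
                return false;
            }
            bufferQueue.add(buffer);
            return true;
        }
    }
}
```

The trade-off is that every writer of the flag must also take the lock. That is presumably acceptable here, since blocking and unblocking only happen at barrier boundaries.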


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346433#comment-16346433
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164977052
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
}
}
 
+   @Override
+   public void blockInputChannel(int channelIndex) {
+   InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
+   if (inputChannel == null) {
+   throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
--- End diff --

`IllegalArgumentException`?


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346434#comment-16346434
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164972989
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -133,8 +134,13 @@
 * Input channels. There is a one input channel for each consumed intermediate result partition.
 * We store this in a map for runtime updates of single channels.
 */
+   @GuardedBy("requestLock")
private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
 
+   /** A mapping from internal channel index in this gate to input channel. */
+   @GuardedBy("requestLock")
+   private final Map<Integer, InputChannel> indexToInputChannelMap;
--- End diff --

nit: maybe rename to `channelIndexToInputChannel` and drop the comment? It doesn't add much value.


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346430#comment-16346430
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5381#discussion_r164975820
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -315,6 +322,7 @@ public void returnExclusiveSegments(List<MemorySegment> segments) throws IOExcep
public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
synchronized (requestLock) {
if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
+   && indexToInputChannelMap.put(inputChannel.getChannelIndex(), inputChannel) == null
--- End diff --

Please extract `inputChannels.put(...)` and `indexToInputChannelMap.put(...)` into a separate method wrapping those calls; this logic is duplicated in `updateInputChannel(...)`.

Also change it to something like this:

```
private boolean addInputChannel(InputChannel inputChannel, IntermediateResultPartitionID partitionId) {
  boolean alreadyInInputChannels = inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) != null;
  boolean alreadyInIndexesMapping = indexToInputChannelMap.put(inputChannel.getChannelIndex(), inputChannel) != null;
  // Both maps must agree on whether the channel was already registered.
  checkState(alreadyInInputChannels == alreadyInIndexesMapping, "Inconsistent internal state (...)");
  return alreadyInInputChannels;
}
```


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344746#comment-16344746
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
There is one issue I am thinking about.
After the input channel is unblocked, we could check whether it needs to request floating buffers from the pool (if `availableNum < numRequiredBuffers` and `!isWaitingForFloatingBuffers`). Otherwise, the request will only be triggered by the next `onSenderBacklog`.
Which way do you think is better?


[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344670#comment-16344670
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5381
  
@pnowojski I have submitted the whole implementation in one commit.

Once you have verified that the approach is feasible, I will submit a separate 
commit adding unit tests for it.




[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344611#comment-16344611
 ] 

ASF GitHub Bot commented on FLINK-8523:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5381

[FLINK-8523][network] Stop assigning floating buffers for blocked input 
channels in exactly-once mode

## What is the purpose of the change

In exactly-once mode, an input channel is set to the blocked state when a 
barrier is read from it. The blocked state is released after barrier alignment 
completes or is cancelled.

In credit-based network flow control, we should avoid assigning floating 
buffers to blocked input channels, because the buffers arriving after the 
barrier will not be processed by the operator until alignment completes.

By doing so, the floating buffers go to the unblocked channels that are still 
waiting for their barriers, so we can make full use of them and speed up 
barrier alignment to some extent.
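A rough illustration of the intended effect, assuming a toy model in which floating buffers are handed out one at a time in round-robin order. `FloatingChannel`, `FloatingBufferDistributor` and `distribute` are hypothetical names; Flink's real buffer pool hands out buffers on request and via listeners, not through a central distributor like this.

```java
import java.util.List;

/** Hypothetical channel model, not Flink's InputChannel. */
final class FloatingChannel {
    final int required;   // buffers needed to cover the announced backlog
    int assigned;         // floating buffers handed to this channel so far
    boolean blocked;      // true after a barrier was read and before alignment ends

    FloatingChannel(int required, boolean blocked) {
        this.required = required;
        this.blocked = blocked;
    }

    boolean wantsFloatingBuffer() {
        return !blocked && assigned < required;
    }
}

final class FloatingBufferDistributor {
    /**
     * Hands out floating buffers one at a time in round-robin order, skipping
     * blocked channels. Returns the number of buffers left in the pool.
     */
    static int distribute(int floating, List<FloatingChannel> channels) {
        boolean progress = true;
        while (floating > 0 && progress) {
            progress = false;
            for (FloatingChannel channel : channels) {
                if (floating == 0) {
                    break;
                }
                if (channel.wantsFloatingBuffer()) {
                    channel.assigned++;
                    floating--;
                    progress = true;
                }
            }
        }
        return floating;
    }
}
```

With one blocked and two unblocked channels, every floating buffer ends up on the unblocked channels, whose data can still be processed before alignment completes.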

## Brief change log

  - *Add `blockInputChannel` and `releaseBlockedInputChannels` to the `InputGate` interface*
  - *`UnionInputGate` constructs the mapping from channel index to `InputGate`*
  - *`SingleInputGate` constructs the mapping from channel index to `InputChannel`*
  - *`BarrierBuffer` decides when to block an input channel and when to release it*
  - *Avoid assigning floating buffers to blocked input channels*
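The change log above could be sketched roughly as follows. This is a simplified model under stated assumptions: the method names `blockInputChannel` and `releaseBlockedInputChannels` come from the change log, but their signatures are guessed, and `SketchInputGate`, `SketchSingleGate`, `SketchUnionGate`, `SketchBarrierAligner`, `isBlocked`, `onBarrier` and `onAlignmentCancelled` are hypothetical stand-ins for `InputGate`, `SingleInputGate`, `UnionInputGate` and `BarrierBuffer`, not the code in this pull request.

```java
import java.util.Arrays;
import java.util.BitSet;

/** Hypothetical gate interface; the two method names follow the change log. */
interface SketchInputGate {
    int getNumberOfInputChannels();
    void blockInputChannel(int channelIndex);
    void releaseBlockedInputChannels();
}

/** Stands in for SingleInputGate: channel index -> blocked state of that channel. */
final class SketchSingleGate implements SketchInputGate {
    private final boolean[] blocked;

    SketchSingleGate(int numChannels) { blocked = new boolean[numChannels]; }
    public int getNumberOfInputChannels() { return blocked.length; }
    public void blockInputChannel(int channelIndex) { blocked[channelIndex] = true; }
    public void releaseBlockedInputChannels() { Arrays.fill(blocked, false); }
    boolean isBlocked(int channelIndex) { return blocked[channelIndex]; }
}

/** Stands in for UnionInputGate: global channel index -> (gate, local index). */
final class SketchUnionGate implements SketchInputGate {
    private final SketchInputGate[] gates;
    private final SketchInputGate[] gateByChannel;
    private final int[] localIndexByChannel;

    SketchUnionGate(SketchInputGate... gates) {
        this.gates = gates;
        int total = 0;
        for (SketchInputGate gate : gates) total += gate.getNumberOfInputChannels();
        gateByChannel = new SketchInputGate[total];
        localIndexByChannel = new int[total];
        int next = 0;
        for (SketchInputGate gate : gates) {
            for (int local = 0; local < gate.getNumberOfInputChannels(); local++, next++) {
                gateByChannel[next] = gate;
                localIndexByChannel[next] = local;
            }
        }
    }

    public int getNumberOfInputChannels() { return gateByChannel.length; }
    public void blockInputChannel(int channelIndex) {
        gateByChannel[channelIndex].blockInputChannel(localIndexByChannel[channelIndex]);
    }
    public void releaseBlockedInputChannels() {
        for (SketchInputGate gate : gates) gate.releaseBlockedInputChannels();
    }
}

/** Stands in for the blocking/releasing decisions made in BarrierBuffer. */
final class SketchBarrierAligner {
    private final SketchInputGate gate;
    private final BitSet barrierReceived;

    SketchBarrierAligner(SketchInputGate gate) {
        this.gate = gate;
        this.barrierReceived = new BitSet(gate.getNumberOfInputChannels());
    }

    /** Blocks the channel that delivered the barrier; returns true once alignment completes. */
    boolean onBarrier(int channelIndex) {
        barrierReceived.set(channelIndex);
        if (barrierReceived.cardinality() == gate.getNumberOfInputChannels()) {
            gate.releaseBlockedInputChannels(); // alignment done: unblock everything
            barrierReceived.clear();
            return true;
        }
        gate.blockInputChannel(channelIndex);
        return false;
    }

    /** Alignment cancelled: release all blocked channels as well. */
    void onAlignmentCancelled() {
        gate.releaseBlockedInputChannels();
        barrierReceived.clear();
    }
}
```

Keeping the index mapping in the union gate lets the barrier logic work with global channel indices only, while each single gate tracks the state of its own channels.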


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests that validate that floating buffers are not assigned 
to blocked input channels*
  - *Added a unit test that validates that `BarrierBuffer` blocks and releases 
the `InputChannel` correctly*
  - *Added a unit test that validates that `UnionInputGate` and 
`SingleInputGate` construct the mapping correctly*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8523

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5381.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5381


commit 8c7ef5a22c348c3a6eedb04708b00093ae666e37
Author: Zhijiang 
Date:   2018-01-30T07:14:20Z

[FLINK-8523][network] Stop assigning floating buffers for blocked input 
channels in exactly-once mode



