[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561200#comment-16561200 ]

zhijiang commented on FLINK-8523:
---------------------------------

Hey [~pnowojski], [~NicoK],

Glad to see we are coming back to this issue. I think I understand your concerns, and there are actually two separate questions to settle:

1. Whether to spill intermediate buffers before barrier alignment

If we spill the buffers that follow the barrier on a blocked channel, as we did before, we free more floating buffers that can then be used by the unblocked channels, which should help barrier alignment. The concern is the additional IO cost of spilling and replaying those intermediate buffers. If the alignment is very fast, only a few intermediate buffers need to be spilled and they may still be in the OS cache, so the cost can be ignored. But if a lot of data is spilled in an IO-sensitive environment, it will greatly hurt throughput (TPS).

If we do not spill, as in the current code, the concern is that we cannot make full use of the floating buffers before alignment, which may delay barrier alignment in some scenarios.

So both options have good and bad points, and their behavior may differ from scenario to scenario. In non-credit-based mode we had to spill the data to avoid a deadlock, but now we have the chance to avoid spilling. It also seems better not to involve any disk IO in the runtime stack of a streaming job. For these reasons I prefer not spilling. We may need more tests, feedback or thought before the final decision.

2. Avoid requesting floating buffers for blocked channels

I think we can reach agreement on this one. Whatever the conclusion on the first question, this change is reasonable and brings a definite benefit, and it is what this JIRA focuses on.

BTW, we once made another improvement to speed up barrier alignment: reading unblocked channels with first priority instead of the current arbitrary order (FIFO based on network arrival). It does improve barrier alignment a lot, because the task no longer picks up intermediate buffers that cannot be used before alignment. But this selection may also change the original back pressure behavior and affect performance in some scenarios, so it is also a trade-off.

> Stop assigning floating buffers for blocked input channels in exactly-once mode
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-8523
>                 URL: https://issues.apache.org/jira/browse/FLINK-8523
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>              Labels: pull-request-available
>
> In exactly-once mode, an input channel is set to the blocked state when a barrier is read from it.
> The blocked state is released after barrier alignment or when the checkpoint is cancelled.
>
> In credit-based network flow control, we should avoid assigning floating buffers to blocked
> input channels, because the buffers after the barrier will not be processed by the operator
> until alignment.
> By doing so, we can make full use of the floating buffers and speed up barrier alignment to some extent.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
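The "read unblocked channels first" idea from the closing paragraph of the comment above can be sketched as follows. This is only an illustration under stated assumptions, not Flink code: `ChannelSelectorSketch` and its methods are hypothetical names, channels are represented by their integer index, and falling back to a blocked channel when nothing else has data is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Set;

/** Hypothetical selector; it is not part of Flink's input gate API. */
final class ChannelSelectorSketch {
    // Channel indices in the order in which their data arrived from the network.
    private final Deque<Integer> channelsWithData = new ArrayDeque<>();

    void notifyDataAvailable(int channelIndex) {
        channelsWithData.addLast(channelIndex);
    }

    /** Plain FIFO selection: the oldest notification wins, blocked or not. */
    Integer pollFifo() {
        return channelsWithData.pollFirst();
    }

    /**
     * Prefers the oldest channel that is not blocked on alignment. Falls back to the
     * FIFO head only if every channel with data is blocked (assumption of this sketch).
     * Returns null when no channel has data.
     */
    Integer pollPreferUnblocked(Set<Integer> blockedChannels) {
        for (Iterator<Integer> it = channelsWithData.iterator(); it.hasNext(); ) {
            Integer channel = it.next();
            if (!blockedChannels.contains(channel)) {
                it.remove();
                return channel;
            }
        }
        return channelsWithData.pollFirst();
    }
}
```

With data queued for channels 0, 1 and 2 and channel 0 blocked, `pollPreferUnblocked` yields 1, then 2, and only then 0. The linear scan is what changes the consumption order, which is also why such a policy can alter back pressure behavior, as the comment notes.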
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559809#comment-16559809 ]

Piotr Nowojski commented on FLINK-8523:
---------------------------------------

I think we shouldn't spill at all. Whenever the sender serializes a checkpoint barrier, it could either stop requesting floating buffers or return them to the receiver. The receiver, on the other hand, could refrain from assigning floating buffers to channels blocked on alignment. Data that follows the checkpoint barrier on the sender would just sit in the sender's buffers, waiting for more credits.

I know that this seems like a natural extension towards "as it was before", but spilling to disk in a network stack seems like something very bad to me, and I haven't seen or heard of a similar solution anywhere else. Besides being conceptually wrong to me, it would/could seriously affect performance, especially if IO is already saturated by RocksDB or checkpointing. In such situations, even writing or reading a single byte can block the system for a very long time. Reading bytes from disk is just slower than sending them over the network, and it doesn't scale.

To make matters worse, those spilled buffers would be tiny. Combined with multiple channels and multiple tasks, we would be asking for quite a lot of small random reads and writes. Thus, compared to the old solution, where we were spilling tons of data from a handful of streams, the new one, which spills less data, could turn out not to be significantly faster.
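The behavior described in the comment above, where data behind the barrier simply waits on the sender until the receiver grants more credit, can be sketched like this. It is a toy model under assumptions: `CreditSenderSketch` is a hypothetical class, buffers are plain strings, and one credit pays for exactly one buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sender side of a credit-based channel; not Flink's implementation. */
final class CreditSenderSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private int credit; // number of buffers the receiver has announced it can accept

    void enqueue(String buffer) {
        pending.add(buffer);
    }

    /** Called when the receiver announces additional free buffers. */
    void addCredit(int count) {
        credit += count;
    }

    /** Sends as many queued buffers as the current credit allows; the rest stay queued. */
    List<String> flush() {
        List<String> sent = new ArrayList<>();
        while (credit > 0 && !pending.isEmpty()) {
            sent.add(pending.poll());
            credit--;
        }
        return sent;
    }

    /** Buffers still waiting on the sender side. */
    int backlog() {
        return pending.size();
    }
}
```

If the receiver stops granting credit once it has read the barrier from this channel, everything queued behind the barrier stays in `pending` and nothing has to be spilled on the receiving side. The cost is that back pressure moves to the sender for the duration of the alignment.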
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558825#comment-16558825 ]

Nico Kruber commented on FLINK-8523:
------------------------------------

Sorry for coming back so late, but we recently discovered that it is also very important not to hold on to the received (floating and exclusive) buffers during alignment, because otherwise not all available resources are used to do the alignment for the remaining channels. This may make alignments even worse than without flow control.

This ticket only prevents assigning new floating buffers to blocked channels, so its effect is somewhat limited. What we would really need is to also free any floating buffers that we already hold. Since we also have exclusive buffers in the {{BarrierBuffer}} and need to keep the order, [~StephanEwen] and I were thinking about
1) blocking the channel so that it does not advertise more credit,
2) spilling all of the already received buffers to disk (as before) and thereby freeing all used floating buffers.

This ticket can be seen as a step towards that goal, but I'd really like to see the whole thing together (possibly as follow-up tasks, or by changing the ticket).
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558810#comment-16558810 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205575969

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##
@@ -133,8 +134,13 @@
      * Input channels. There is a one input channel for each consumed intermediate result partition.
      * We store this in a map for runtime updates of single channels.
      */
+    @GuardedBy("requestLock")
     private final Map inputChannels;

+    /** A mapping from internal channel index in this gate to input channel. */
+    @GuardedBy("requestLock")
+    private final Map indexToInputChannelMap;

Review comment:
and maybe call it `inputChannelsByChannelIndex` instead?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558811#comment-16558811 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205577375

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##
@@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
         }
     }

+    @Override
+    public void blockInputChannel(int channelIndex) {
+        InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
+        if (inputChannel == null) {
+            throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
+        }
+
+        inputChannel.setBlocked(true);
+    }
+
+    @Override
+    public void releaseBlockedInputChannels() {
+        for (InputChannel inputChannel : inputChannels.values()) {
+            inputChannel.setBlocked(false);
+        }

Review comment:
~Do we need to make sure that there's no concurrent `blockInputChannel` call trying to block a channel for an alignment for a later checkpoint or can we assume here that we are still processing one alignment (doing the release) and therefore cannot concurrently block?~
I guess we are safe here - `BarrierBuffer` actually does it the same way.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558813#comment-16558813 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205576747

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##
@@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
         }
     }

+    @Override
+    public void blockInputChannel(int channelIndex) {
+        InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
+        if (inputChannel == null) {
+            throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);

Review comment:
Both arguments sound right: how about `checkArgument(0 <= channelIndex && channelIndex < numberOfInputChannels)` and `checkState(inputChannel != null)`?
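The two checks suggested in the review comment above separate a caller error (an index out of range) from a broken internal mapping (a valid index without a channel). A minimal, self-contained sketch of that distinction follows. `SingleGateSketch` and `BlockableChannel` are hypothetical stand-ins for the classes in the diff, and the checks are written out by hand so that the example does not depend on any particular precondition utility.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an input channel that can be blocked during alignment. */
final class BlockableChannel {
    private volatile boolean blocked;

    void setBlocked(boolean blocked) {
        this.blocked = blocked;
    }

    boolean isBlocked() {
        return blocked;
    }
}

/** Hypothetical stand-in for the gate; only the blocking logic is modelled. */
final class SingleGateSketch {
    private final int numberOfInputChannels;
    private final Map<Integer, BlockableChannel> channelsByIndex = new HashMap<>();

    SingleGateSketch(int numberOfInputChannels) {
        this.numberOfInputChannels = numberOfInputChannels;
    }

    void setInputChannel(int channelIndex, BlockableChannel channel) {
        channelsByIndex.put(channelIndex, channel);
    }

    void blockInputChannel(int channelIndex) {
        // Caller error: the index itself is out of range.
        if (channelIndex < 0 || channelIndex >= numberOfInputChannels) {
            throw new IllegalArgumentException("Invalid channel index " + channelIndex);
        }
        BlockableChannel channel = channelsByIndex.get(channelIndex);
        // Internal error: the index is valid, but the mapping was never populated.
        if (channel == null) {
            throw new IllegalStateException("No input channel registered for index " + channelIndex);
        }
        channel.setBlocked(true);
    }

    void releaseBlockedInputChannels() {
        for (BlockableChannel channel : channelsByIndex.values()) {
            channel.setBlocked(false);
        }
    }
}
```

Calling `blockInputChannel(5)` on a gate with two channels raises `IllegalArgumentException`, while calling it with index 1 before that channel has been registered raises `IllegalStateException`.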
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558814#comment-16558814 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205575797

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##
@@ -133,8 +134,13 @@
      * Input channels. There is a one input channel for each consumed intermediate result partition.
      * We store this in a map for runtime updates of single channels.
      */
+    @GuardedBy("requestLock")
     private final Map inputChannels;

+    /** A mapping from internal channel index in this gate to input channel. */
+    @GuardedBy("requestLock")
+    private final Map indexToInputChannelMap;

Review comment:
Actually, this could be a simple array, couldn't it? If you look at `org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#create` you'll see that `0 <= channelIndex < inputChannels.length`
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558812#comment-16558812 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205579424

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ##
@@ -82,30 +82,40 @@
      */
     private final Map inputGateToIndexOffsetMap;

+    /** A mapping from logical channel index (internal channel index in input gate plus gate's offset) to input gate. */
+    private final Map indexToInputGateMap;

Review comment:
Similar to the `SingleInputGate`: use an array, rename member
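The array-based mapping suggested in the review comment above could look roughly like the sketch below. It is an assumption-laden illustration: `UnionGateSketch` is a hypothetical class, each gate is reduced to an array of blocked flags, and logical channel indices are assumed to be assigned gate by gate in constructor order (local index plus the gate's offset, as the Javadoc in the diff describes).

```java
/** Hypothetical union of several gates; logical index = gate offset + local index. */
final class UnionGateSketch {
    private final int[] gateOfChannel; // logical channel index -> gate number
    private final int[] offsetOfGate;  // gate number -> first logical index of that gate
    private final boolean[][] blocked; // per gate, per local channel index

    UnionGateSketch(int... channelsPerGate) {
        int total = 0;
        for (int count : channelsPerGate) {
            total += count;
        }
        gateOfChannel = new int[total];
        offsetOfGate = new int[channelsPerGate.length];
        blocked = new boolean[channelsPerGate.length][];

        int next = 0;
        for (int gate = 0; gate < channelsPerGate.length; gate++) {
            offsetOfGate[gate] = next;
            blocked[gate] = new boolean[channelsPerGate[gate]];
            for (int i = 0; i < channelsPerGate[gate]; i++) {
                gateOfChannel[next++] = gate;
            }
        }
    }

    /** Resolves the owning gate by array lookup and blocks the channel at its local index. */
    void blockInputChannel(int logicalIndex) {
        if (logicalIndex < 0 || logicalIndex >= gateOfChannel.length) {
            throw new IllegalArgumentException("Invalid channel index " + logicalIndex);
        }
        int gate = gateOfChannel[logicalIndex];
        blocked[gate][logicalIndex - offsetOfGate[gate]] = true;
    }

    boolean isBlocked(int gate, int localIndex) {
        return blocked[gate][localIndex];
    }
}
```

For two gates with 2 and 3 channels, logical index 3 resolves to gate 1, local index 1. Compared with a map keyed by boxed integers, the arrays avoid hashing and boxing on a path that runs once per barrier and channel.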
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558793#comment-16558793 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205572919

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ##
@@ -66,6 +66,9 @@
     /** The current backoff (in ms) */
     private int currentBackoff;

+    /** Flag indicating whether this channel is currently blocked or not. */
+    private volatile boolean isBlocked = false;

Review comment:
Does it make sense to have it in the `LocalInputChannel` then? I guess, we should not add dead code.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558791#comment-16558791 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

NicoK commented on a change in pull request #5381: [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
URL: https://github.com/apache/flink/pull/5381#discussion_r205572300

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##
@@ -360,8 +360,9 @@ public boolean notifyBufferAvailable(Buffer buffer) {
             // Important: double check the isReleased state inside synchronized block, so there is no
             // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
-            if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+            if (isReleased.get() || isBlocked() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {

Review comment:
Indeed, the "blocked" credits should be in `numRequiredBuffers`, which was updated with the last buffer that we received. Any update on that will come with the next buffer. We just need to make sure that we acknowledge the newly freed credit to the sender (the backlog may be 0).
The request for floating buffers should remain fair among all channels though, and therefore we cannot request all floating buffers to satisfy that need at once!
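The effect of the added `isBlocked()` condition, together with the fairness remark in the review comment above, can be illustrated with a small model. Everything here is hypothetical: `ChannelSketch` and `FloatingPoolSketch` are not Flink classes, buffers are only counted, and round-robin hand-out of one buffer at a time is merely one possible reading of "fair among all channels".

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a remote input channel; buffers are only counted. */
final class ChannelSketch {
    boolean blocked;       // true after a barrier was read and alignment is pending
    int floatingBuffers;   // floating buffers currently held by this channel
    final int requiredBuffers;

    ChannelSketch(int requiredBuffers) {
        this.requiredBuffers = requiredBuffers;
    }

    /** Declines the buffer if the channel is blocked or already has what it needs. */
    boolean offerFloatingBuffer() {
        if (blocked || floatingBuffers >= requiredBuffers) {
            return false;
        }
        floatingBuffers++;
        return true;
    }
}

/** Hypothetical floating buffer pool that serves its channels round-robin. */
final class FloatingPoolSketch {
    private final Queue<ChannelSketch> channels = new ArrayDeque<>();
    private int available;

    FloatingPoolSketch(int available) {
        this.available = available;
    }

    void register(ChannelSketch channel) {
        channels.add(channel);
    }

    void addBuffers(int count) {
        available += count;
    }

    int available() {
        return available;
    }

    /** Offers one buffer per channel per round until buffers or takers run out. */
    void distribute() {
        int consecutiveDeclines = 0;
        while (available > 0 && consecutiveDeclines < channels.size()) {
            ChannelSketch channel = channels.poll();
            if (channel.offerFloatingBuffer()) {
                available--;
                consecutiveDeclines = 0;
            } else {
                consecutiveDeclines++;
            }
            channels.add(channel); // back to the end of the queue for the next round
        }
    }
}
```

With four floating buffers, one blocked channel that needs three buffers and two unblocked channels that need two and five, the blocked channel receives nothing and the other two receive two buffers each. Once the channel is unblocked, it takes part in the next round like any other channel instead of claiming its whole demand at once.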
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539793#comment-16539793 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5381

    Yes, I think it can make better use of floating buffers. As last agreed with Piotr, this PR will not be merged into release-1.5, because some issues were still unconfirmed at the time. So I put this PR on hold and have not updated it for now. What do you think of this PR? I can continue with it if necessary.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539691#comment-16539691 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/5381

    @zhijiangW is this PR still useful? If so, was there any progress?
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346492#comment-16346492 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164993308

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
             }
         }

    +    @Override
    +    public void blockInputChannel(int channelIndex) {
    +        InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
    +        if (inputChannel == null) {
    +            throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
    --- End diff --

    I followed other, similar usages of `IllegalStateException`. It fits the case where `channelIndex` is correct but the mapping was constructed incorrectly. I think `IllegalArgumentException` also makes sense.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346485#comment-16346485 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5381

    Thanks for the reviews and suggestions! :)

    I will add some unit tests first to verify the related logic. For an ITCase, I will consider whether it is necessary and feasible.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346480#comment-16346480 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164990652

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
    @@ -138,6 +148,24 @@ public void requestPartitions() throws IOException, InterruptedException {
             }
         }
    +    @Override
    +    public void blockInputChannel(int channelIndex) {
    +        InputGate inputGate = indexToInputGateMap.get(channelIndex);
    +        if (inputGate == null) {
    +            throw new IllegalStateException("Could not find input gate from the channel index " + channelIndex);
    +        }
    +
    +        int indexOffset = inputGateToIndexOffsetMap.get(inputGate);
    +        inputGate.blockInputChannel(channelIndex - indexOffset);
    --- End diff --

    alright
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346467#comment-16346467 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164988217

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,8 +360,9 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                 // Important: double check the isReleased state inside synchronized block, so there is no
                 // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -            if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +            if (isReleased.get() || isBlocked() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    --- End diff --

    That is exactly the issue I pointed out above. I think it does not cause a credit leak at the moment. As a further improvement, we could compare the currently available credits with the backlog after unblocking the channel and, if there are not enough, request floating buffers immediately. In the current implementation, the channel has to wait for the next `onSenderBacklog` call to trigger this process.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346469#comment-16346469 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164988630

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -315,6 +322,7 @@ public void returnExclusiveSegments(List segments) throws IOExcep
         public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
             synchronized (requestLock) {
                 if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
    +                    && indexToInputChannelMap.put(inputChannel.getChannelIndex(), inputChannel) == null
    --- End diff --

    alright.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346468#comment-16346468 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164988413

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -133,8 +134,13 @@
          * Input channels. There is a one input channel for each consumed intermediate result partition.
          * We store this in a map for runtime updates of single channels.
          */
    +    @GuardedBy("requestLock")
         private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

    +    /** A mapping from internal channel index in this gate to input channel. */
    +    @GuardedBy("requestLock")
    +    private final Map<Integer, InputChannel> indexToInputChannelMap;
    --- End diff --

    ok
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346463#comment-16346463 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164987333

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
    @@ -165,6 +168,14 @@ protected void notifyChannelNonEmpty() {
          */
         abstract void releaseAllResources() throws IOException;

    +    protected boolean isBlocked() {
    --- End diff --

    Yes, we can define an abstract method here and implement it in `RemoteInputChannel` using synchronization.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346461#comment-16346461 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164986882

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
    @@ -66,6 +66,9 @@
         /** The current backoff (in ms) */
         private int currentBackoff;

    +    /** Flag indicating whether this channel is currently blocked or not. */
    +    private volatile boolean isBlocked = false;
    --- End diff --

    Yes, we have not yet improved the logic based on the blocking state in `LocalInputChannel`. It may be possible to do that in the future.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346432#comment-16346432 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164979541

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
    @@ -138,6 +148,24 @@ public void requestPartitions() throws IOException, InterruptedException {
             }
         }
    +    @Override
    +    public void blockInputChannel(int channelIndex) {
    +        InputGate inputGate = indexToInputGateMap.get(channelIndex);
    +        if (inputGate == null) {
    +            throw new IllegalStateException("Could not find input gate from the channel index " + channelIndex);
    +        }
    +
    +        int indexOffset = inputGateToIndexOffsetMap.get(inputGate);
    +        inputGate.blockInputChannel(channelIndex - indexOffset);
    --- End diff --

    Wrap this offset handling in a private method:
    ```
    inputGate.blockInputChannel(externalToInternalChannelIndex(channelIndex, inputGate));
    ```
    and change the similar call in the `getNextBufferOrEvent()` method to:
    ```
    bufferOrEvent.setChannelIndex(internalToExternalChannelIndex(bufferOrEvent.getChannelIndex(), inputGate));
    ```
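The index translation suggested in this comment can be sketched in isolation. This is only an illustration under stated assumptions: the class `ChannelIndexMapper`, its `addGate` method, and the use of plain strings to identify gates are hypothetical and do not appear in the pull request; only the two helper method names come from the comment above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; gates are identified by name purely for illustration. */
final class ChannelIndexMapper {
    private final Map<String, Integer> gateToIndexOffset = new HashMap<>();
    private final Map<String, Integer> gateToChannelCount = new HashMap<>();
    private int totalChannels;

    /** Registers a gate; its channels occupy the next block of external indices. */
    void addGate(String gate, int numberOfChannels) {
        gateToIndexOffset.put(gate, totalChannels);
        gateToChannelCount.put(gate, numberOfChannels);
        totalChannels += numberOfChannels;
    }

    /** Converts an index that is unique across all gates into an index local to one gate. */
    int externalToInternalChannelIndex(int externalIndex, String gate) {
        return checkRange(externalIndex - offsetOf(gate), gate);
    }

    /** Converts a gate-local index back into the index that is unique across all gates. */
    int internalToExternalChannelIndex(int internalIndex, String gate) {
        return checkRange(internalIndex, gate) + offsetOf(gate);
    }

    private int offsetOf(String gate) {
        Integer offset = gateToIndexOffset.get(gate);
        if (offset == null) {
            throw new IllegalArgumentException("Unknown gate: " + gate);
        }
        return offset;
    }

    private int checkRange(int internalIndex, String gate) {
        Integer count = gateToChannelCount.get(gate);
        if (count == null || internalIndex < 0 || internalIndex >= count) {
            throw new IllegalArgumentException(
                "Channel index " + internalIndex + " is out of range for gate " + gate);
        }
        return internalIndex;
    }
}
```

Keeping both directions in one place means the offset arithmetic is written once, so `blockInputChannel` and `getNextBufferOrEvent()` cannot drift apart.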
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346428#comment-16346428 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164974663

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
    @@ -66,6 +66,9 @@
         /** The current backoff (in ms) */
         private int currentBackoff;

    +    /** Flag indicating whether this channel is currently blocked or not. */
    +    private volatile boolean isBlocked = false;
    --- End diff --

    As for now, blocking is not yet implemented/handled for `LocalInputChannel`s?
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346429#comment-16346429 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164974094

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,8 +360,9 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                 // Important: double check the isReleased state inside synchronized block, so there is no
                 // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -            if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +            if (isReleased.get() || isBlocked() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    --- End diff --

    Shouldn't we accumulate/remember the count of "blocked" credits and restore/reassign them upon unblocking the channel? Doesn't the current code "leak" the credit? Or is there some other code that will restore the credit balance (for both exclusive and floating buffers)?
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346431#comment-16346431 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164975471

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---
    @@ -165,6 +168,14 @@ protected void notifyChannelNonEmpty() {
          */
         abstract void releaseAllResources() throws IOException;

    +    protected boolean isBlocked() {
    --- End diff --

    If you move the implementation of the `isBlocked`/`setBlocked` methods to `RemoteInputChannel`, the synchronisation can be optimised: instead of introducing another synchronisation mechanism in the form of `volatile`, `RemoteInputChannel` could use the already existing `synchronized (bufferQueue)` for this purpose. As it is now, the `volatile` `isBlocked` adds overhead that could be avoided.
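A minimal sketch of the locking suggestion above, assuming a simplified stand-in for the channel: the class `GuardedBlockingChannel` and its `offerFloatingBuffer` method are hypothetical and only mirror the shape of the condition in the `notifyBufferAvailable` diff. The blocked flag is a plain field that is read and written only while holding the `bufferQueue` monitor, so no separate `volatile` is needed.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a remote input channel; not Flink's actual class. */
final class GuardedBlockingChannel {
    private final ArrayDeque<Object> bufferQueue = new ArrayDeque<>();
    private final int numRequiredBuffers;

    // Guarded by bufferQueue: plain field, no volatile.
    private boolean blocked;

    GuardedBlockingChannel(int numRequiredBuffers) {
        this.numRequiredBuffers = numRequiredBuffers;
    }

    void setBlocked(boolean value) {
        synchronized (bufferQueue) {
            blocked = value;
        }
    }

    boolean isBlocked() {
        synchronized (bufferQueue) {
            return blocked;
        }
    }

    /** Returns true if the buffer was accepted, false if it should be offered elsewhere. */
    boolean offerFloatingBuffer(Object buffer) {
        synchronized (bufferQueue) {
            // The flag is checked under the same lock that protects the queue,
            // so the decision and the enqueue happen atomically.
            if (blocked || bufferQueue.size() >= numRequiredBuffers) {
                return false;
            }
            bufferQueue.add(buffer);
            return true;
        }
    }

    int availableBuffers() {
        synchronized (bufferQueue) {
            return bufferQueue.size();
        }
    }
}
```

Since the buffer-assignment path already takes this lock, checking the flag there costs nothing extra; only callers of `isBlocked()` outside that path pay for acquiring the monitor.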
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346433#comment-16346433 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164977052

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -485,6 +494,23 @@ public void requestPartitions() throws IOException, InterruptedException {
             }
         }
    +    @Override
    +    public void blockInputChannel(int channelIndex) {
    +        InputChannel inputChannel = indexToInputChannelMap.get(channelIndex);
    +        if (inputChannel == null) {
    +            throw new IllegalStateException("Could not find input channel from the channel index " + channelIndex);
    --- End diff --

    `IllegalArgumentException`?
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346434#comment-16346434 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164972989

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -133,8 +134,13 @@
          * Input channels. There is a one input channel for each consumed intermediate result partition.
          * We store this in a map for runtime updates of single channels.
          */
    +    @GuardedBy("requestLock")
         private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

    +    /** A mapping from internal channel index in this gate to input channel. */
    +    @GuardedBy("requestLock")
    +    private final Map<Integer, InputChannel> indexToInputChannelMap;
    --- End diff --

    nit: maybe rename to `channelIndexToInputChannel` and drop the comment? It doesn't add much value.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346430#comment-16346430 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5381#discussion_r164975820

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -315,6 +322,7 @@ public void returnExclusiveSegments(List segments) throws IOExcep
         public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
             synchronized (requestLock) {
                 if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
    +                    && indexToInputChannelMap.put(inputChannel.getChannelIndex(), inputChannel) == null
    --- End diff --

    Please extract `inputChannels.put(...)` and `indexToInputChannelMap.put(...)` into a separate method wrapping those calls, since this logic is duplicated in `updateInputChannel(...)`. Also change it to something like this:
    ```
    private boolean addInputChannel(inputChannel, partitionId) {
      boolean alreadyInInputChannels = inputChannels.put(...) != null;
      boolean alreadyInIndexesMapping = indexToInputChannelMap.put(...) != null;
      checkState(alreadyInInputChannels ^ alreadyInIndexesMapping, "Inconsistent internal state (...)");
      return alreadyInInputChannels;
    }
    ```
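The extraction proposed in this comment could look roughly like the following self-contained sketch. The class `InputChannelRegistry` and the use of strings for partition ids and channels are hypothetical simplifications. Read literally, the XOR in the snippet above is true only when the two maps disagree, so this sketch assumes the intended invariant is the opposite: both maps either already held an entry or both did not.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry; strings stand in for partition ids and channels. */
final class InputChannelRegistry {
    private final Map<String, String> inputChannels = new HashMap<>();
    private final Map<Integer, String> indexToInputChannelMap = new HashMap<>();

    /**
     * Stores the channel in both maps.
     *
     * @return true if an entry for this partition already existed and was replaced
     */
    boolean addInputChannel(String partitionId, int channelIndex, String channel) {
        boolean alreadyInInputChannels = inputChannels.put(partitionId, channel) != null;
        boolean alreadyInIndexesMapping = indexToInputChannelMap.put(channelIndex, channel) != null;
        // Assumed invariant: the two maps must agree on whether the channel was known.
        if (alreadyInInputChannels != alreadyInIndexesMapping) {
            throw new IllegalStateException(
                "Inconsistent internal state for partition " + partitionId
                    + " and channel index " + channelIndex);
        }
        return alreadyInInputChannels;
    }
}
```

Both `setInputChannel` and `updateInputChannel(...)` could then call the one helper, which removes the duplicated pair of `put` calls the comment points out.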
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344746#comment-16344746 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5381

    There is one issue I am still thinking about. After the input channel is unblocked, we could check whether it needs to request floating buffers from the pool (if `availableNum < numRequiredBuffers` and `!isWaitingForFloatingBuffers`). Otherwise, the request is only triggered by the next `onSenderBacklog` call. Which do you think is better?
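The check described in this comment can be sketched as follows. Everything except the three names quoted in the comment (`availableNum`, `numRequiredBuffers`, `isWaitingForFloatingBuffers`) is a hypothetical simplification: the class `UnblockingChannel`, the return value of `unblock()`, and the decision to mark the channel as waiting once a request is issued are assumptions made for this illustration, not the behaviour of the pull request.

```java
/** Hypothetical channel model; only the three quoted field names come from the discussion. */
final class UnblockingChannel {
    private final Object lock = new Object();
    private final int availableNum;
    private final int numRequiredBuffers;
    private boolean isWaitingForFloatingBuffers;
    private boolean blocked;

    UnblockingChannel(int availableNum, int numRequiredBuffers) {
        this.availableNum = availableNum;
        this.numRequiredBuffers = numRequiredBuffers;
    }

    void block() {
        synchronized (lock) {
            blocked = true;
        }
    }

    boolean isBlocked() {
        synchronized (lock) {
            return blocked;
        }
    }

    /**
     * Unblocks the channel and reports how many floating buffers should be
     * requested right away instead of waiting for the next backlog announcement.
     */
    int unblock() {
        synchronized (lock) {
            blocked = false;
            if (availableNum < numRequiredBuffers && !isWaitingForFloatingBuffers) {
                // Assumption: remember the outstanding request so it is not issued twice.
                isWaitingForFloatingBuffers = true;
                return numRequiredBuffers - availableNum;
            }
            return 0;
        }
    }
}
```

The trade-off raised in the comment is latency against simplicity: requesting on unblock closes the gap until the next `onSenderBacklog`, at the cost of one more place that talks to the buffer pool.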
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344670#comment-16344670 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5381

    @pnowojski I have submitted the whole process in one commit. Once you have confirmed that the implementation is feasible, I will submit a separate commit that adds unit tests for this process.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344611#comment-16344611 ]

ASF GitHub Bot commented on FLINK-8523:
---------------------------------------

GitHub user zhijiangW opened a pull request:

    https://github.com/apache/flink/pull/5381

    [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode

    ## What is the purpose of the change

    In exactly-once mode, an input channel is set to the blocked state once a barrier has been read from it. The blocked state is released after barrier alignment completes or is cancelled.

    In credit-based network flow control, we should avoid assigning floating buffers to blocked input channels, because the buffers after the barrier will not be processed by the operator until alignment. By doing so, we can make full use of floating buffers and speed up barrier alignment to some extent.

    ## Brief change log

      - *Add `blockInputChannel` and `releaseBlockedInputChannels` to the `InputGate` interface*
      - *`UnionInputGate` constructs the mapping from channel index to `InputGate`*
      - *`SingleInputGate` constructs the mapping from channel index to `InputChannel`*
      - *`BarrierBuffer` determines the logic of blocking an input channel or releasing it*
      - *Avoid assigning floating buffers to blocked input channels*

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *Added unit tests that validate that floating buffers are not assigned to blocked input channels*
      - *Added a unit test that validates that `BarrierBuffer` blocks or releases the `InputChannel` correctly*
      - *Added a unit test that validates that `UnionInputGate` and `SingleInputGate` construct the mapping correctly*

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-8523

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5381.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5381

commit 8c7ef5a22c348c3a6eedb04708b00093ae666e37
Author: Zhijiang
Date:   2018-01-30T07:14:20Z

    [FLINK-8523][network] Stop assigning floating buffers for blocked input channels in exactly-once mode
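The blocking and releasing behaviour summarised in the change log can be illustrated with a small model. This is a sketch under stated assumptions, not the `BarrierBuffer` implementation: the class `AlignmentTracker` and its method names are hypothetical, and it only tracks which channel indices are blocked during one alignment.

```java
import java.util.BitSet;

/** Hypothetical model of barrier alignment; tracks blocked channel indices only. */
final class AlignmentTracker {
    private final int totalChannels;
    private final BitSet blockedChannels;

    AlignmentTracker(int totalChannels) {
        this.totalChannels = totalChannels;
        this.blockedChannels = new BitSet(totalChannels);
    }

    /**
     * Records a barrier from the given channel and blocks that channel.
     *
     * @return true if this barrier completed the alignment, in which case all
     *         channels have been released again
     */
    boolean onBarrier(int channelIndex) {
        if (channelIndex < 0 || channelIndex >= totalChannels) {
            throw new IllegalArgumentException("Unknown channel index " + channelIndex);
        }
        blockedChannels.set(channelIndex);
        if (blockedChannels.cardinality() == totalChannels) {
            releaseBlockedChannels();
            return true;
        }
        return false;
    }

    /** Aborts the current alignment and releases every blocked channel. */
    void onCancellation() {
        releaseBlockedChannels();
    }

    boolean isBlocked(int channelIndex) {
        return blockedChannels.get(channelIndex);
    }

    private void releaseBlockedChannels() {
        blockedChannels.clear();
    }
}
```

A channel that reports `isBlocked` here is one that, per the issue description, should not be handed floating buffers until the alignment finishes or is cancelled.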