[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372804#comment-16372804
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5400


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently in exactly-once mode, the {{BarrierBuffer}} blocks inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams, which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill data to disk files in order to 
> recycle the buffers of blocked channels.
>  
> With credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a 
> new {{CheckpointBarrierHandler}} that only buffers the data of blocked 
> channels in memory, for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be switched on or off via 
> configuration, so that the original spilling mode can be restored in case of 
> unexpected problems.
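For illustration only (editor's sketch, not code from the pull request): with
per-channel exclusive buffers, the blocked data can simply be cached in memory
and handed back as a sequence once the blocks are released, roughly like this:

    import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

    import java.util.ArrayDeque;

    /** Editor's sketch of an in-memory blocker; names and structure are illustrative. */
    class InMemoryBufferBlocker {

        /** Buffers and events added since the last roll-over. */
        private ArrayDeque<BufferOrEvent> currentSequence = new ArrayDeque<>();

        void add(BufferOrEvent boe) {
            // With credit-based flow control each channel owns exclusive buffers,
            // so keeping blocked data in memory cannot deadlock the network stack.
            currentSequence.add(boe);
        }

        ArrayDeque<BufferOrEvent> rollOver() {
            // Hand out the blocked data for later replay and start a new sequence;
            // return null if nothing was blocked since the last roll-over.
            if (currentSequence.isEmpty()) {
                return null;
            }
            ArrayDeque<BufferOrEvent> sequence = currentSequence;
            currentSequence = new ArrayDeque<>();
            return sequence;
        }
    }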



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368912#comment-16368912
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , I have changed `EXACTLY_ONCE_BLOCKING_DATA_ENABLED` to true 
and squashed the commits.
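Such a switch is typically declared as a Flink `ConfigOption` (editor's sketch
below; the holder class, key and default value are assumptions made for
illustration, not necessarily what the pull request uses):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    class ExampleOptions {
        // Hypothetical declaration: the key and default shown here are illustrative only.
        public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
            ConfigOptions.key("taskmanager.network.credit-based-blocking.enabled")
                .defaultValue(true);
    }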




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362335#comment-16362335
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
Thanks for rebasing to resolve the conflicts.

Yes, the default value can be changed to true after the credit-based mode is 
fully merged. If any changes are needed on my side, please let me know. :)




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360864#comment-16360864
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , I have submitted the updates for the above comments.




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360850#comment-16360850
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167582971
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
--- End diff --

sure




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360848#comment-16360848
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167582763
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
+* @return The readable sequence of buffers and events, or 'null', if 
nothing was added.
+*/
+   BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
--- End diff --

sure




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360849#comment-16360849
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167582882
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
  */
-public class BarrierBufferTest {
-
-   private static final Random RND = new Random();
-
-   private static final int PAGE_SIZE = 512;
-
-   private static int sizeCounter = 0;
+public class BarrierBufferTest extends BarrierBufferTestBase {
--- End diff --

Makes sense.




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360763#comment-16360763
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167537474
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
--- End diff --

Javadoc in this interface shouldn't mention implementation-specific details. 
On the other hand, this Javadoc doesn't explain what `newBuffer` does, and for 
that information one has to check the `BufferSpiller` Javadoc itself.

Can you add appropriate Javadoc here, or better, add Javadoc to the two 
methods proposed in the comment below: `rollOverWithoutReusingResources()` and 
`rollOverReusingResources()`? The comment on 
`CachedBufferBlocker.java#rollOverReusingResources` should state that it never 
reuses resources and simply defaults to 
`CachedBufferBlocker.java#rollOverWithoutReusingResources`.
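A possible shape for that Javadoc, assuming the two-method split proposed
below (editor's sketch only, to be placed in the `BufferBlocker` interface):

    /**
     * Starts a new sequence and returns the current one for reading, reusing the
     * underlying resources (e.g. the spill file) where the implementation supports it.
     *
     * @return The readable sequence of buffers and events, or null if nothing was added.
     */
    BufferOrEventSequence rollOverReusingResources() throws IOException;

    /**
     * Starts a new sequence and returns the current one for reading without reusing
     * any resources. Implementations that never reuse resources (such as a purely
     * in-memory blocker) behave identically for both roll-over variants.
     *
     * @return The readable sequence of buffers and events, or null if nothing was added.
     */
    BufferOrEventSequence rollOverWithoutReusingResources() throws IOException;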




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360765#comment-16360765
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167535311
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
--- End diff --

nit: Missing period in java doc (build failure).




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360762#comment-16360762
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167557594
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
  */
-public class BarrierBufferTest {
-
-   private static final Random RND = new Random();
-
-   private static final int PAGE_SIZE = 512;
-
-   private static int sizeCounter = 0;
+public class BarrierBufferTest extends BarrierBufferTestBase {
--- End diff --

Rename the test class to `SpillingBarrierBufferTest`?




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360761#comment-16360761
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167537059
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
+* @return The readable sequence of buffers and events, or 'null', if 
nothing was added.
+*/
+   BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
--- End diff --

Could we stick with two methods in the interface? I think more descriptive 
names would be better than a boolean parameter here: 
`rollOverWithoutReusingResources()` and `rollOverReusingResources()`, where 
`rollOverWithoutReusingResources()` == `rollOver(true)`.

Especially since one implementation doesn't support one of those calls.
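Under that proposal, the in-memory blocker (called `CreditBasedBufferBlocker`
or `CachedBufferBlocker` elsewhere in this thread) could simply delegate one
variant to the other (editor's sketch):

    @Override
    public BufferOrEventSequence rollOverReusingResources() throws IOException {
        // The in-memory blocker never reuses resources, so both roll-over
        // variants behave the same and this call just forwards.
        return rollOverWithoutReusingResources();
    }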




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360764#comment-16360764
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167535452
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BufferBlocker}.
+ */
+public abstract class BufferBlockerTestBase {
+
+   protected static final int PAGE_SIZE = 4096;
+
+   abstract BufferBlocker createBufferBlocker();
+
+   @Test
+   public void testRollOverEmptySequences() throws IOException {
+   BufferBlocker bufferBlocker = createBufferBlocker();
+   assertNull(bufferBlocker.rollOver(false));
+   assertNull(bufferBlocker.rollOver(false));
+   assertNull(bufferBlocker.rollOver(false));
+   }
+
+   @Test
+   public void testSpillAndRollOverSimple() throws IOException {
+   final Random rnd = new Random();
+   final Random bufferRnd = new Random();
+
+   final int maxNumEventsAndBuffers = 3000;
+   final int maxNumChannels = 1656;
+
+   BufferBlocker bufferBlocker = createBufferBlocker();
+
+   // do multiple spilling / rolling over rounds
+   for (int round = 0; round < 5; round++) {
+
+   final long bufferSeed = rnd.nextLong();
+   bufferRnd.setSeed(bufferSeed);
+
+   final int numEventsAndBuffers = 
rnd.nextInt(maxNumEventsAndBuffers) + 1;
+   final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+
+   final ArrayList events = new 
ArrayList(128);
+
+   // generate sequence
+   for (int i = 0; i < numEventsAndBuffers; i++) {
+   boolean isEvent = rnd.nextDouble() < 0.05d;
+   BufferOrEvent evt;
+   if (isEvent) {
+   evt = generateRandomEvent(rnd, 
numChannels);
+   events.add(evt);
+   } else {
+   evt = 
generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, 
bufferRnd.nextInt(numChannels));
+   }
+   bufferBlocker.add(evt);
+   }
+
+   // reset and create reader
+   bufferRnd.setSeed(bufferSeed);
+
+   BufferOrEventSequence seq = 
bufferBlocker.rollOver(false);
+   seq.open();
+
+   // read and validate the sequence
+
+   int numEvent = 0;
+   for (int i = 0; i < numEventsAndBuffers; i++) {
+   BufferOrEvent next = seq.getNext();
   

[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358151#comment-16358151
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , thanks for the suggestions; I totally agree with that. 
That abstraction indeed makes the code simpler. I will update the code 
ASAP.




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358128#comment-16358128
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167163798
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
+
+   private static final Random RND = new Random();
+
+   private static int sizeCounter = 1;
+
+   public static BufferOrEvent createBarrier(long checkpointId, int 
channel) {
+   return new BufferOrEvent(new CheckpointBarrier(
+   checkpointId, System.currentTimeMillis(), 
CheckpointOptions.forCheckpointWithDefaultLocation()), channel);
+   }
+
+   public static BufferOrEvent createCancellationBarrier(long 
checkpointId, int channel) {
--- End diff --

Instead of using static methods, please use inheritance: make 
`BarrierBufferTest` and `CreditBasedBarrierBufferTest` extend 
`BarrierBufferTestBase`. The name `*Base` already suggests that.
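As a rough illustration of that change (editor's sketch built from the helper
shown in this diff), the static helpers become inherited methods:

    import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

    public abstract class BarrierBufferTestBase {

        // Formerly a static helper; now inherited by both concrete test classes.
        protected BufferOrEvent createBarrier(long checkpointId, int channel) {
            return new BufferOrEvent(new CheckpointBarrier(
                checkpointId, System.currentTimeMillis(),
                CheckpointOptions.forCheckpointWithDefaultLocation()), channel);
        }
    }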



[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358129#comment-16358129
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167162743
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
--- End diff --

This is not exactly what I had in mind by deduplication of 
`BarrierBufferTest` and `CreditBasedBarrierBufferTest`. Both of those tests 
are still pretty much copies of one another, and those static methods are only 
a fraction of the duplication.

Look, for example, at `testSingleChannelNoBarriers()`: the two versions are 
99% identical. All of its code could be moved to `BarrierBufferTestBase`. 
`BarrierBufferTestBase` would only need to declare an abstract method 
`CheckpointBarrierHandler createBarrierHandler()`, which would be defined 
differently in `BarrierBufferTest` and `CreditBasedBarrierBufferTest`. One 
minor thing is that `BarrierBufferTest` would need `checkNoTempFilesRemain()` 
added as an `@After` test hook. The same applies to all of the other tests.
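A minimal sketch of the suggested structure (editor's illustration; the test
body and the handler construction are placeholders, not the actual PR code):

    package org.apache.flink.streaming.runtime.io;

    import org.apache.flink.runtime.io.network.partition.consumer.InputGate;

    import org.junit.After;
    import org.junit.Test;

    import java.io.IOException;

    // --- BarrierBufferTestBase.java ---
    public abstract class BarrierBufferTestBase {

        // Each concrete test supplies its own handler
        // (spilling BarrierBuffer vs. the credit-based variant).
        abstract CheckpointBarrierHandler createBarrierHandler(InputGate gate) throws IOException;

        @Test
        public void testSingleChannelNoBarriers() throws Exception {
            // The formerly duplicated test body lives here once, calling
            // createBarrierHandler(...) instead of a hard-coded constructor.
        }
    }

    // --- BarrierBufferTest.java ---
    class BarrierBufferTest extends BarrierBufferTestBase {

        @Override
        CheckpointBarrierHandler createBarrierHandler(InputGate gate) throws IOException {
            // Construct the spilling variant here (BarrierBuffer backed by a BufferSpiller).
            throw new UnsupportedOperationException("illustrative placeholder");
        }

        @After
        public void checkNoTempFilesRemain() {
            // Verify that the spiller removed its temporary files after each test.
        }
    }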



[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355191#comment-16355191
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , I have submitted a separate commit to address the above comments.




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353271#comment-16353271
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166175837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

I think we can change the current `CheckpointBarrierHandler` interface into 
an abstract class and then add a `createBarrierHandler` method for extracting 
the common parts of `StreamInputProcessor` and `StreamTwoInputProcessor`. Or 
we could define a new class for the common method. I prefer the first way. 
What do you think?
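One way the shared creation logic could look (editor's sketch; the helper
class, method name and constructor calls below are assumptions made for
illustration, not the actual code of this pull request):

    package org.apache.flink.streaming.runtime.io;

    import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    import org.apache.flink.streaming.api.CheckpointingMode;

    import java.io.IOException;

    /** Hypothetical helper shared by StreamInputProcessor and StreamTwoInputProcessor. */
    public class BarrierHandlerFactory {

        public static CheckpointBarrierHandler createBarrierHandler(
                CheckpointingMode checkpointMode,
                InputGate inputGate,
                IOManager ioManager,
                boolean creditBasedBlockingEnabled) throws IOException {

            if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
                if (creditBasedBlockingEnabled) {
                    // New handler: blocked data is buffered in memory, no spilling.
                    return new CreditBasedBarrierBuffer(inputGate);
                } else {
                    // Original handler: blocked data is spilled to disk files.
                    return new BarrierBuffer(
                        inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()));
                }
            } else {
                // At-least-once mode only tracks barriers and never blocks channels.
                return new BarrierTracker(inputGate);
            }
        }
    }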




[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353265#comment-16353265
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166174743
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>The BarrierBuffer continues receiving buffers from the blocked channels and buffers them
+ * internally until the blocks are released. It will not cause deadlocks based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
--- End diff --

I think we cannot directly mix all the blocked buffers for different 
checkpoint ids into one `ArrayDeque`. We still need the `BufferOrEventSequence` 
that groups the blocked buffers for a specific checkpoint id; otherwise we 
cannot tell when the blocked buffers of that checkpoint are exhausted after its 
blocks are released.

If we want to use only one `ArrayDeque` for all blocked buffers, we may 
need to insert extra checkpoint-id hints into this queue to know when to 
stop reading blocked buffers from it.

For example:
channel1: [cp1,cp2,b1,cp3,b2,b3]
channel2: [cp2]

1. When cp1 is read first from channel1, [cp2,b1,cp3,b2,b3] are blocked as 
separate sequence1.

[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352520#comment-16352520
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski, thanks for the review!

I understand your concerns, and I will deduplicate the common utilities in 
these tests. I will do that tomorrow, together with addressing the other comments!


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352509#comment-16352509
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165998584
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

Yes, I will consider a proper way to do it.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352505#comment-16352505
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165997853
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>The BarrierBuffer continues receiving buffers from the blocked channels and buffers them
+ * internally until the blocks are released. It will not cause deadlocks based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
--- End diff --

The current implementation keeps the same logic as `BarrierBuffer`. I am 
wondering whether it would make sense to keep only one 
`ArrayDeque<BufferOrEvent>` holding all blocked buffers across different 
checkpoint ids, especially for the uncommon case mentioned on line 496 of 
`BarrierBuffer`. I will double-check that logic and reply to you later.
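
To make the two options in this thread concrete, here is a small self-contained 
sketch in plain Java (not the actual Flink classes; `Object` stands in for 
`BufferOrEvent`). Option 1 keeps one sequence of blocked buffers per checkpoint 
id, so the handler knows exactly when the buffers of a released checkpoint are 
exhausted; option 2 keeps a single flat queue and interleaves explicit 
checkpoint-id markers as the "hints" mentioned earlier in this thread:

    import java.util.ArrayDeque;

    final class BlockedBufferBookkeeping {

        // Option 1: one queue of per-checkpoint sequences (what BufferOrEventSequence models).
        static final class Sequence {
            final long checkpointId;
            final ArrayDeque<Object> bufferOrEvents = new ArrayDeque<>();

            Sequence(long checkpointId) {
                this.checkpointId = checkpointId;
            }
        }

        final ArrayDeque<Sequence> queuedSequences = new ArrayDeque<>();

        // Option 2: one flat queue; a marker records where the blocked data of one
        // checkpoint ends and the data of the next one begins.
        static final class CheckpointMarker {
            final long checkpointId;

            CheckpointMarker(long checkpointId) {
                this.checkpointId = checkpointId;
            }
        }

        final ArrayDeque<Object> flatQueue = new ArrayDeque<>();
    }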


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue 

[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352441#comment-16352441
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983714
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>The BarrierBuffer continues receiving buffers from the blocked channels and buffers them
+ * internally until the blocks are released. It will not cause deadlocks based on credit-based
--- End diff --

sure


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352436#comment-16352436
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983496
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -184,6 +184,18 @@
key("taskmanager.network.detailed-metrics")
.defaultValue(false);
 
+   /**
+* Config parameter defining whether to spill data for channels with 
barrier or not in exactly-once
+* mode based on credit-based flow control.
+*
+* @deprecated Will be removed for Flink 1.6 when the old code will be 
dropped in favour of
+* credit-based flow control.
+*/
+   @Deprecated
+   public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
+   key("taskmanager.exactly-once.blocking.data.enabled")
+   .defaultValue(false);
--- End diff --

Yes, the default value should be true, but I think it should only be changed 
after `FLINK-7456` is merged so that credit-based flow control actually works.
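
As a rough illustration of how the two switches could be coupled once 
credit-based flow control is on by default (the helper below is a sketch; the 
constructors and the `creditBasedFlowControlEnabled` flag are assumptions, not 
the merged code):

    // Fall back to the spilling BarrierBuffer unless both credit-based flow control and
    // the (deprecated) blocking-data safety-net option are enabled.
    static CheckpointBarrierHandler chooseExactlyOnceHandler(
            Configuration taskManagerConfig,
            InputGate inputGate,
            IOManager ioManager,
            int pageSize,
            long maxAlign,
            boolean creditBasedFlowControlEnabled) throws IOException {

        boolean blockInsteadOfSpill = creditBasedFlowControlEnabled
                && taskManagerConfig.getBoolean(TaskManagerOptions.EXACTLY_ONCE_BLOCKING_DATA_ENABLED);

        return blockInsteadOfSpill
                ? new CreditBasedBarrierBuffer(inputGate, new CreditBasedBufferBlocker(pageSize)) // assumed ctor
                : new BarrierBuffer(inputGate, ioManager, maxAlign); // existing spilling handler
    }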


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352438#comment-16352438
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983607
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
--- End diff --

The checkstyle failures are fixed.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352280#comment-16352280
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165942614
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>The BarrierBuffer continues receiving buffers from the blocked channels and buffers them
+ * internally until the blocks are released. It will not cause deadlocks based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
--- End diff --

Do we need these `queuedBuffered` and `currentBuffered` fields in addition to 
the `CreditBasedBufferBlocker`? Why can't we just use the 
`ArrayDeque<BufferOrEvent> currentBuffers` field from `CreditBasedBufferBlocker` 
for this? Why do we need this triple-level buffering here? In the original code 
it made sense, since there was a `BufferSpiller` instead of a 
`CreditBasedBufferBlocker`.

Getting rid of those three fields would vastly simplify this class.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: 

[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352284#comment-16352284
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165930338
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -184,6 +184,18 @@
key("taskmanager.network.detailed-metrics")
.defaultValue(false);
 
+   /**
+* Config parameter defining whether to spill data for channels with 
barrier or not in exactly-once
+* mode based on credit-based flow control.
+*
+* @deprecated Will be removed for Flink 1.6 when the old code will be 
dropped in favour of
+* credit-based flow control.
+*/
+   @Deprecated
+   public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
+   key("taskmanager.exactly-once.blocking.data.enabled")
+   .defaultValue(false);
--- End diff --

I think we would like to enable it by default and keep this config option 
just as a safety net in case of bugs or problems.

Btw, shouldn't this be tightly coupled with the credit-based flow control switch?


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352282#comment-16352282
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165945174
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTest.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CreditBasedBufferBlocker}.
+ */
+public class BufferBlockerTest {
+
+   private static final int PAGE_SIZE = 4096;
+
+   private CreditBasedBufferBlocker bufferBlocker;
+
+
+   // 

+   //  Setup / Cleanup
+   // 

+
+   @Before
+   public void createSpiller() {
+   bufferBlocker = new CreditBasedBufferBlocker(PAGE_SIZE);
+   }
+
+   @After
+   public void cleanupSpiller() {
+   if (bufferBlocker != null) {
+   bufferBlocker.close();
+   }
+   }
+
+   // 

+   //  Tests
+   // 

+
+   @Test
+   public void testRollOverEmptySequences() {
+   assertNull(bufferBlocker.rollOver());
+   assertNull(bufferBlocker.rollOver());
+   assertNull(bufferBlocker.rollOver());
+   }
+
+   @Test
+   public void testSpillAndRollOverSimple() {
+   final Random rnd = new Random();
+   final Random bufferRnd = new Random();
+
+   final int maxNumEventsAndBuffers = 3000;
+   final int maxNumChannels = 1656;
+
+   // do multiple blocking / rolling over rounds
+   for (int round = 0; round < 5; round++) {
--- End diff --

Can you deduplicate the code of those two unit tests 
(`testSpillAndRollOverSimple` and `testSpillWhileReading`)? It seems like this 
one is just a single-sequence variant of the next one.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which 

[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352283#comment-16352283
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165943880
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTest.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CreditBasedBufferBlocker}.
+ */
+public class BufferBlockerTest {
--- End diff --

Rename the class to `CreditBasedBufferBlockerTest`.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352279#comment-16352279
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165933824
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>The BarrierBuffer continues receiving buffers from the blocked channels and buffers them
+ * internally until the blocks are released. It will not cause deadlocks based on credit-based
--- End diff --

Please explain the `It will not cause deadlocks based on credit-based flow 
control` part of the comment in a little more detail.
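
For reference, one possible expansion of that sentence (the wording below is 
only a suggestion for this discussion, derived from the issue description, not 
the merged javadoc):

    /**
     * <p>With credit-based flow control every input channel has its own exclusive buffers,
     * so the data that keeps arriving on blocked channels can be buffered in memory by this
     * handler without taking buffers away from other channels or tasks. Blocking a channel
     * therefore no longer back-pressures a shared buffer pool, which is what could previously
     * lead to distributed deadlocks and forced the old BarrierBuffer to spill blocked data
     * to disk.
     */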


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352281#comment-16352281
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165943080
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

Please extract this and the same code from `StreamTwoInputProcessor.java` 
into a common method. I think all of the lines up to `this.lock = 
checkNotNull(lock);` could be unified, maybe into some base class.
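
One possible shape for that extraction (a sketch; the helper name and where it 
lives are assumptions, not the merged code) is a small shared method that both 
processors call before building the barrier handler:

    // Validates and returns the checkpoint alignment limit; shared by
    // StreamInputProcessor and StreamTwoInputProcessor.
    private static long getCheckpointAlignmentLimit(Configuration taskManagerConfig) {
        long maxAlign = taskManagerConfig.getLong(
                TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
        if (!(maxAlign == -1 || maxAlign > 0)) {
            throw new IllegalConfigurationException(
                    TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
                            + " must be positive or -1 (infinite)");
        }
        return maxAlign;
    }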


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352278#comment-16352278
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165930413
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
--- End diff --

nit: There were some checkstyle failures


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349921#comment-16349921
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5400

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once

## What is the purpose of the change

*Currently in exactly-once mode, the BarrierBuffer would block inputs with 
barriers until all inputs have received the barrier for a given checkpoint. To 
avoid back-pressuring the input streams which may cause distributed deadlocks, 
the BarrierBuffer has to spill the data in disk files to recycle the buffers 
for blocked channels.*

*Based on credit-based flow control, every channel has exclusive buffers, 
so there is no need to spill data to avoid deadlocks. We therefore implement a new 
CheckpointBarrierHandler that only buffers the data for blocked channels, for 
better performance.*

*This new CheckpointBarrierHandler can also be enabled or disabled via 
configuration in order to roll back to the original mode in case of unexpected risks.*

## Brief change log

  - *Implement the new `CreditBasedBarrierBuffer` and 
`CreditBasedBufferBlocker` for buffering data in blocked channels in 
exactly-once mode.*
  - *Define the parameter `taskmanager.exactly-once.blocking.data.enabled` 
for enabling or disabling the new handler (see the sketch below).*
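
A minimal sketch of flipping the switch programmatically (in a real deployment 
the key would normally be set in flink-conf.yaml instead):

    // Equivalent of putting "taskmanager.exactly-once.blocking.data.enabled: true"
    // into flink-conf.yaml.
    Configuration conf = new Configuration();
    conf.setBoolean(TaskManagerOptions.EXACTLY_ONCE_BLOCKING_DATA_ENABLED, true);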

## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests for the logic of `CreditBasedBarrierBuffer`*
  - *Added tests for the logic of `CreditBasedBufferBlocker`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8547

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5400


commit 4d08e5d58c732e8f835016b48edc4494f8cb26fe
Author: Zhijiang 
Date:   2018-02-02T07:45:49Z

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once




> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)