[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488870#comment-16488870 ]

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Closed in e1d1234477c731fe3f398c7f3f12123f73764242


> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain small transient state that may be inefficient to
> checkpoint, especially when it would also need to be checkpointed in a
> rescalable way.
> An example is opportunistic pre-aggregation operators, which hold a small
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush
> the data downstream upon a checkpoint, so that it becomes part of the
> downstream operator's state.
> This feature is sensitive, because flushing state has clear implications for
> the downstream operator's checkpoint alignment. However, used with care, and
> with the new back-pressure-based checkpoint alignment, this feature can be
> very useful.
> Because it is sensitive, I suggest making this an internal-only feature
> (accessible to operators) and NOT exposing it in the public API at this point.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488871#comment-16488871 ]

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/6066


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488847#comment-16488847 ]

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Thanks for the review.

Addressing the comments and merging this...


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488794#comment-16488794 ]

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190543815
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class test the {@link OperatorChain}.
+ *
+ * It takes a different (simpler) approach at testing the operator chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+    @Test
+    public void testPrepareCheckpointPreBarrier() throws Exception {
+        final AtomicInteger intRef = new AtomicInteger();
+
+        final OneInputStreamOperator one = new ValidatingOperator(intRef, 0);
+        final OneInputStreamOperator two = new ValidatingOperator(intRef, 1);
+        final OneInputStreamOperator three = new ValidatingOperator(intRef, 2);
+
+        final OperatorChain chain = setupOperatorChain(one, two, three);
+        chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+        assertEquals(3, intRef.get());
+    }
+
+    // 
+    //  Operator Chain Setup Utils
+    // 
+
+    @SafeVarargs
+    private static  OperatorChain setupOperatorChain(
--- End diff --

This is a lot of mocking, but the alternative approach ties itself not only to the internals of the `OperatorChain`, but also to the stream config specifics. In that sense, I would like to keep this, because it at least ties itself to the details of one component rather than two.

This hints that `OperatorChain` could really use some refactoring.


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488679#comment-16488679 ]

ASF GitHub Bot commented on FLINK-9428:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190510791
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---
@@ -93,6 +93,20 @@
     //  state snapshots
     // 
 
+    /**
+     * This method is called when the operator should do a snapshot, before it emits its
+     * own checkpoint barrier. This method is intended not for any actual state persistence,
+     * but only for emitting some data before emitting the checkpoint barrier.
+     *
+     * Important: This method should not be used for any actual state snapshot logic, because
+     * it will inherently be within the synchronous part of the operator's checkpoint. If heavy work is done
+     * withing this method, it will affect latency and downstream checkpoint alignments.
--- End diff --

typo: withing -> within
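The ordering this Javadoc describes can be illustrated with a toy model. It assumes a single operator writing to a single output channel, and every name in it (`CheckpointOrderModel`, `emitBarrier`, `snapshotState`, the string-based `channel`) is hypothetical rather than Flink API. Because the pre-barrier call runs synchronously before the barrier is emitted, the flushed records land in the channel ahead of the barrier, so downstream operators process them before taking their own snapshot for that checkpoint, and the operator's own snapshot comes out empty.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of one operator's checkpoint steps (not Flink code). */
class CheckpointOrderModel {
    /** Everything written to the single output channel, in order. */
    final List<String> channel = new ArrayList<>();
    /** Small transient state buffered inside the operator. */
    final List<String> buffer = new ArrayList<>();

    /** Synchronous pre-barrier step: keep it cheap, it delays the barrier. */
    void prepareSnapshotPreBarrier(long checkpointId) {
        channel.addAll(buffer);
        buffer.clear();
    }

    /** Hypothetical stand-in for emitting the checkpoint barrier downstream. */
    void emitBarrier(long checkpointId) {
        channel.add("barrier-" + checkpointId);
    }

    /** Hypothetical snapshot: copies whatever is still buffered. */
    List<String> snapshotState(long checkpointId) {
        return new ArrayList<>(buffer);
    }

    /** Order implied by the Javadoc: pre-barrier work, then the barrier, then the snapshot. */
    List<String> performCheckpoint(long checkpointId) {
        prepareSnapshotPreBarrier(checkpointId);
        emitBarrier(checkpointId);
        return snapshotState(checkpointId);
    }
}
```

Anything slow inside `prepareSnapshotPreBarrier` in this model directly postpones `emitBarrier`, which is the latency and alignment cost the Javadoc warns about.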


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488678#comment-16488678 ]

ASF GitHub Bot commented on FLINK-9428:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190512162
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java ---
(same hunk as quoted in the comment above, ending at `setupOperatorChain(`)
--- End diff --

this is maybe a bit too much mocking


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487750#comment-16487750 ]

ASF GitHub Bot commented on FLINK-9428:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190342404
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---
@@ -93,6 +93,20 @@
     //  state snapshots
     // 
 
+    /**
--- End diff --

This description feels like it states well what it's not intended for, but 
doesn't clearly describe what it's intended for. 

It would be great if you could add what you wrote in the ticket description here, i.e. "Some operators maintain some small transient state ... Rather than persisting that state in a checkpoint, it can make sense to flush the data downstream".


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487507#comment-16487507 ]

ASF GitHub Bot commented on FLINK-9428:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/6066

[FLINK-9428] [checkpointing] Allow operators to flush data on checkpoint pre-barrier

## What is the purpose of the change

Some operators maintain small transient state that may be inefficient to checkpoint, especially when it would also need to be checkpointed in a rescalable way.
An example is opportunistic pre-aggregation operators, which hold a small pre-aggregation state that is frequently flushed downstream.

Rather than persisting that state in a checkpoint, it can make sense to flush the data downstream upon a checkpoint, so that it becomes part of the downstream operator's state.

This feature is sensitive, because flushing state has clear implications for the downstream operator's checkpoint alignment: the flushed records are emitted right before the barrier, so the downstream operator has to process them before it can complete alignment. However, used with care, and with the new back-pressure-based checkpoint alignment, this feature can be very useful.

Because it is sensitive, this PR makes this an internal feature (accessible to operators) and does NOT expose it in the public API.
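To make the pre-aggregation case concrete, here is a minimal standalone sketch in plain Java. It assumes a heavily simplified operator model and is not Flink code: `PreAggregator`, `processElement(String, long)`, `bufferedKeys()`, `maxKeys`, and the `Consumer` standing in for the operator's output are all hypothetical; only the hook name `prepareSnapshotPreBarrier` and its `long checkpointId` parameter come from this PR. Partial sums are flushed opportunistically when the buffer fills up, and unconditionally right before the barrier, so nothing is left to write into the snapshot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical pre-aggregating operator (not Flink's API): sums values per key. */
class PreAggregator {
    private final Map<String, Long> partialSums = new HashMap<>();
    // Hypothetical stand-in for the operator's downstream output.
    private final Consumer<Map.Entry<String, Long>> output;
    private final int maxKeys;

    PreAggregator(Consumer<Map.Entry<String, Long>> output, int maxKeys) {
        this.output = output;
        this.maxKeys = maxKeys;
    }

    /** Per-record path: aggregate locally, flush opportunistically when the buffer is full. */
    void processElement(String key, long value) {
        partialSums.merge(key, value, Long::sum);
        if (partialSums.size() >= maxKeys) {
            flush();
        }
    }

    /** Pre-barrier hook: push all partial sums downstream instead of snapshotting them. */
    void prepareSnapshotPreBarrier(long checkpointId) {
        flush();
    }

    /** Number of keys still buffered (zero right after the pre-barrier flush). */
    int bufferedKeys() {
        return partialSums.size();
    }

    private void flush() {
        for (Map.Entry<String, Long> e : partialSums.entrySet()) {
            // Copy each entry so downstream never holds references into the cleared map.
            output.accept(Map.entry(e.getKey(), e.getValue()));
        }
        partialSums.clear();
    }
}
```

For example, with `maxKeys = 10`, feeding `("a", 1)`, `("a", 2)`, `("b", 5)` emits nothing until the pre-barrier call, which then emits `a=3` and `b=5` (in `HashMap` iteration order) and leaves the buffer empty. The flush is kept this cheap on purpose, since the hook runs in the synchronous part of the checkpoint.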

## Brief change log

  - Adds the `prepareSnapshotPreBarrier(long checkpointId)` call to `(Abstract)StreamOperator`, with an empty default implementation.
  - Adds a call on `OperatorChain` that invokes this method on the operators in front-to-back order.
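The two change-log items can be sketched in isolation. This is not the code from the PR: `Operator`, `AbstractOperator`, and `Chain` are simplified, hypothetical stand-ins for `StreamOperator`, `AbstractStreamOperator`, and `OperatorChain`, checked exceptions are omitted, and the chain here simply keeps its operators in a head-first list. Calling front to back means that records an operator flushes are handed to its chained successors before those successors receive their own call, so they can be flushed onward in the same pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for StreamOperator (exceptions omitted for brevity). */
interface Operator {
    void prepareSnapshotPreBarrier(long checkpointId);
}

/** Hypothetical stand-in for AbstractStreamOperator: empty default, operators opt in by overriding. */
abstract class AbstractOperator implements Operator {
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // no-op by default
    }
}

/** Hypothetical stand-in for OperatorChain; operators are stored head first. */
class Chain {
    private final List<Operator> operators;

    Chain(Operator... headFirst) {
        this.operators = new ArrayList<>(Arrays.asList(headFirst));
    }

    /** Invokes the hook on every operator, from the chain head to the tail. */
    void prepareSnapshotPreBarrier(long checkpointId) {
        for (Operator op : operators) {
            op.prepareSnapshotPreBarrier(checkpointId);
        }
    }
}
```

Operators that do not override the hook, such as a plain `new AbstractOperator() {}`, are simply skipped over, which is why the PR can state that behavior is unchanged for existing operators.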

## Verifying this change

  - This change does not yet alter any behavior; it only adds a plug point for future stream operators.
  - The `OperatorChainTest` unit test validates that the call happens and that operators are called in the right order.
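The ordering check can be expressed without any framework: each operator knows its expected position and compares it against a shared counter, similar in spirit to the `ValidatingOperator(intRef, index)` calls and the final `assertEquals(3, intRef.get())` in the quoted test. This is a hedged reconstruction of the idea only; `PositionCheckingOperator` and `callChain` are hypothetical names, and the real `OperatorChainTest` wires its operators into an actual `OperatorChain`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical operator that verifies it is called at its expected position in the chain. */
class PositionCheckingOperator {
    private final AtomicInteger counter;
    private final int expectedPosition;

    PositionCheckingOperator(AtomicInteger counter, int expectedPosition) {
        this.counter = counter;
        this.expectedPosition = expectedPosition;
    }

    void prepareSnapshotPreBarrier(long checkpointId) {
        // getAndIncrement returns the previous value, i.e. how many operators were called before this one.
        int actual = counter.getAndIncrement();
        if (actual != expectedPosition) {
            throw new IllegalStateException(
                    "expected position " + expectedPosition + " but was called at " + actual);
        }
    }

    /** Hypothetical helper: calls every operator head first, mirroring front-to-back order. */
    static void callChain(List<PositionCheckingOperator> headFirst, long checkpointId) {
        for (PositionCheckingOperator op : headFirst) {
            op.prepareSnapshotPreBarrier(checkpointId);
        }
    }
}
```

If all three operators are called in order, the counter ends at 3; calling them in the wrong order makes the first misplaced operator throw.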

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink pre_barrier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6066.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6066





