[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-07 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116184#comment-16116184 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4368
  
Thanks! :)


> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)
> --
>
>        Key: FLINK-7210
>        URL: https://issues.apache.org/jira/browse/FLINK-7210
>    Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
>   Reporter: Piotr Nowojski
>   Assignee: Piotr Nowojski
>    Fix For: 1.4.0
>
>
> There is a recurring pattern for implementing an exactly-once sink: the
> two-phase commit algorithm. It is used both in `BucketingSink` and in the
> `Pravega` sink, and it will be used in the `Kafka 0.11` connector. It would be
> good to extract this common logic into one class, both to improve the existing
> implementations (for example, `Pravega`'s sink doesn't abort interrupted
> transactions) and to make it easier for users to implement their own
> custom exactly-once sinks.
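The pattern described in the issue can be illustrated with a small, Flink-free model. This is a hypothetical sketch, not the code from PR 4368: `SketchTxn`, `SketchStore`, `SketchSnapshot` and `TwoPhaseCommitSketch` are invented names, the in-memory store only imitates a transactional external system, and `snapshotState`/`notifyCheckpointComplete` merely mirror the names of Flink's `CheckpointedFunction` and `CheckpointListener` callbacks with simplified signatures. It assumes the usual lifecycle: records go into an open transaction, a checkpoint pre-commits it and opens a fresh one, checkpoint completion commits it, and recovery re-commits the pre-committed transactions (so commit must be idempotent) while aborting the transaction that was still open, which is the abort step the description says the `Pravega` sink lacks.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical transaction handle, standing in for the TXN type parameter.
class SketchTxn {
    final String id;
    final List<String> buffered = new ArrayList<>();
    SketchTxn(String id) { this.id = id; }
}

// Hypothetical external system: buffered writes become visible only on commit.
// commit() is idempotent because recovery may re-commit an already committed txn.
class SketchStore {
    final List<String> visible = new ArrayList<>();
    final Set<String> committedIds = new HashSet<>();
    final List<String> abortedIds = new ArrayList<>();
    private int nextTxnId = 0;

    SketchTxn begin() {
        return new SketchTxn("txn-" + nextTxnId++);
    }

    void commit(SketchTxn txn) {
        if (committedIds.add(txn.id)) {
            visible.addAll(txn.buffered);
        }
    }

    void abort(SketchTxn txn) { abortedIds.add(txn.id); }
}

// What a checkpoint would persist: pre-committed txns plus the txn opened after the barrier.
class SketchSnapshot {
    final LinkedHashMap<Long, SketchTxn> pendingCommit;
    final SketchTxn openTxn;
    SketchSnapshot(LinkedHashMap<Long, SketchTxn> pendingCommit, SketchTxn openTxn) {
        this.pendingCommit = pendingCommit;
        this.openTxn = openTxn;
    }
}

// Flink-free model of the checkpoint-driven two-phase commit lifecycle.
class TwoPhaseCommitSketch {
    private final SketchStore store;
    // Insertion order follows checkpoint order, so commits happen oldest first.
    private final LinkedHashMap<Long, SketchTxn> pendingCommitTransactions = new LinkedHashMap<>();
    private SketchTxn currentTransaction;

    TwoPhaseCommitSketch(SketchStore store) {
        this.store = store;
        this.currentTransaction = store.begin();
    }

    // Each record is written into the currently open transaction.
    void invoke(String value) {
        currentTransaction.buffered.add(value);
    }

    // Phase 1 on a checkpoint: park the open txn under the checkpoint id (a real sink
    // would also flush/pre-commit it here), open a fresh txn for records after the
    // barrier, and return what would be persisted.
    SketchSnapshot snapshotState(long checkpointId) {
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = store.begin();
        return new SketchSnapshot(new LinkedHashMap<>(pendingCommitTransactions), currentTransaction);
    }

    // Phase 2 once the checkpoint is confirmed complete: commit every parked txn
    // whose checkpoint id is <= the completed one, in insertion order.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, SketchTxn>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, SketchTxn> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                store.commit(entry.getValue());
                it.remove();
            }
        }
    }

    // Recovery (Flink only restores completed checkpoints): re-commit the snapshot's
    // parked txns and abort the txn that was open, since its records will be replayed.
    static TwoPhaseCommitSketch restore(SketchStore store, SketchSnapshot snapshot) {
        for (SketchTxn txn : snapshot.pendingCommit.values()) {
            store.commit(txn);
        }
        store.abort(snapshot.openTxn);
        return new TwoPhaseCommitSketch(store);
    }
}
```

Keying parked transactions by checkpoint id means that if a completion notification is skipped, a later one still commits every earlier transaction, oldest first.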



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-07 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116166#comment-16116166 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4368




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-06 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116064#comment-16116064 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4368
  
Alright, this looks good to merge now!
Merging ..




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-04 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114096#comment-16114096 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131334260
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState> pendingCommitTransactionsState;
+
+   /**
+* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
+* constructor are {@link TypeInformation#of(Class)}, {@link org.apache.flink.api.common.typeinfo.TypeHint} and
+* {@link TypeInformation#of(TypeHint)}. Example:
+* 
+* {@code
+* TwoPhaseCommitSinkFunction(
+* TypeInformation.of(TXN.class),
+* TypeInformation.of(new TypeHint() {}));
+* }
+* 
+* @param txnTypeInformation {@link TypeInformation} for transaction POJO.
+* @param checkpointToTxnTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+*/
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation> checkpointToTxnTypeInformation) {
+   this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", checkpointToTxnTypeInformation));
+   }
+
+   /**
+* Instantiate {@link TwoPhaseCommitSinkFunction} with custom state 

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-04 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114086#comment-16114086 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131332988
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
--- End diff --

it doesn't need to be serializable anymore! dropped this mention :)




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-04 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114066#comment-16114066 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131330545
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -51,39 +48,52 @@
  * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
  */
 @PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction
+public abstract class TwoPhaseCommitSinkFunction
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
protected final ListStateDescriptor pendingTransactionsDescriptor;
 
-   protected final List pendingCommitTransactions = new ArrayList<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
@Nullable
protected TXN currentTransaction;
protected ListState pendingTransactionsState;
-   protected ListState pendingCommitTransactionsState;
-
-   public TwoPhaseCommitSinkFunction(Class txnClass) {
-   this(
-   TypeInformation.of(txnClass),
-   TypeInformation.of(new TypeHint() {}));
-   }
+   protected ListState> pendingCommitTransactionsState;
--- End diff --

Whether transactions can be redistributed depends on the system we
communicate with; for Kafka 0.11 I'm not sure. Now that I think about it a
bit more, I will change it to `ListState>>`, so that we can guarantee
that transactions will be recovered in the same order they were created.
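The ordering concern in the comment above can be sketched without Flink. This is a hypothetical illustration rather than the PR's code: `OrderedPendingCommits` and its helpers are invented names, a plain `java.util.List` stands in for Flink's `ListState`, and strings stand in for transaction handles. It assumes pending transactions are snapshotted as a single ordered list of (checkpointId, transaction) entries; rebuilding a `LinkedHashMap` from that list on restore reproduces the creation order, so transactions are still committed oldest first.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helpers: a plain List stands in for Flink's ListState and
// String stands in for the TXN transaction handle.
class OrderedPendingCommits {

    // Snapshot: copy (checkpointId, txn) pairs into one list, in insertion order.
    static List<Map.Entry<Long, String>> snapshot(LinkedHashMap<Long, String> pending) {
        List<Map.Entry<Long, String>> state = new ArrayList<>();
        for (Map.Entry<Long, String> e : pending.entrySet()) {
            state.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return state;
    }

    // Restore: re-inserting the entries in list order rebuilds the same iteration order.
    static LinkedHashMap<Long, String> restore(List<Map.Entry<Long, String>> state) {
        LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
        for (Map.Entry<Long, String> e : state) {
            pending.put(e.getKey(), e.getValue());
        }
        return pending;
    }

    // "Commit" (here: collect) every txn whose checkpoint id is <= checkpointId,
    // oldest first, removing it from the pending map.
    static List<String> commitUpTo(LinkedHashMap<Long, String> pending, long checkpointId) {
        List<String> committed = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> e = it.next();
            if (e.getKey() <= checkpointId) {
                committed.add(e.getValue());
                it.remove();
            }
        }
        return committed;
    }
}
```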




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112495#comment-16112495 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131093966
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112492#comment-16112492 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131093086
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112491#comment-16112491 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131091039
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
--- End diff --

nit: wrap TXN as `{@code TXN}`.




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112496#comment-16112496 ]

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131094885
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112494#comment-16112494
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131094488
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState> pendingCommitTransactionsState;
+
+   /**
+* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
+* constructor are {@link TypeInformation#of(Class)}, {@link org.apache.flink.api.common.typeinfo.TypeHint} and
+* {@link TypeInformation#of(TypeHint)}. Example:
+* 
+* {@code
+* TwoPhaseCommitSinkFunction(
+* TypeInformation.of(TXN.class),
+* TypeInformation.of(new TypeHint() {}));
+* }
+* 
+* @param txnTypeInformation {@link TypeInformation} for transaction POJO.
+* @param checkpointToTxnTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+*/
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation> checkpointToTxnTypeInformation) {
+   this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", checkpointToTxnTypeInformation));
+   }
+
+   /**
+* Instantiate {@link TwoPhaseCommitSinkFunction} with custom state 

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112490#comment-16112490
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131091549
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
--- End diff --

nit: either wrap `Serializable` in `{@link}` or use a lower-case "s".


> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic 
> way)
> --
>
> Key: FLINK-7210
> URL: https://issues.apache.org/jira/browse/FLINK-7210
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> To implement an exactly-once sink there is a recurring pattern: the 
> two-phase commit algorithm. It is used both in `BucketingSink` and in the 
> `Pravega` sink, and it will be used in the `Kafka 0.11` connector. It would be 
> good to extract this common logic into one class, both to improve the existing 
> implementations (for example, `Pravega`'s sink doesn't abort interrupted 
> transactions) and to make it easier for users to implement their own 
> custom exactly-once sinks.
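The pattern described in the issue can be illustrated without Flink. Below is a hypothetical, in-memory simulation (the class `TwoPhaseCommitSketch` and its simplified methods are illustrative only; they loosely mirror the `snapshotState`/`notifyCheckpointComplete` hooks of `CheckpointedFunction` and `CheckpointListener` but do not use Flink's signatures): records go into an open transaction, the transaction is pre-committed and parked under its checkpoint id when a checkpoint is taken, and it is committed only once that checkpoint is confirmed. Aborting on failure and restoring from state are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free simulation of the two-phase commit sink pattern.
// Method names only loosely mirror Flink's hooks; signatures are simplified.
public class TwoPhaseCommitSketch {

    /** A transaction that buffers records until it is committed. */
    static final class Txn {
        final List<String> buffered = new ArrayList<>();
    }

    // Stands in for the external system; only committed records become visible here.
    final List<String> committed = new ArrayList<>();
    // Pre-committed transactions keyed by checkpoint id; insertion order equals checkpoint order.
    final LinkedHashMap<Long, Txn> pendingCommit = new LinkedHashMap<>();
    // The open transaction that receives new records ("beginTransaction").
    Txn current = new Txn();

    /** Writes only go into the open transaction. */
    void invoke(String value) {
        current.buffered.add(value);
    }

    /** Phase 1, on checkpoint: pre-commit the open transaction and open a fresh one. */
    void snapshotState(long checkpointId) {
        pendingCommit.put(checkpointId, current);
        current = new Txn();
    }

    /** Phase 2, checkpoint confirmed: commit every pending transaction up to and including it. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Txn>> it = pendingCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // later checkpoints are not confirmed yet
            }
            committed.addAll(entry.getValue().buffered);
            it.remove();
        }
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("a");
        sink.snapshotState(1);
        sink.invoke("b");
        sink.snapshotState(2);
        sink.notifyCheckpointComplete(1);
        System.out.println(sink.committed); // [a]
    }
}
```

Committing everything up to the confirmed id, rather than only that exact id, is a hedge against a completion notification being skipped for an earlier checkpoint; an insertion-ordered map like the `LinkedHashMap` in the diff makes that sweep cheap.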



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112493#comment-16112493
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131094983
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState> pendingCommitTransactionsState;
+
+   /**
+* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
+* constructor are {@link TypeInformation#of(Class)}, {@link org.apache.flink.api.common.typeinfo.TypeHint} and
+* {@link TypeInformation#of(TypeHint)}. Example:
+* 
+* {@code
+* TwoPhaseCommitSinkFunction(
+* TypeInformation.of(TXN.class),
+* TypeInformation.of(new TypeHint() {}));
+* }
+* 
+* @param txnTypeInformation {@link TypeInformation} for transaction POJO.
+* @param checkpointToTxnTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+*/
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation> checkpointToTxnTypeInformation) {
+   this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", checkpointToTxnTypeInformation));
+   }
+
+   /**
+* Instantiate {@link TwoPhaseCommitSinkFunction} with custom state 

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112340#comment-16112340
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131070533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -51,39 +48,52 @@
  * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
  */
 @PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction
+public abstract class TwoPhaseCommitSinkFunction
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
protected final ListStateDescriptor pendingTransactionsDescriptor;
 
-   protected final List pendingCommitTransactions = new ArrayList<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
@Nullable
protected TXN currentTransaction;
protected ListState pendingTransactionsState;
-   protected ListState pendingCommitTransactionsState;
-
-   public TwoPhaseCommitSinkFunction(Class txnClass) {
-   this(
-   TypeInformation.of(txnClass),
-   TypeInformation.of(new TypeHint() {}));
-   }
+   protected ListState> pendingCommitTransactionsState;
--- End diff --

I think this has to be `ListState>` or the original 
`ListState`.

Using a map instead of a list for `pendingCommitTransactions` is OK for 
bookkeeping, but when snapshotting this map we need to make sure that the 
snapshotted transactions are state elements that can be redistributed 
independently of each other.
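To make the redistribution point concrete, here is a minimal sketch that assumes plain Java lists stand in for Flink's operator `ListState` and uses a hypothetical `PendingCommit` holder (not a Flink class). The in-memory map is flattened into one self-contained element per pending transaction, so that after rescaling any subtask can rebuild its own map from whatever subset of elements it receives, re-sorted by checkpoint id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: flattens the in-memory bookkeeping map into independent
// (checkpointId, transaction) elements, as list-style operator state needs them.
public class PendingCommitSnapshots {

    /** One self-contained state element: a transaction and the checkpoint that pre-committed it. */
    public static final class PendingCommit<TXN> {
        final long checkpointId;
        final TXN transaction;

        PendingCommit(long checkpointId, TXN transaction) {
            this.checkpointId = checkpointId;
            this.transaction = transaction;
        }
    }

    /** Snapshot: one list element per map entry, so each can be handed to any subtask. */
    static <TXN> List<PendingCommit<TXN>> snapshot(LinkedHashMap<Long, TXN> pending) {
        List<PendingCommit<TXN>> elements = new ArrayList<>();
        for (Map.Entry<Long, TXN> e : pending.entrySet()) {
            elements.add(new PendingCommit<>(e.getKey(), e.getValue()));
        }
        return elements;
    }

    /** Restore: rebuild the map from the received subset, ordered by checkpoint id. */
    static <TXN> LinkedHashMap<Long, TXN> restore(List<PendingCommit<TXN>> elements) {
        List<PendingCommit<TXN>> sorted = new ArrayList<>(elements);
        sorted.sort((a, b) -> Long.compare(a.checkpointId, b.checkpointId));
        LinkedHashMap<Long, TXN> pending = new LinkedHashMap<>();
        for (PendingCommit<TXN> p : sorted) {
            pending.put(p.checkpointId, p.transaction);
        }
        return pending;
    }

    public static void main(String[] args) {
        LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
        pending.put(7L, "txn-a");
        pending.put(8L, "txn-b");
        pending.put(9L, "txn-c");

        List<PendingCommit<String>> snapshot = snapshot(pending);
        // Simulate a scale-out: the elements are split between two subtasks.
        LinkedHashMap<Long, String> subtask0 = restore(snapshot.subList(0, 2));
        LinkedHashMap<Long, String> subtask1 = restore(snapshot.subList(2, 3));
        System.out.println(subtask0 + " " + subtask1); // {7=txn-a, 8=txn-b} {9=txn-c}
    }
}
```

Snapshotting the whole map as a single element would instead force all pending transactions onto one subtask after a rescale.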




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112319#comment-16112319
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4368
  
I would also like to take another look a bit later today. I can merge after that :)




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112317#comment-16112317
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4368
  
@pnowojski The changes look good! Should I have another look at the whole thing 
before merging? Or maybe @tzulitai wants to do that?




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101790#comment-16101790
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129527454
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101793#comment-16101793
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129522314
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
--- End diff --

As you wish; I have no experience with serialization in Java, so I will 
leave this decision up to you.
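For context on the serialization question: Flink's `TypeHint` is meant to be instantiated as an anonymous subclass (`new TypeHint<...>() {}`) so that the generic type survives erasure. The sketch below uses a hypothetical stand-in, `TypeToken` (not Flink's class), to show the limitation of that trick: when the anonymous subclass is created inside generic code, only the type variable (here `TXN`) is recorded, not a concrete type. Whether this is what the review of the removed `Class`-based convenience constructor was about is an assumption; the thread does not spell it out.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical super-type-token, illustrating the idea behind an anonymous TypeHint.
// This is not Flink's implementation.
public class TypeTokenDemo {

    abstract static class TypeToken<T> {
        final Type captured;

        protected TypeToken() {
            // The anonymous subclass records its generic superclass, e.g. TypeToken<Map<Long, String>>.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    // Created where every type is concrete: the full type survives erasure.
    static Type concrete() {
        return new TypeToken<Map<Long, String>>() {}.captured;
    }

    // Created inside generic code: TXN is only a type variable, so nothing concrete is captured.
    static <TXN> Type fromGenericCode() {
        return new TypeToken<Map<Long, TXN>>() {}.captured;
    }

    public static void main(String[] args) {
        System.out.println(concrete());
        System.out.println(fromGenericCode());
    }
}
```

This is one reason to let the concrete subclass (which knows its `TXN`) build the `TypeInformation` and pass it to the constructor, rather than deriving it from a `Class` object inside the generic base class.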



[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101794#comment-16101794
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129523821
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
--- End diff --

I think that was a relic from a previous version. Dropped.




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101791#comment-16101791
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129581412
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101789#comment-16101789
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129531190
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101792#comment-16101792
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129531220
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100205#comment-16100205
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129340541
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100204#comment-16100204
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129339683
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
--- End diff --

Why does `TXN` need to be `Serializable`? It should always be serialised using Flink's own mechanisms (`TypeSerializer` and so on).
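The point can be illustrated with a small, Flink-free sketch: when checkpointed state is written through an explicit serializer, the transaction type needs no `java.io.Serializable` marker. Everything below is hypothetical. `TransactionHandle` and its fields are invented, and `HandleSerializer` only mimics the idea behind Flink's `TypeSerializer` (which writes to a `DataOutputView`, an extension of `java.io.DataOutput`); it is not that class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical transaction handle: deliberately does NOT implement java.io.Serializable. */
final class TransactionHandle {
    final String transactionId;
    final int attempt;

    TransactionHandle(String transactionId, int attempt) {
        this.transactionId = Objects.requireNonNull(transactionId, "transactionId is null");
        this.attempt = attempt;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TransactionHandle)) return false;
        TransactionHandle other = (TransactionHandle) o;
        return transactionId.equals(other.transactionId) && attempt == other.attempt;
    }

    @Override
    public int hashCode() {
        return Objects.hash(transactionId, attempt);
    }
}

/** Hypothetical serializer contract, loosely modeled on the idea of Flink's TypeSerializer. */
interface HandleSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;

    T deserialize(DataInput in) throws IOException;
}

final class TransactionHandleSerializer implements HandleSerializer<TransactionHandle> {
    @Override
    public void serialize(TransactionHandle value, DataOutput out) throws IOException {
        out.writeUTF(value.transactionId);
        out.writeInt(value.attempt);
    }

    @Override
    public TransactionHandle deserialize(DataInput in) throws IOException {
        return new TransactionHandle(in.readUTF(), in.readInt());
    }

    /** Writes a handle to bytes and reads it back, as a checkpoint followed by a restore would. */
    static TransactionHandle roundTrip(TransactionHandle handle) {
        TransactionHandleSerializer serializer = new TransactionHandleSerializer();
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.serialize(handle, out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return serializer.deserialize(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Since the checkpointed bytes come from the serializer, the handle class itself carries no serialization requirement, which is why the `Serializable` bound on `TXN` is unnecessary.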




[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100203#comment-16100203
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129340400
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099664#comment-16099664
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234426
  

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099669#comment-16099669
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129233968
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
--- End diff --

Could we have Javadocs for these constructors?
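One way to address this request is to document the constructor chain so that readers can tell the canonical constructor from the convenience ones. The sketch below is a hypothetical, Flink-free stand-in: `StateDescriptorStub` replaces `ListStateDescriptor`, the `TypeInformation`-based middle constructor is folded away, and the parameter descriptions reflect one reading of the field names rather than anything the diff confirms. It keeps the descriptor names from the diff (`pendingTransactions`, `pendingCommitTransactions`) and its `requireNonNull` validation.

```java
import java.util.Objects;

// Hypothetical stand-in for a list-state descriptor: a state name plus a label for the element type.
final class StateDescriptorStub {
    final String name;
    final String elementType;

    StateDescriptorStub(String name, String elementType) {
        this.name = Objects.requireNonNull(name, "name is null");
        this.elementType = Objects.requireNonNull(elementType, "elementType is null");
    }
}

// Sketch of a documented constructor chain; not the actual class under review.
abstract class ConstructorChainSketch<TXN> {
    protected final StateDescriptorStub pendingTransactionsDescriptor;
    protected final StateDescriptorStub pendingCommitTransactionsDescriptor;

    /**
     * Convenience constructor that derives both state descriptors from the transaction class.
     *
     * @param txnClass class of the transaction handle; must not be null
     * @throws NullPointerException if {@code txnClass} is null
     */
    protected ConstructorChainSketch(Class<TXN> txnClass) {
        this(
            new StateDescriptorStub("pendingTransactions",
                Objects.requireNonNull(txnClass, "txnClass is null").getName()),
            new StateDescriptorStub("pendingCommitTransactions",
                txnClass.getName() + " with checkpoint id"));
    }

    /**
     * Canonical constructor; every other constructor delegates here.
     *
     * @param pendingTransactionsDescriptor descriptor for transactions still open at a checkpoint
     * @param pendingCommitTransactionsDescriptor descriptor for pre-committed transactions awaiting commit
     * @throws NullPointerException if either descriptor is null
     */
    protected ConstructorChainSketch(
            StateDescriptorStub pendingTransactionsDescriptor,
            StateDescriptorStub pendingCommitTransactionsDescriptor) {
        this.pendingTransactionsDescriptor = Objects.requireNonNull(
                pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
        this.pendingCommitTransactionsDescriptor = Objects.requireNonNull(
                pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
    }
}

class ConstructorChainDemo {
    // Hypothetical concrete subclass that uses String as the transaction handle.
    static final class StringTxnSink extends ConstructorChainSketch<String> {
        StringTxnSink() {
            super(String.class);
        }

        StringTxnSink(StateDescriptorStub pending, StateDescriptorStub pendingCommit) {
            super(pending, pendingCommit);
        }
    }

    public static void main(String[] args) {
        StringTxnSink sink = new StringTxnSink();
        System.out.println(sink.pendingTransactionsDescriptor.name);        // pendingTransactions
        System.out.println(sink.pendingTransactionsDescriptor.elementType); // java.lang.String
        try {
            new StringTxnSink(null, null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // pendingTransactionsDescriptor is null
        }
    }
}
```

Funnelling every path through a single validating constructor means the null checks live in one place, and the Javadoc on each convenience constructor only needs to say what it derives.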


> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic 
> way)
> --
>
> Key: FLINK-7210
> URL: https://issues.apache.org/jira/browse/FLINK-7210
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> To implement an exactly-once sink, there is a recurring pattern: the 
> two-phase commit algorithm. It is used both in `BucketingSink` and in the 
> `Pravega` sink, and it will be used in the `Kafka 0.11` connector. It would be 
> good to extract this common 

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099673#comment-16099673
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129235807
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   
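The class Javadoc describes two-phase commit layered on `CheckpointedFunction` and `CheckpointListener`, with state split into pending transactions and pending-commit transactions. A minimal, framework-free sketch of that lifecycle follows, assuming the usual division of work: pre-commit the open transaction when a checkpoint is taken, commit it once the checkpoint is confirmed, and on recovery commit what was pre-committed while aborting what was still open. All class and method names here (`TwoPhaseCommitSketch`, `beginTransaction`, `write`, `preCommit`, `commit`, `abort`, `restore`) are hypothetical and are not the API of the class under review.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, framework-free sketch of checkpoint-driven two-phase commit.
abstract class TwoPhaseCommitSketch<IN, TXN> {
    // Transaction currently receiving records.
    protected TXN currentTransaction;
    // Pre-committed transactions, keyed by the checkpoint id that sealed them.
    protected final TreeMap<Long, TXN> pendingCommitTransactions = new TreeMap<>();

    protected abstract TXN beginTransaction();
    protected abstract void write(TXN txn, IN value);
    protected abstract void preCommit(TXN txn);
    protected abstract void commit(TXN txn);
    protected abstract void abort(TXN txn);

    void open() {
        currentTransaction = beginTransaction();
    }

    void invoke(IN value) {
        write(currentTransaction, value);
    }

    // Phase 1 (checkpoint taken): seal the current transaction and start a fresh one.
    void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    // Phase 2 (checkpoint confirmed): commit everything sealed at or before this checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, TXN>> it =
                pendingCommitTransactions.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            commit(it.next().getValue());
            it.remove(); // also removes the entry from the backing TreeMap
        }
    }

    // Recovery: finish pre-committed transactions, abort the one that was still open.
    void restore(Map<Long, TXN> restoredPendingCommits, TXN restoredOpenTransaction) {
        for (TXN txn : restoredPendingCommits.values()) {
            commit(txn);
        }
        if (restoredOpenTransaction != null) {
            abort(restoredOpenTransaction);
        }
        currentTransaction = beginTransaction();
    }
}

// Hypothetical in-memory sink: a transaction is a buffer and commit publishes it.
// A real commit should be idempotent because recovery may replay it; this one is not.
final class InMemoryTwoPhaseCommitSink extends TwoPhaseCommitSketch<String, List<String>> {
    final List<String> committed = new ArrayList<>();
    int aborted = 0;

    @Override protected List<String> beginTransaction() { return new ArrayList<>(); }
    @Override protected void write(List<String> txn, String value) { txn.add(value); }
    @Override protected void preCommit(List<String> txn) { /* a real sink would flush here */ }
    @Override protected void commit(List<String> txn) { committed.addAll(txn); }
    @Override protected void abort(List<String> txn) { txn.clear(); aborted++; }
}

class TwoPhaseCommitDemo {
    public static void main(String[] args) {
        InMemoryTwoPhaseCommitSink sink = new InMemoryTwoPhaseCommitSink();
        sink.open();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);             // "a", "b" sealed under checkpoint 1
        sink.invoke("c");                   // lands in the next transaction
        System.out.println(sink.committed); // [] until checkpoint 1 is confirmed
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.committed); // [a, b]
    }
}
```

Committing only after the checkpoint-complete notification keeps output invisible until the checkpoint covering it is durable; the cost is that `commit` may be replayed during recovery, which is why a production sink needs it to be idempotent.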

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099666#comment-16099666
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234336
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099670#comment-16099670
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234765
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099668#comment-16099668
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234639
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099667#comment-16099667
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234789
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099672#comment-16099672
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129235825
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099674#comment-16099674
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129235367
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all {@link SinkFunction}s that intend to implement exactly-once semantics.
+ * It does so by implementing the two-phase commit algorithm on top of {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. Users should provide a custom TXN (transaction handle) type and implement the abstract
+ * methods that handle this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
+
+   protected final List<TransactionAndCheckpoint<TXN>> pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState<TXN> pendingTransactionsState;
+   protected ListState<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint<TransactionAndCheckpoint<TXN>>() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation<TXN> txnTypeInformation,
+   TypeInformation<TransactionAndCheckpoint<TXN>> txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor<TXN> pendingTransactionsDescriptor,
+   ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   
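
The class Javadoc describes two-phase commit layered on CheckpointedFunction and CheckpointListener. The following is a minimal, Flink-free sketch of that bookkeeping under stated assumptions: the class TwoPhaseCommitModel, its abstract hooks (beginTransaction, write, preCommit, commit) and the toy ListSink are hypothetical names chosen for illustration, not the PR's code. The idea it shows: snapshotState pre-commits the open transaction, parks it under the checkpoint id and opens a new one; notifyCheckpointComplete commits every parked transaction up to the completed id.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the two-phase commit bookkeeping.
// Method names mirror the Flink hooks, but nothing here calls Flink APIs.
abstract class TwoPhaseCommitModel<IN, TXN> {
    private TXN currentTransaction;
    // Pre-committed transactions waiting for their checkpoint to complete,
    // kept in insertion (= checkpoint) order.
    private final Map<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();

    protected abstract TXN beginTransaction();
    protected abstract void write(TXN txn, IN value);
    protected abstract void preCommit(TXN txn);
    protected abstract void commit(TXN txn);

    void open() {
        currentTransaction = beginTransaction();
    }

    // Counterpart of SinkFunction#invoke: each record goes into the open transaction.
    void invoke(IN value) {
        write(currentTransaction, value);
    }

    // Counterpart of CheckpointedFunction#snapshotState (phase one).
    void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    // Counterpart of CheckpointListener#notifyCheckpointComplete (phase two).
    // A notification can be skipped, so commit every pending id <= checkpointId.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, TXN>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TXN> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                commit(entry.getValue());
                it.remove();
            }
        }
    }
}

// Toy subclass: a "transaction" is a buffer whose contents become visible on commit.
class ListSink extends TwoPhaseCommitModel<String, List<String>> {
    final List<String> committed = new ArrayList<>();

    @Override protected List<String> beginTransaction() { return new ArrayList<>(); }
    @Override protected void write(List<String> txn, String value) { txn.add(value); }
    @Override protected void preCommit(List<String> txn) { /* a real sink would flush here */ }
    @Override protected void commit(List<String> txn) { committed.addAll(txn); }
}
```

Records written after a snapshot (for example a record "c" written after checkpoint 1) stay invisible until the next checkpoint completes, and because commits are triggered for all ids up to the completed one, a lost notification for an earlier checkpoint does not strand its transaction.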

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099675#comment-16099675
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234261
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099665#comment-16099665
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129233893
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+   public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
--- End diff --

I'm not too sure we actually want to expose this specific constructor.
I'm not really fond of the user leaving the decision of how to serialize the state entirely to this base class. One disadvantage I can immediately think of is that `TransactionAndCheckpoint` is not a POJO, so Flink will fall back to serializing it into state with Kryo. In general, it is a bad idea to use Kryo for this kind of persisted data.
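
One way to sidestep the Kryo concern is to give the pending-commit state an explicit binary format with a version number. A minimal plain-Java sketch, assuming a hypothetical TxnAndCheckpoint record that pairs a transaction id string with a checkpoint id; the codec class and its format are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

// Hypothetical stand-in for the non-POJO TransactionAndCheckpoint discussed above.
final class TxnAndCheckpoint {
    final String transactionId;
    final long checkpointId;

    TxnAndCheckpoint(String transactionId, long checkpointId) {
        this.transactionId = Objects.requireNonNull(transactionId);
        this.checkpointId = checkpointId;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof TxnAndCheckpoint)) return false;
        TxnAndCheckpoint other = (TxnAndCheckpoint) o;
        return checkpointId == other.checkpointId && transactionId.equals(other.transactionId);
    }

    @Override public int hashCode() { return Objects.hash(transactionId, checkpointId); }
}

// Explicit layout: [version:int][transactionId:modified UTF-8][checkpointId:long].
// The leading version number lets a later format change stay readable, which an
// opaque generic (Kryo) blob does not guarantee across class changes.
final class TxnAndCheckpointCodec {
    static final int VERSION = 1;

    static byte[] serialize(TxnAndCheckpoint value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeUTF(value.transactionId);
            out.writeLong(value.checkpointId);
        }
        return bytes.toByteArray();
    }

    static TxnAndCheckpoint deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("Unsupported format version " + version);
            }
            return new TxnAndCheckpoint(in.readUTF(), in.readLong());
        }
    }
}
```

In Flink itself, the equivalent would be a custom `TypeSerializer` handed to a `ListStateDescriptor`, rather than the static helpers shown here.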


> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic 
> way)
> --
>
> Key: FLINK-7210
> URL: https://issues.apache.org/jira/browse/FLINK-7210
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> To implement an exactly-once sink there is a recurring pattern: the 
> 

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099671#comment-16099671
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129217538
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@

[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16099610#comment-16099610
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4368
  
@EronWright, there shouldn't be any difficulties with that. I think there is 
only one functional difference between `TwoPhaseCommitSinkFunction` and 
`PravegaWriter`: the former automatically aborts some (not all) of the 
"dangling" transactions after restoring the state.
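
A plain-Java sketch of what such a restore step can look like, assuming (hypothetically) that the checkpoint records two lists: transactions that were pre-committed for that checkpoint, and transactions that were still open when the snapshot was taken. Transactions a failed attempt opened after that snapshot never reached the state, which is one reading of "some (not all)". RestoredState, RecoveringSink and the hook names are illustrative, not the PR's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical snapshot contents seen after a failure; not Flink's actual state layout.
final class RestoredState {
    final List<String> pendingCommit;   // pre-committed for the restored (completed) checkpoint
    final List<String> openAtSnapshot;  // begun but never pre-committed

    RestoredState(List<String> pendingCommit, List<String> openAtSnapshot) {
        this.pendingCommit = pendingCommit;
        this.openAtSnapshot = openAtSnapshot;
    }
}

final class RecoveringSink {
    final List<String> log = new ArrayList<>();

    // Must be idempotent: the commit may already have succeeded before the failure.
    void recoverAndCommit(String txn) { log.add("commit " + txn); }

    // Aborting releases whatever the transaction holds (locks, temporary files, ...).
    void recoverAndAbort(String txn) { log.add("abort " + txn); }

    void initializeState(RestoredState state) {
        // The restored checkpoint is complete, so its pre-committed data must become visible.
        for (String txn : state.pendingCommit) {
            recoverAndCommit(txn);
        }
        // Recorded but not pre-committed: this data must never become visible.
        for (String txn : state.openAtSnapshot) {
            recoverAndAbort(txn);
        }
        // Transactions started after this snapshot are unknown here and cannot be
        // aborted by this mechanism; they are the "not all" part.
    }
}
```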



> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic 
> way)
> --
>
> Key: FLINK-7210
> URL: https://issues.apache.org/jira/browse/FLINK-7210
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> To implement an exactly-once sink there is a recurring pattern: the 
> two-phase commit algorithm. It is used both in `BucketingSink` and in the 
> `Pravega` sink, and it will be used in the `Kafka 0.11` connector. It would be 
> good to extract this common logic into one class, both to improve existing 
> implementations (for example, `Pravega`'s sink doesn't abort interrupted 
> transactions) and to make it easier for users to implement their own 
> custom exactly-once sinks.
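
As a concrete instance of the pattern the issue describes, a file sink can treat a temporary file as the transaction: records go to a hidden in-progress file, pre-commit closes it, commit renames it to its final name, and abort deletes it. This is a plain-Java sketch with hypothetical FileTransaction and TempFileTransactions classes; it does not reproduce `BucketingSink`'s real file naming or its in-progress/pending states, and ATOMIC_MOVE assumes a file system that supports atomic renames.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical file-based transaction: data lives in a hidden temp file and only
// appears under its final name once the transaction commits.
final class FileTransaction {
    final Path tempFile;
    final Path finalFile;
    final Writer writer;

    FileTransaction(Path tempFile, Path finalFile) throws IOException {
        this.tempFile = tempFile;
        this.finalFile = finalFile;
        this.writer = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
    }
}

final class TempFileTransactions {
    private final Path dir;
    private int counter;

    TempFileTransactions(Path dir) { this.dir = dir; }

    FileTransaction beginTransaction() throws IOException {
        int id = counter++;
        return new FileTransaction(dir.resolve(".part-" + id + ".inprogress"), dir.resolve("part-" + id));
    }

    void write(FileTransaction txn, String record) throws IOException {
        txn.writer.write(record);
        txn.writer.write('\n');
    }

    // Phase one: flush and close the temp file (a real sink would also fsync it),
    // while keeping the data invisible to readers of the final names.
    void preCommit(FileTransaction txn) throws IOException {
        txn.writer.close();
    }

    // Phase two: publish the data with a single rename.
    void commit(FileTransaction txn) throws IOException {
        Files.move(txn.tempFile, txn.finalFile, StandardCopyOption.ATOMIC_MOVE);
    }

    // Discard everything written in this transaction.
    void abort(FileTransaction txn) throws IOException {
        txn.writer.close();
        Files.deleteIfExists(txn.tempFile);
    }
}
```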



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16093178#comment-16093178
 ] 

ASF GitHub Bot commented on FLINK-7210:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4368

[FLINK-7210] Introduce TwoPhaseCommitSinkFunction

This is intended to be a recommended base class for implementing 
exactly-once sinks in Flink.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink 2phase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4368


commit 6ea314a4abd9f609accbc9c5f450051560df43da
Author: Piotr Nowojski 
Date:   2017-07-04T15:45:53Z

[FLINK-7210] Implement TwoPhaseCommitSinkFunction

This is a recommended base class for implementing exactly-once sinks in 
Flink



