[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4368


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131334260
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
+
+   protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState<TXN> pendingTransactionsState;
+   protected ListState> pendingCommitTransactionsState;
+
+   /**
+    * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
+    * constructor are {@link TypeInformation#of(Class)}, {@link org.apache.flink.api.common.typeinfo.TypeHint} and
+    * {@link TypeInformation#of(TypeHint)}. Example:
+    *
+    * {@code
+    * TwoPhaseCommitSinkFunction(
+    *     TypeInformation.of(TXN.class),
+    *     TypeInformation.of(new TypeHint() {}));
+    * }
+    *
+    * @param txnTypeInformation {@link TypeInformation} for transaction POJO.
+    * @param checkpointToTxnTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+    */
+   public TwoPhaseCommitSinkFunction(
+           TypeInformation<TXN> txnTypeInformation,
+           TypeInformation> checkpointToTxnTypeInformation) {
+       this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+           new ListStateDescriptor<>("pendingCommitTransactions", checkpointToTxnTypeInformation));
+   }
+
+   /**
+    * Instantiate {@link TwoPhaseCommitSinkFunction} with custom state descriptors.
+    *
+    * @param pendingTransactionsDescriptor descriptor for transaction POJO.
+    * @param pendingCommitTransactionsDescriptor descriptor for mapping between checkpointId and transaction POJO.
+    */

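The class Javadoc quoted above describes two-phase commit layered on top of `CheckpointedFunction` and `CheckpointListener`. Below is a minimal plain-Java sketch of that lifecycle, with no Flink dependency, under stated assumptions: `snapshotState` pre-commits the open transaction and parks it under its checkpoint id, and `notifyCheckpointComplete` commits every parked transaction up to the confirmed id, in insertion order. The hook names `beginTransaction`, `preCommit` and `commit`, the `Txn` buffer and the `committedOutput` list are hypothetical stand-ins for illustration, not the PR's actual abstract methods.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of a two-phase-commit sink lifecycle.
 * All hook names here are hypothetical, not Flink's API.
 */
public class TwoPhaseCommitSketch {

    /** Hypothetical transaction handle: buffers records until commit. */
    static final class Txn {
        final List<String> buffer = new ArrayList<>();
    }

    // Stand-in for the external system: records become visible only on commit.
    private final List<String> committedOutput = new ArrayList<>();
    // Insertion-ordered map checkpointId -> pre-committed transaction.
    private final LinkedHashMap<Long, Txn> pendingCommitTransactions = new LinkedHashMap<>();
    private Txn currentTransaction = beginTransaction();

    private Txn beginTransaction() {
        return new Txn();
    }

    private void preCommit(Txn txn) {
        // A real sink would flush txn to the external system here,
        // without making its records visible yet.
    }

    private void commit(Txn txn) {
        committedOutput.addAll(txn.buffer);
    }

    /** Like SinkFunction#invoke: write into the open transaction. */
    public void invoke(String value) {
        currentTransaction.buffer.add(value);
    }

    /** Like CheckpointedFunction#snapshotState: phase one (pre-commit). */
    public void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    /** Like CheckpointListener#notifyCheckpointComplete: phase two (commit). */
    public void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Txn>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // belongs to a later checkpoint that is not confirmed yet
            }
            commit(entry.getValue());
            it.remove();
        }
    }

    public List<String> committedOutput() {
        return committedOutput;
    }

    public int pendingCount() {
        return pendingCommitTransactions.size();
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("a");
        sink.snapshotState(1L);
        sink.invoke("b");
        sink.snapshotState(2L);
        sink.invoke("c"); // still in the open, uncommitted transaction
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.committedOutput()); // [a]
        sink.notifyCheckpointComplete(2L);
        System.out.println(sink.committedOutput()); // [a, b]
    }
}
```

The ordered map is what lets a single completion notification commit several older pre-committed transactions in the order they were opened, while leaving newer ones pending.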
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131332988
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
--- End diff --

it doesn't need to be serializable anymore! dropped this mention :)
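Dropping the Serializable requirement fits the constructor quoted in the first diff: both state descriptors are built from Flink `TypeInformation`, so the transaction handle is presumably written by Flink's own type serializers rather than by Java serialization. That constructor's Javadoc also recommends `TypeInformation.of(new TypeHint<...>() {})` for generic types; the trailing `{}` is the "super type token" idiom that `TypeHint` is built around. The plain-Java sketch below shows why the anonymous subclass matters: its generic superclass keeps the full parameterized type after erasure. `TypeCapture` is a hypothetical class for illustration, not Flink's `TypeHint`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class TypeCaptureDemo {

    /**
     * Hypothetical super-type-token holder. An anonymous subclass records
     * TypeCapture<X> as its generic superclass, so X can be read back via
     * reflection even though generics are erased at runtime.
     */
    static abstract class TypeCapture<T> {
        final Type captured;

        protected TypeCapture() {
            Type superclass = getClass().getGenericSuperclass();
            if (!(superclass instanceof ParameterizedType)) {
                throw new IllegalStateException(
                        "Create as an anonymous subclass: new TypeCapture<...>() {}");
            }
            captured = ((ParameterizedType) superclass).getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // Without the trailing {} this would not compile (TypeCapture is abstract),
        // and a plain instance could not recover Map.Entry<Long, String> anyway.
        Type t = new TypeCapture<Map.Entry<Long, String>>() {}.captured;
        System.out.println(t.getTypeName());
    }
}
```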




[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131330545
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -51,39 +48,52 @@
  * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
  */
 @PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction
+public abstract class TwoPhaseCommitSinkFunction
    extends RichSinkFunction
    implements CheckpointedFunction, CheckpointListener {
 
    private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
    protected final ListStateDescriptor pendingTransactionsDescriptor;
 
-   protected final List pendingCommitTransactions = new ArrayList<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
    @Nullable
    protected TXN currentTransaction;
    protected ListState pendingTransactionsState;
-   protected ListState pendingCommitTransactionsState;
-
-   public TwoPhaseCommitSinkFunction(Class txnClass) {
-   this(
-   TypeInformation.of(txnClass),
-   TypeInformation.of(new TypeHint() {}));
-   }
+   protected ListState> pendingCommitTransactionsState;
--- End diff --

Whether transactions can be redistributed depends on the system that we 
communicate with. For Kafka 0.11 I'm not sure. Now when I think about it a 
little bit more, I will change it to `ListState>>`, so 
that we can guarantee that transactions will be recovered in the same order 
they were created.
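The ordering argument above can be illustrated without Flink. In this plain-Java sketch, the same three pending transactions are snapshotted two ways: one list element per transaction, or one list element holding the whole insertion-ordered map. `Collections.reverse` is only a stand-in for any change in list-element order during restore or redistribution; it is an assumption for illustration and does not model Flink's actual `ListState` redistribution. All method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderedRecoveryDemo {

    /** Snapshot with one list element per pending transaction. */
    static List<Map.Entry<Long, String>> snapshotPerEntry(LinkedHashMap<Long, String> pending) {
        List<Map.Entry<Long, String>> state = new ArrayList<>();
        for (Map.Entry<Long, String> e : pending.entrySet()) {
            state.add(new AbstractMap.SimpleImmutableEntry<>(e)); // detached copy
        }
        return state;
    }

    /** Snapshot with one list element holding the whole ordered map. */
    static List<Map<Long, String>> snapshotAsSingleMap(LinkedHashMap<Long, String> pending) {
        List<Map<Long, String>> state = new ArrayList<>();
        state.add(new LinkedHashMap<>(pending)); // copy keeps insertion order
        return state;
    }

    static LinkedHashMap<Long, String> restoreEntries(List<Map.Entry<Long, String>> state) {
        LinkedHashMap<Long, String> restored = new LinkedHashMap<>();
        for (Map.Entry<Long, String> e : state) {
            restored.put(e.getKey(), e.getValue()); // order follows the list
        }
        return restored;
    }

    static LinkedHashMap<Long, String> restoreMaps(List<Map<Long, String>> state) {
        LinkedHashMap<Long, String> restored = new LinkedHashMap<>();
        for (Map<Long, String> m : state) {
            restored.putAll(m); // each map contributes its entries in its own order
        }
        return restored;
    }

    public static void main(String[] args) {
        LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
        pending.put(3L, "txn-3");
        pending.put(4L, "txn-4");
        pending.put(5L, "txn-5");

        List<Map.Entry<Long, String>> perEntry = snapshotPerEntry(pending);
        List<Map<Long, String>> singleMap = snapshotAsSingleMap(pending);

        // Stand-in for a restore path that does not preserve list element order.
        Collections.reverse(perEntry);
        Collections.reverse(singleMap);

        System.out.println(restoreEntries(perEntry).keySet()); // [5, 4, 3]
        System.out.println(restoreMaps(singleMap).keySet());   // [3, 4, 5]
    }
}
```

Grouping a subtask's transactions into one element trades finer-grained redistribution for a guaranteed commit order after recovery.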




[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131094885
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131093966
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131091039
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
--- End diff --

nit: wrap TXN as `{@code TXN}`.




[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131093086
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131094983
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131094488
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState> pendingCommitTransactionsState;
+
+   /**
+* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
+* constructor are {@link TypeInformation#of(Class)}, {@link org.apache.flink.api.common.typeinfo.TypeHint} and
+* {@link TypeInformation#of(TypeHint)}. Example:
+* 
+* {@code
+* TwoPhaseCommitSinkFunction(
+* TypeInformation.of(TXN.class),
+* TypeInformation.of(new TypeHint() {}));
+* }
+* 
+* @param txnTypeInformation {@link TypeInformation} for transaction POJO.
+* @param checkpointToTxnTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+*/
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation> checkpointToTxnTypeInformation) {
+   this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", checkpointToTxnTypeInformation));
+   }
+
+   /**
+* Instantiate {@link TwoPhaseCommitSinkFunction} with custom state descriptors.
+*
+* @param pendingTransactionsDescriptor descriptor for transaction POJO.
+* @param pendingCommitTransactionsDescriptor descriptor for mapping between checkpointId and transaction POJO.
+*/

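The class Javadoc in the diff above describes a two-phase commit layered on `CheckpointedFunction` and `CheckpointListener`, with pre-committed transactions kept in a `LinkedHashMap` that maps checkpoint ids to transactions. A minimal, framework-free sketch of that bookkeeping follows. It is written under our own assumptions: `TwoPhaseCommitSketch`, its `Txn` handle (which only buffers strings) and all method bodies are hypothetical, and only the hook names `invoke`, `snapshotState` and `notifyCheckpointComplete` echo the interfaces named in the diff. It is not the code under review.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free model of a two-phase-commit sink's bookkeeping.
// Only the hook names mirror CheckpointedFunction/CheckpointListener; the bodies
// are illustrative assumptions, not Flink's TwoPhaseCommitSinkFunction.
class TwoPhaseCommitSketch {

    /** Hypothetical transaction handle that just buffers records until commit. */
    static final class Txn {
        final List<String> buffered = new ArrayList<>();
    }

    // Checkpoint id -> pre-committed transaction; insertion order follows checkpoint order.
    private final LinkedHashMap<Long, Txn> pendingCommitTransactions = new LinkedHashMap<>();
    // Records made visible by a commit (stands in for the external system).
    final List<String> committed = new ArrayList<>();
    private Txn currentTransaction = new Txn();

    /** Normal processing: write into the currently open transaction. */
    void invoke(String value) {
        currentTransaction.buffered.add(value);
    }

    /** Phase 1 (on checkpoint): pre-commit the open transaction and start a fresh one. */
    void snapshotState(long checkpointId) {
        // A real sink would flush/pre-commit here; the in-memory buffer needs nothing.
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = new Txn();
    }

    /** Phase 2 (checkpoint confirmed): commit every transaction up to and including it. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Txn>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint that is not confirmed yet
            }
            committed.addAll(entry.getValue().buffered);
            it.remove();
        }
    }

    int pendingCount() {
        return pendingCommitTransactions.size();
    }
}
```

Because insertion order follows checkpoint order, a completion notice for checkpoint N commits everything pre-committed at or before N in one pass, so in this sketch a missed notification for an older checkpoint is covered by the next one. Records written after the most recent snapshot stay in the open transaction until a later checkpoint completes.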
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131091549
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,311 @@
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
--- End diff --

nit: either wrap `Serializable` with `@link` or lower-case s.




[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-08-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131070533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -51,39 +48,52 @@
  * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
  */
 @PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction
+public abstract class TwoPhaseCommitSinkFunction
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor> pendingCommitTransactionsDescriptor;
protected final ListStateDescriptor pendingTransactionsDescriptor;
 
-   protected final List pendingCommitTransactions = new ArrayList<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
@Nullable
protected TXN currentTransaction;
protected ListState pendingTransactionsState;
-   protected ListState pendingCommitTransactionsState;
-
-   public TwoPhaseCommitSinkFunction(Class txnClass) {
-   this(
-   TypeInformation.of(txnClass),
-   TypeInformation.of(new TypeHint() {}));
-   }
+   protected ListState> pendingCommitTransactionsState;
--- End diff --

I think this has to be `ListState>` or the original `ListState`.

Using a map instead of a list for `pendingCommitTransactions` is ok for bookkeeping, but when snapshotting this map we need to make sure the snapshotted transactions are state elements that can be redistributed independently of each other.

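To make the redistribution point concrete, here is a hedged sketch in plain Java (no Flink types). It assumes the pending-commit map is snapshotted as one list element per (checkpointId, transaction) pair, and it uses a simple contiguous even split as a hypothetical stand-in for how operator list state is divided when parallelism changes. `RedistributableStateSketch` and its methods are illustrative names only, not part of the PR.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain-Java model of snapshotting pending commits as independent
// list elements. The even split below is a simplified, hypothetical stand-in for
// rescaling operator list state; it is not Flink code.
class RedistributableStateSketch {

    /** Snapshot: emit one independent element per (checkpointId, transaction) pair. */
    static <T> List<Map.Entry<Long, T>> snapshot(LinkedHashMap<Long, T> pendingCommits) {
        List<Map.Entry<Long, T>> elements = new ArrayList<>();
        for (Map.Entry<Long, T> e : pendingCommits.entrySet()) {
            elements.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return elements;
    }

    /** Split the union of all elements into contiguous, near-equal chunks, one per new subtask. */
    static <E> List<List<E>> evenSplit(List<E> union, int newParallelism) {
        List<List<E>> perSubtask = new ArrayList<>();
        int base = union.size() / newParallelism;
        int remainder = union.size() % newParallelism;
        int offset = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            perSubtask.add(new ArrayList<>(union.subList(offset, offset + size)));
            offset += size;
        }
        return perSubtask;
    }

    /**
     * Restore: order the received pairs by checkpoint id (List.sort is stable). Several
     * former subtasks may hold a transaction for the same id, so they stay in a list.
     */
    static <T> List<T> transactionsToRecommit(List<Map.Entry<Long, T>> received) {
        List<Map.Entry<Long, T>> sorted = new ArrayList<>(received);
        sorted.sort(Map.Entry.comparingByKey());
        List<T> txns = new ArrayList<>();
        for (Map.Entry<Long, T> e : sorted) {
            txns.add(e.getValue());
        }
        return txns;
    }
}
```

If each subtask had instead stored its whole map as a single list element, rescaling from two subtasks to three would leave one subtask with nothing, and no transaction could move independently of its siblings. Note also that several former subtasks can hold a transaction for the same checkpoint id, which is why the sketch keeps restored pairs in a list instead of re-keying them into a map, where they would overwrite each other.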



[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-26 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129531190
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase 

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-26 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129531220
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-26 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129527454
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-26 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129581412
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-26 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129522314
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
--- End diff --

As you wish, I have no experience with serialization in Java, so I will leave this decision up to you.

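The `Class`-based constructor discussed here built a `TypeHint` inside the generic base class itself (the archive has stripped its type arguments, so we assume they referred to `TXN`). As a hedged illustration of why that is fragile, the sketch below reimplements the "super type token" trick that anonymous subclasses such as `new TypeHint<...>() {}` rely on. `TypeToken` and `TypeTokenDemo` are hypothetical classes, not Flink's `TypeHint`; the point is only that a concrete type argument written at the call site survives erasure, while one written in terms of a type variable is recorded as an unresolved `TypeVariable`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

// Hypothetical "super type token": an anonymous subclass records its generic
// superclass, so the type argument written in source can be read back at runtime.
abstract class TypeToken<T> {
    final Type captured;

    protected TypeToken() {
        // For `new TypeToken<X>() {}`, the anonymous class's generic superclass is TypeToken<X>.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }
}

class TypeTokenDemo {

    /** Concrete type argument at the call site: fully recoverable at runtime. */
    static Type concrete() {
        return new TypeToken<List<String>>() {}.captured;
    }

    /** Same trick inside generic code: only the variable T is recorded, not what T is. */
    static <T> Type insideGenericCode() {
        return new TypeToken<List<T>>() {}.captured;
    }

    /** True when the first type argument of a captured parameterized type is still a type variable. */
    static boolean hasUnresolvedArgument(Type type) {
        Type argument = ((ParameterizedType) type).getActualTypeArguments()[0];
        return argument instanceof TypeVariable<?>;
    }
}
```

Even a call such as `TypeTokenDemo.<String>insideGenericCode()` yields `List<T>`, because the anonymous class is compiled once against the declaration. This is consistent with the later revision of the diff, which drops the `Class` constructor and lets the caller pass `TypeInformation` built where the concrete transaction type is known.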



[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-26 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129523821
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
--- End diff --

I think that was a relic from a previous version. Dropped.
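The class Javadoc quoted above says the sink implements a two-phase commit on top of `CheckpointedFunction` and `CheckpointListener`. A minimal, Flink-free sketch of that lifecycle could look like the following. It assumes the usual split: the open transaction is pre-committed when a checkpoint is snapshotted and committed once that checkpoint is reported complete. Every name in it (`TwoPhaseCommitSketch`, `PendingCommit`, `BufferingSink`, `write`) is hypothetical and is not the API proposed in this pull request.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, Flink-free sketch of the two-phase commit lifecycle; NOT the PR's API.
abstract class TwoPhaseCommitSketch<IN, TXN> {

    // One transaction that was pre-committed as part of a given checkpoint.
    private static final class PendingCommit<T> {
        final T txn;
        final long checkpointId;

        PendingCommit(T txn, long checkpointId) {
            this.txn = txn;
            this.checkpointId = checkpointId;
        }
    }

    // Kept in checkpoint order, so transactions are committed in the order they were sealed.
    private final List<PendingCommit<TXN>> pendingCommits = new ArrayList<>();
    private TXN currentTransaction;

    protected abstract TXN beginTransaction();

    protected abstract void write(TXN txn, IN value);

    // Phase 1: make the transaction's data durable, but not yet visible.
    protected abstract void preCommit(TXN txn);

    // Phase 2: make the data visible; only called after its checkpoint completed.
    protected abstract void commit(TXN txn);

    void open() {
        currentTransaction = beginTransaction();
    }

    void invoke(IN value) {
        write(currentTransaction, value);
    }

    // Analogue of snapshotState(): seal the open transaction under this checkpoint id
    // and start a fresh one for records that arrive after the checkpoint.
    void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommits.add(new PendingCommit<>(currentTransaction, checkpointId));
        currentTransaction = beginTransaction();
    }

    // Analogue of notifyCheckpointComplete(): commit everything sealed by this
    // checkpoint or by an earlier one.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<PendingCommit<TXN>> it = pendingCommits.iterator();
        while (it.hasNext()) {
            PendingCommit<TXN> pending = it.next();
            if (pending.checkpointId <= checkpointId) {
                commit(pending.txn);
                it.remove();
            }
        }
    }
}

// Toy sink: a transaction is an in-memory buffer; commit publishes it to a shared list.
class BufferingSink extends TwoPhaseCommitSketch<String, List<String>> {
    final List<String> committed = new ArrayList<>();

    @Override
    protected List<String> beginTransaction() {
        return new ArrayList<>();
    }

    @Override
    protected void write(List<String> txn, String value) {
        txn.add(value);
    }

    @Override
    protected void preCommit(List<String> txn) {
        // Nothing to flush for an in-memory buffer.
    }

    @Override
    protected void commit(List<String> txn) {
        committed.addAll(txn);
    }
}
```

In a short run, calling `open()`, `invoke("a")`, `invoke("b")`, `snapshotState(1)`, `invoke("c")` leaves `committed` empty until `notifyCheckpointComplete(1)`, which publishes only `a` and `b`. The value `c` stays in the next transaction. The sketch commits every pending transaction whose id is up to the completed one, rather than only an exact match. This choice is meant to tolerate a skipped completion notification for an earlier checkpoint.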


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129340541
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129339683
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
--- End diff --

Why does `TXN` need to be `Serializable`? It should always be serialised using Flink's own mechanisms (`TypeSerializer` and so on).
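The point above can be illustrated without Flink. In Flink, state is always written through an explicit serializer: the `TypeSerializer` derived from the `TypeInformation` given to the sink's `ListStateDescriptor`s. Because of that, the transaction type never needs a `java.io.Serializable` bound. In the sketch below, `StateSerializer`, `FileTxn`, `FileTxnSerializer` and `SnapshotRoundTrip` are hypothetical stand-ins. They do not reproduce Flink's actual `TypeSerializer` interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical transaction handle; deliberately does not implement java.io.Serializable.
final class FileTxn {
    final String tmpFile;
    final long bytesWritten;

    FileTxn(String tmpFile, long bytesWritten) {
        this.tmpFile = tmpFile;
        this.bytesWritten = bytesWritten;
    }
}

// Hypothetical stand-in for the role Flink's TypeSerializer plays (not its real API):
// the framework, not Java serialization, turns state objects into bytes.
interface StateSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;

    T deserialize(DataInputStream in) throws IOException;
}

final class FileTxnSerializer implements StateSerializer<FileTxn> {
    @Override
    public void serialize(FileTxn txn, DataOutputStream out) throws IOException {
        out.writeUTF(txn.tmpFile);
        out.writeLong(txn.bytesWritten);
    }

    @Override
    public FileTxn deserialize(DataInputStream in) throws IOException {
        // Fields are read back in the same order they were written.
        return new FileTxn(in.readUTF(), in.readLong());
    }
}

final class SnapshotRoundTrip {
    // Writes a value to bytes (as a checkpoint would) and reads it back (as a restore would).
    static <T> T roundTrip(StateSerializer<T> serializer, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.serialize(value, out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return serializer.deserialize(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the serializer is supplied explicitly, any handle type can be checkpointed as long as a matching serializer exists. A `Serializable` bound on the type parameter adds nothing.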



[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129340400
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234261
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234765
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129235825
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129235807
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation txnTypeInformation,
+   TypeInformation txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor pendingTransactionsDescriptor,
+   ListStateDescriptor pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234639
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
+
+   protected final List<TransactionAndCheckpoint<TXN>> pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState<TXN> pendingTransactionsState;
+   protected ListState<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint<TransactionAndCheckpoint<TXN>>() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   TypeInformation<TXN> txnTypeInformation,
+   TypeInformation<TransactionAndCheckpoint<TXN>> txnAndCheckpointTypeInformation) {
+   this(
+   new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+   new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+   ListStateDescriptor<TXN> pendingTransactionsDescriptor,
+   ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor) {
+   this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+   this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase
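The fields quoted above (`pendingCommitTransactions`, `currentTransaction`) suggest how pre-committed transactions wait for their checkpoint to complete before being committed. Below is a minimal plain-Java sketch of that bookkeeping, not the code from this pull request: `TransactionAndCheckpoint` and `PendingCommits` are hypothetical stand-ins, and the actual commit is simulated by moving the handle into a list. It commits every transaction whose checkpoint id is at most the completed one, on the assumption that a completed checkpoint subsumes earlier ones whose completion notification may never arrive.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical pairing of a transaction handle with the checkpoint that pre-committed it.
final class TransactionAndCheckpoint<TXN> {
    final TXN transaction;
    final long checkpointId;

    TransactionAndCheckpoint(TXN transaction, long checkpointId) {
        this.transaction = transaction;
        this.checkpointId = checkpointId;
    }
}

// Sketch of pending-commit bookkeeping; not the PR's implementation.
final class PendingCommits<TXN> {
    private final List<TransactionAndCheckpoint<TXN>> pending = new ArrayList<>();
    private final List<TXN> committed = new ArrayList<>();

    // Would be called while snapshotting: the pre-committed transaction now waits for its checkpoint.
    void onSnapshot(TXN preCommitted, long checkpointId) {
        pending.add(new TransactionAndCheckpoint<>(preCommitted, checkpointId));
    }

    // Would be called on checkpoint completion: commit everything covered by the completed checkpoint.
    void onCheckpointComplete(long completedCheckpointId) {
        Iterator<TransactionAndCheckpoint<TXN>> it = pending.iterator();
        while (it.hasNext()) {
            TransactionAndCheckpoint<TXN> entry = it.next();
            if (entry.checkpointId <= completedCheckpointId) {
                committed.add(entry.transaction); // stands in for commit(txn)
                it.remove();
            }
        }
    }

    List<TXN> committed() {
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```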

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234789
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129235367
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234336
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129233893
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+   public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
--- End diff --

I'm not too sure we actually want to expose this specific constructor.
I'm not really fond of the fact that the user leaves the decision of how to serialize the state completely to this base class. One disadvantage I can immediately think of is that `TransactionAndCheckpoint` is not a POJO, so Flink will fall back to serializing it into state with Kryo. In general, it is a bad idea to use Kryo for this kind of persisted data.
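One way to address the Kryo concern is to give the state descriptors an explicit serializer instead of a generically inferred one; Flink's `ListStateDescriptor` has constructors that accept a `TypeSerializer` as well as a `TypeInformation`. The plain-Java sketch below does not use Flink's `TypeSerializer` API. It only illustrates what an explicit, versioned binary format for a transaction/checkpoint pair could look like, assuming a `String` transaction id; `TxnAndCheckpoint` and `TxnAndCheckpointCodec` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical state entry: a transaction id plus the checkpoint that pre-committed it.
final class TxnAndCheckpoint {
    final String transactionalId;
    final long checkpointId;

    TxnAndCheckpoint(String transactionalId, long checkpointId) {
        this.transactionalId = transactionalId;
        this.checkpointId = checkpointId;
    }
}

// Explicit format: a version tag followed by fields in a fixed, hand-chosen order.
final class TxnAndCheckpointCodec {
    static final int VERSION = 1;

    static byte[] serialize(TxnAndCheckpoint value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeUTF(value.transactionalId);
            out.writeLong(value.checkpointId);
        }
        return bytes.toByteArray();
    }

    static TxnAndCheckpoint deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                // A layout change is detected on restore instead of silently misreading bytes.
                throw new IOException("Unsupported state version: " + version);
            }
            return new TxnAndCheckpoint(in.readUTF(), in.readLong());
        }
    }
}
```

Because the version and field order are written by hand, the persisted layout stays under the sink author's control, which is the property the comment above asks for.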




[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129234426
  

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129233968
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+   public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new TypeHint<TransactionAndCheckpoint<TXN>>() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
--- End diff --

Could we have Javadocs for these constructors?
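A sketch of how such constructor Javadocs could read, assuming the telescoping structure shown in the diff (a convenience constructor delegating to a canonical one that null-checks its arguments). To stay compilable without Flink, `StateSpec` is a hypothetical stand-in for `ListStateDescriptor`, `DocumentedTxnSink` is a hypothetical class name, and only the pending-transactions state is modeled.

```java
import static java.util.Objects.requireNonNull;

/** Hypothetical stand-in for a named state descriptor (not Flink's ListStateDescriptor). */
final class StateSpec<T> {
    final String name;
    final Class<T> type;

    StateSpec(String name, Class<T> type) {
        this.name = requireNonNull(name, "name is null");
        this.type = requireNonNull(type, "type is null");
    }
}

abstract class DocumentedTxnSink<TXN> {
    protected final StateSpec<TXN> pendingTransactionsSpec;

    /**
     * Convenience constructor that derives the state layout from the transaction class.
     * Subclasses using it leave the choice of state serialization to this base class.
     *
     * @param txnClass class of the transaction handle stored in state
     */
    protected DocumentedTxnSink(Class<TXN> txnClass) {
        this(new StateSpec<>("pendingTransactions", txnClass));
    }

    /**
     * Canonical constructor; every other constructor delegates here.
     *
     * @param pendingTransactionsSpec descriptor of the state holding not-yet-committed transactions
     * @throws NullPointerException if {@code pendingTransactionsSpec} is null
     */
    protected DocumentedTxnSink(StateSpec<TXN> pendingTransactionsSpec) {
        this.pendingTransactionsSpec = requireNonNull(pendingTransactionsSpec, "pendingTransactionsSpec is null");
    }
}
```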




[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r129217538
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} 
that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the 
{@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction 
handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param  Input type for {@link SinkFunction}
+ * @param  Transaction to store all of the information required to 
handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction
+   extends RichSinkFunction
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   protected final ListStateDescriptor 
pendingCommitTransactionsDescriptor;
+   protected final ListStateDescriptor pendingTransactionsDescriptor;
+
+   protected final List 
pendingCommitTransactions = new ArrayList<>();
+
+   @Nullable
+   protected TXN currentTransaction;
+   protected ListState pendingTransactionsState;
+   protected ListState 
pendingCommitTransactionsState;
+
+   public TwoPhaseCommitSinkFunction(Class txnClass) {
+   this(
+   TypeInformation.of(txnClass),
+   TypeInformation.of(new 
TypeHint() {}));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+           TypeInformation txnTypeInformation,
+           TypeInformation txnAndCheckpointTypeInformation) {
+       this(
+           new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
+           new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
+   }
+
+   public TwoPhaseCommitSinkFunction(
+           ListStateDescriptor pendingTransactionsDescriptor,
+           ListStateDescriptor pendingCommitTransactionsDescriptor) {
+       this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
+       this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+   }
+
+   // -- methods that should be implemented in child class to support two phase

[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction

2017-07-19 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4368

[FLINK-7210] Introduce TwoPhaseCommitSinkFunction

This is intended to be a recommended base class for implementing exactly-once sinks in Flink.
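The Javadoc in the diff earlier in this thread describes the algorithm in a single sentence: two-phase commit layered on top of CheckpointedFunction and CheckpointListener. The Java sketch below simulates that life cycle without any Flink dependency, to show how the pieces fit together. Records go into the open transaction. The snapshot pre-commits that transaction, files it under the checkpoint id, and opens a new one. The checkpoint-complete notification then commits it. All names (TwoPhaseCommitSketch, Txn, TxnAndCheckpoint) are hypothetical. The methods borrow the names of the Flink hooks, but they are plain methods here and not Flink's API. The in-memory `committed` list is an assumed stand-in for an external transactional system.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical, Flink-free simulation of checkpoint-driven two-phase commit.
 * Method names mirror Flink hooks for readability only; this is not Flink's API.
 */
public class TwoPhaseCommitSketch {

    /** Hypothetical transaction handle: buffers the records written into it. */
    static final class Txn {
        final List<String> records = new ArrayList<>();
    }

    /** Pairs a pre-committed transaction with the checkpoint that pre-committed it. */
    static final class TxnAndCheckpoint {
        final Txn txn;
        final long checkpointId;

        TxnAndCheckpoint(Txn txn, long checkpointId) {
            this.txn = txn;
            this.checkpointId = checkpointId;
        }
    }

    private Txn currentTransaction;
    private final List<TxnAndCheckpoint> pendingCommitTransactions = new ArrayList<>();
    /** Assumed stand-in for the external system: only committed records are visible. */
    final List<String> committed = new ArrayList<>();

    public TwoPhaseCommitSketch() {
        currentTransaction = beginTransaction();
    }

    private Txn beginTransaction() {
        return new Txn();
    }

    /** Every incoming record is written into the currently open transaction. */
    void invoke(String value) {
        currentTransaction.records.add(value);
    }

    /** Phase 1 (snapshot): pre-commit the open txn, remember it under this checkpoint, open a new one. */
    void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.add(new TxnAndCheckpoint(currentTransaction, checkpointId));
        currentTransaction = beginTransaction();
    }

    /** Phase 2 (checkpoint complete): commit every txn pre-committed at or before this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<TxnAndCheckpoint> it = pendingCommitTransactions.iterator();
        while (it.hasNext()) {
            TxnAndCheckpoint pending = it.next();
            if (pending.checkpointId <= checkpointId) {
                commit(pending.txn);
                it.remove();
            }
        }
    }

    private void preCommit(Txn txn) {
        // A real sink would flush buffered writes here; nothing to flush in this simulation.
    }

    private void commit(Txn txn) {
        committed.addAll(txn.records);
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1);               // "a", "b" pre-committed under checkpoint 1
        sink.invoke("c");
        sink.snapshotState(2);               // "c" pre-committed under checkpoint 2
        System.out.println(sink.committed);  // [] : nothing visible yet
        sink.notifyCheckpointComplete(2);    // covers checkpoint 1 as well
        System.out.println(sink.committed);  // [a, b, c]
    }
}
```

The sketch commits every entry whose checkpoint id is at or below the notified one, not only an exact match. This is a design choice that assumes a completion notification for a later checkpoint may arrive without one for an earlier checkpoint.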

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink 2phase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4368


commit 6ea314a4abd9f609accbc9c5f450051560df43da
Author: Piotr Nowojski 
Date:   2017-07-04T15:45:53Z

[FLINK-7210] Implement TwoPhaseCommitSinkFunction

This is a recommended base class for implementing exactly-once sinks in Flink
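Exactly-once delivery also depends on what happens after a failure. The diff keeps two list states, pendingTransactionsState and pendingCommitTransactionsState, but the excerpt stops before initializeState. The recovery policy below is therefore an assumption, not the PR's code. After restoring from a completed checkpoint, transactions that were already pre-committed are committed, because their commit notification may have been lost in the failure. Transactions that were still open are aborted, because their records will be replayed. The class name and the string transaction handles are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical recovery policy for a two-phase commit sink (assumption, not the PR's code).
 * Restored state is modelled as two plain lists of transaction handles.
 */
public class TwoPhaseCommitRecoverySketch {

    /** Log of actions that would be issued against the external system. */
    final List<String> actions = new ArrayList<>();

    /**
     * @param pendingCommit transactions pre-committed when the restored checkpoint was taken
     * @param pending       transactions still open at that point (never pre-committed)
     */
    void initializeState(List<String> pendingCommit, List<String> pending) {
        for (String txn : pendingCommit) {
            // The checkpoint covering this txn completed, but the commit may not have run
            // before the failure, so commit now; commit must therefore be idempotent.
            actions.add("commit " + txn);
        }
        for (String txn : pending) {
            // Records in an open txn are not covered by the restored checkpoint and are
            // assumed to be replayed by the source, so the txn is discarded.
            actions.add("abort " + txn);
        }
    }

    public static void main(String[] args) {
        TwoPhaseCommitRecoverySketch sink = new TwoPhaseCommitRecoverySketch();
        sink.initializeState(Arrays.asList("txn-1"), Arrays.asList("txn-2"));
        System.out.println(sink.actions); // [commit txn-1, abort txn-2]
    }
}
```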

