[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4368 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131334260 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. + * + * @param <IN> Input type for {@link SinkFunction} + * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction<IN, TXN> + extends RichSinkFunction<IN> + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor
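The class javadoc in this diff sums up the design in one sentence: two-phase commit on top of `CheckpointedFunction` and `CheckpointListener`. A minimal, Flink-free sketch of the bookkeeping this implies might look like the following, assuming phase one (pre-commit) happens when a checkpoint snapshot is taken and phase two (commit) happens when the checkpoint-completion notification arrives. `TwoPhaseCommitModel`, `BufferingSink`, `onSnapshot`, `onCheckpointComplete` and the transaction hooks are all hypothetical names chosen for illustration; they are not the API introduced by this PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of two-phase commit driven by checkpoints.
abstract class TwoPhaseCommitModel<IN, TXN> {
    // Pre-committed transactions awaiting checkpoint completion, kept in creation order.
    protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    // Transaction currently receiving records.
    protected TXN currentTransaction;

    protected abstract TXN beginTransaction();
    protected abstract void write(TXN txn, IN value);
    protected abstract void preCommit(TXN txn);
    protected abstract void commit(TXN txn);

    void open() {
        currentTransaction = beginTransaction();
    }

    void invoke(IN value) {
        write(currentTransaction, value);
    }

    // Phase 1 (checkpoint snapshot): pre-commit the open transaction, remember it
    // under the checkpoint id, and start a fresh transaction for later records.
    void onSnapshot(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    // Phase 2 (checkpoint complete): commit, oldest first, every pending transaction whose
    // checkpoint id is <= the completed one. Assumes ids increase and that notifications
    // for earlier checkpoints may have been missed.
    void onCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, TXN>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TXN> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break;
            }
            commit(entry.getValue());
            it.remove();
        }
    }
}

// Toy sink: a transaction is an in-memory buffer; committing publishes its contents.
class BufferingSink extends TwoPhaseCommitModel<String, List<String>> {
    final List<String> committed = new ArrayList<>();

    @Override protected List<String> beginTransaction() { return new ArrayList<>(); }
    @Override protected void write(List<String> txn, String value) { txn.add(value); }
    @Override protected void preCommit(List<String> txn) { /* a real sink would flush here */ }
    @Override protected void commit(List<String> txn) { committed.addAll(txn); }
}

public class TwoPhaseCommitDemo {
    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.open();
        sink.invoke("a");
        sink.onSnapshot(1);            // "a" pre-committed under checkpoint 1
        sink.invoke("b");
        sink.onSnapshot(2);            // "b" pre-committed under checkpoint 2
        sink.invoke("c");              // "c" stays in the open transaction
        sink.onCheckpointComplete(2);  // commits checkpoints 1 and 2, in that order
        System.out.println(sink.committed); // prints [a, b]
    }
}
```

Keying pending transactions by checkpoint id in an insertion-ordered map, as the `LinkedHashMap pendingCommitTransactions` field in the revised diff does, is what lets a single completion notification commit several transactions in the order they were created.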
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131332988 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. + * + * @param <IN> Input type for {@link SinkFunction} + * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable) --- End diff -- it doesn't need to be serializable anymore! dropped this mention :)
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131330545 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -51,39 +48,52 @@ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable) */ @PublicEvolving -public abstract class TwoPhaseCommitSinkFunction+public abstract class TwoPhaseCommitSinkFunction extends RichSinkFunction implements CheckpointedFunction, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); - protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor > pendingCommitTransactionsDescriptor; protected final ListStateDescriptor pendingTransactionsDescriptor; - protected final List pendingCommitTransactions = new ArrayList<>(); + protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>(); @Nullable protected TXN currentTransaction; protected ListState pendingTransactionsState; - protected ListState pendingCommitTransactionsState; - - public TwoPhaseCommitSinkFunction(Class txnClass) { - this( - TypeInformation.of(txnClass), - TypeInformation.of(new TypeHint () {})); - } + protected ListState > pendingCommitTransactionsState; --- End diff -- Whether transactions can be redistributed depends on the system that we communicate with. For Kafka 0.11 I'm not sure. Now that I think about it a bit more, I will change it to `ListState >>`, so that we can guarantee that transactions will be recovered in the same order they were created.
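The reply above is about preserving creation order when pending transactions are restored. A small sketch of the idea, assuming the pending (checkpointId, transaction) pairs are snapshotted as one ordered list and re-inserted into a `LinkedHashMap` on recovery. The exact state type named in the comment was lost from this archive, so `snapshot`, `restore` and `commitRestoredInOrder` are hypothetical helpers, not Flink code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderedRecoveryDemo {
    // Snapshot the pending map as a single ordered list of (checkpointId, txn) pairs.
    static <TXN> List<Map.Entry<Long, TXN>> snapshot(LinkedHashMap<Long, TXN> pending) {
        List<Map.Entry<Long, TXN>> entries = new ArrayList<>();
        for (Map.Entry<Long, TXN> e : pending.entrySet()) {
            // Copy into an immutable pair so later map changes cannot affect the snapshot.
            entries.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return entries;
    }

    // Rebuild the map in list order; LinkedHashMap iterates in insertion order.
    static <TXN> LinkedHashMap<Long, TXN> restore(List<Map.Entry<Long, TXN>> entries) {
        LinkedHashMap<Long, TXN> pending = new LinkedHashMap<>();
        for (Map.Entry<Long, TXN> e : entries) {
            pending.put(e.getKey(), e.getValue());
        }
        return pending;
    }

    // On recovery, "commit" the restored transactions oldest first and return that order.
    static <TXN> List<TXN> commitRestoredInOrder(List<Map.Entry<Long, TXN>> entries) {
        List<TXN> commitLog = new ArrayList<>();
        for (TXN txn : restore(entries).values()) {
            commitLog.add(txn); // stand-in for a real commit call against the external system
        }
        return commitLog;
    }

    public static void main(String[] args) {
        LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
        pending.put(7L, "txn-7");
        pending.put(8L, "txn-8");
        pending.put(9L, "txn-9");
        System.out.println(commitRestoredInOrder(snapshot(pending))); // prints [txn-7, txn-8, txn-9]
    }
}
```

A plain `HashMap` makes no iteration-order promise, which is presumably why the revised diff keeps `pendingCommitTransactions` in a `LinkedHashMap` rather than an unordered map.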
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131094885 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. 
+ * + * @param <IN> Input type for {@link SinkFunction} + * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction<IN, TXN> + extends RichSinkFunction<IN> + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor<Map.Entry<Long, TXN>> pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor; + + protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState<TXN> pendingTransactionsState; + protected ListState<Map.Entry<Long, TXN>> pendingCommitTransactionsState; + + /** +* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this +* constructor are {@link TypeInformation#of(Class)}, {@link org.apache.flink.api.common.typeinfo.TypeHint} and +* {@link TypeInformation#of(TypeHint)}. Example: +* +* {@code +* TwoPhaseCommitSinkFunction( +* TypeInformation.of(TXN.class), +* TypeInformation.of(new TypeHint<Map.Entry<Long, TXN>>() {})); +* } +* +* @param txnTypeInformation {@link TypeInformation} for transaction POJO. +* @param checkpointToTxnTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction. +*/ + public TwoPhaseCommitSinkFunction( + TypeInformation<TXN> txnTypeInformation, + TypeInformation<Map.Entry<Long, TXN>> checkpointToTxnTypeInformation) { + this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation), + new ListStateDescriptor<>("pendingCommitTransactions", checkpointToTxnTypeInformation)); + } + + /** +* Instantiate {@link TwoPhaseCommitSinkFunction} with custom state descriptors. +* +* @param pendingTransactionsDescriptor descriptor for transaction POJO. +* @param pendingCommitTransactionsDescriptor descriptor for mapping between checkpointId and transaction POJO. +*/
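The constructor javadoc points to `TypeInformation.of(TypeHint)` for generic types such as the checkpointId-to-transaction mapping. The trailing `{}` in `new TypeHint<...>() {}` is the important part: it declares an anonymous subclass, and the type argument of that subclass's superclass is kept in the class file even though generics are otherwise erased. The sketch below shows this "super type token" trick in plain Java. `TypeToken` is a hypothetical stand-in written for illustration; it is not Flink's `TypeHint` implementation, only the same general idea.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical super type token, illustrating why `new TypeHint<...>() {}` needs the braces.
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // For `new TypeToken<X>() {}`, the anonymous subclass's generic superclass is TypeToken<X>.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException("Instantiate as an anonymous subclass: new TypeToken<...>() {}");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

public class TypeTokenDemo {
    public static void main(String[] args) {
        // TypeToken is abstract, so the trailing {} is required; it also records
        // Map.Entry<Long, String> in the anonymous class's generic signature.
        Type t = new TypeToken<Map.Entry<Long, String>>() {}.getType();
        ParameterizedType p = (ParameterizedType) t;
        System.out.println(p.getRawType());                // prints interface java.util.Map$Entry
        System.out.println(p.getActualTypeArguments()[0]); // prints class java.lang.Long
    }
}
```

By contrast, `TypeInformation.of(Class)` only sees an erased class such as `Map.Entry.class`, which is presumably why the javadoc offers the `TypeHint` variant for the mapping type.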
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131093966 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131091039 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods --- End diff -- nit: wrap TXN as `{@code TXN}`.
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131093086 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131094983 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131094488 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131091549 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. + * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) --- End diff -- nit: either wrap `Serializable` with `@link` or lower-case s. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r131070533 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -51,39 +48,52 @@ * @param Transaction to store all of the information required to handle a transaction (must be Serializable) */ @PublicEvolving -public abstract class TwoPhaseCommitSinkFunction+public abstract class TwoPhaseCommitSinkFunction extends RichSinkFunction implements CheckpointedFunction, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); - protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor > pendingCommitTransactionsDescriptor; protected final ListStateDescriptor pendingTransactionsDescriptor; - protected final List pendingCommitTransactions = new ArrayList<>(); + protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>(); @Nullable protected TXN currentTransaction; protected ListState pendingTransactionsState; - protected ListState pendingCommitTransactionsState; - - public TwoPhaseCommitSinkFunction(Class txnClass) { - this( - TypeInformation.of(txnClass), - TypeInformation.of(new TypeHint () {})); - } + protected ListState > pendingCommitTransactionsState; --- End diff -- I think this has to be `ListState >` or the original `ListState `. Using a map instead of a list for `pendingCommitTransactions` is OK for bookkeeping, but when snapshotting this map we need to make sure the snapshotted transactions are state elements that can be redistributed independently of each other.
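The review point above is that a map is fine for in-memory bookkeeping, but what goes into operator `ListState` should be a list of independent elements. On rescaling, the elements of such a list can be handed to different subtasks. A Flink-free sketch of that idea follows. It assumes a hypothetical `PendingEntry` pair type in place of whatever tuple type the PR settles on, and a plain `List` standing in for `ListState`. The snapshot flattens the map into one element per pending transaction. A round-robin split (a deliberate simplification, not Flink's actual redistribution logic) then shows that each subtask can rebuild its own ordered map from whatever subset it receives.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SnapshotFlattening {

    /** Hypothetical stand-in for a (checkpointId, transaction) pair. */
    static final class PendingEntry<TXN> {
        final long checkpointId;
        final TXN txn;

        PendingEntry(long checkpointId, TXN txn) {
            this.checkpointId = checkpointId;
            this.txn = txn;
        }
    }

    /** Snapshot: one list element per pending transaction, never one element holding the whole map. */
    static <TXN> List<PendingEntry<TXN>> flatten(LinkedHashMap<Long, TXN> pending) {
        List<PendingEntry<TXN>> out = new ArrayList<>();
        for (Map.Entry<Long, TXN> e : pending.entrySet()) {
            out.add(new PendingEntry<>(e.getKey(), e.getValue()));
        }
        return out;
    }

    /** Simplified redistribution: deal list elements round-robin to the new subtasks. */
    static <T> List<List<T>> splitRoundRobin(List<T> elements, int parallelism) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            parts.get(i % parallelism).add(elements.get(i));
        }
        return parts;
    }

    /** Restore: rebuild a map ordered by checkpoint id from whichever entries this subtask received. */
    static <TXN> LinkedHashMap<Long, TXN> restore(List<PendingEntry<TXN>> received) {
        List<PendingEntry<TXN>> sorted = new ArrayList<>(received);
        sorted.sort(Comparator.comparingLong(e -> e.checkpointId));
        LinkedHashMap<Long, TXN> map = new LinkedHashMap<>();
        for (PendingEntry<TXN> e : sorted) {
            map.put(e.checkpointId, e.txn);
        }
        return map;
    }

    public static void main(String[] args) {
        LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
        pending.put(1L, "txn-1");
        pending.put(2L, "txn-2");
        pending.put(3L, "txn-3");

        List<List<PendingEntry<String>>> parts = splitRoundRobin(flatten(pending), 2);
        System.out.println(restore(parts.get(0)).keySet()); // [1, 3]
        System.out.println(restore(parts.get(1)).keySet()); // [2]
    }
}
```

If the whole map were stored as a single list element, it would be indivisible and could only ever be assigned as one unit.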
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129531190 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. 
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { + this( + TypeInformation.of(txnClass), + TypeInformation.of(new TypeHint () {})); + } + + public TwoPhaseCommitSinkFunction( + TypeInformation txnTypeInformation, + TypeInformation txnAndCheckpointTypeInformation) { + this( + new ListStateDescriptor<>("pendingTransactions", txnTypeInformation), + new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation)); + } + + public TwoPhaseCommitSinkFunction( + ListStateDescriptor pendingTransactionsDescriptor, + ListStateDescriptor pendingCommitTransactionsDescriptor) { + this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null"); + this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null"); + } + + // -- methods that should be implemented in child class to support two phase
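The `Class txnClass` constructor quoted above builds its second `TypeInformation` from a `TypeHint` written as an anonymous subclass, and the trailing `{}` matters. An anonymous subclass records its full generic superclass in its class metadata, so the type arguments survive erasure and can be read back through reflection. Below is a Flink-free sketch of this "super type token" technique using a hypothetical `TypeCapture` class. It is not Flink's `TypeHint`, which, as far as I understand, relies on the same mechanism.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

/** Hypothetical super type token illustrating how anonymous-subclass type hints capture generics. */
abstract class TypeCapture<T> {

    final Type captured;

    protected TypeCapture() {
        // For "new TypeCapture<X>() {}", the anonymous class's generic superclass is TypeCapture<X>.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Create this as an anonymous subclass: new TypeCapture<...>() {}");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // The {} creates an anonymous subclass, so Map<Long, String> is kept in its metadata.
        Type t = new TypeCapture<Map<Long, String>>() {}.captured;
        System.out.println(t.getTypeName()); // java.util.Map<java.lang.Long, java.lang.String>
    }
}
```

One caveat about the technique itself: capture only yields concrete classes when the type arguments are concrete at the creation site. If an argument is a type variable, such as `TXN` inside a generic class, the captured argument is that type variable, not a concrete class.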
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129531220 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. 
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { + this( + TypeInformation.of(txnClass), + TypeInformation.of(new TypeHint () {})); + } + + public TwoPhaseCommitSinkFunction( + TypeInformation txnTypeInformation, + TypeInformation txnAndCheckpointTypeInformation) { + this( + new ListStateDescriptor<>("pendingTransactions", txnTypeInformation), + new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation)); + } + + public TwoPhaseCommitSinkFunction( + ListStateDescriptor pendingTransactionsDescriptor, + ListStateDescriptor pendingCommitTransactionsDescriptor) { + this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null"); + this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null"); + } + + // -- methods that should be implemented in child class to support two phase
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129527454 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. 
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { + this( + TypeInformation.of(txnClass), + TypeInformation.of(new TypeHint () {})); + } + + public TwoPhaseCommitSinkFunction( + TypeInformation txnTypeInformation, + TypeInformation txnAndCheckpointTypeInformation) { + this( + new ListStateDescriptor<>("pendingTransactions", txnTypeInformation), + new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation)); + } + + public TwoPhaseCommitSinkFunction( + ListStateDescriptor pendingTransactionsDescriptor, + ListStateDescriptor pendingCommitTransactionsDescriptor) { + this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null"); + this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null"); + } + + // -- methods that should be implemented in child class to support two phase
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129581412 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. 
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { + this( + TypeInformation.of(txnClass), + TypeInformation.of(new TypeHint () {})); + } + + public TwoPhaseCommitSinkFunction( + TypeInformation txnTypeInformation, + TypeInformation txnAndCheckpointTypeInformation) { + this( + new ListStateDescriptor<>("pendingTransactions", txnTypeInformation), + new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation)); + } + + public TwoPhaseCommitSinkFunction( + ListStateDescriptor pendingTransactionsDescriptor, + ListStateDescriptor pendingCommitTransactionsDescriptor) { + this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null"); + this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null"); + } + + // -- methods that should be implemented in child class to support two phase
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129522314 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. 
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { --- End diff -- As you wish; I have no experience with serialization in Java, so I will leave this decision up to you.
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129523821 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. + * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction--- End diff -- I think that was a relict from a previous version. Dropped. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129340541 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. 
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { + this( + TypeInformation.of(txnClass), + TypeInformation.of(new TypeHint () {})); + } + + public TwoPhaseCommitSinkFunction( + TypeInformation txnTypeInformation, + TypeInformation txnAndCheckpointTypeInformation) { + this( + new ListStateDescriptor<>("pendingTransactions", txnTypeInformation), + new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation)); + } + + public TwoPhaseCommitSinkFunction( + ListStateDescriptor pendingTransactionsDescriptor, + ListStateDescriptor pendingCommitTransactionsDescriptor) { + this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null"); + this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null"); + } + + // -- methods that should be implemented in child class to support two phase
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129339683 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic. + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods + * handling this transaction handle. + * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction--- End diff -- Why does `TXN` need to be `Serializable`? It should always be serialised using Flink mechanics (`TypeSerializer` and so on).
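The question above is why the `TXN` handle must be `Serializable` (as the Javadoc's "must be Serializable" states) when Flink state is written through its own `TypeSerializer` stack. Below is a plain-Java analogy, not Flink's API. It persists a transaction handle that does not implement `java.io.Serializable` by pairing it with an explicit serializer. `HandleSerializer`, `FileTxn`, `FileTxnSerializer` and `SerializerRoundTrip` are hypothetical names. Flink's real `TypeSerializer` is an abstract class that writes and reads through `DataOutputView`/`DataInputView` and has further duties, such as copying instances and snapshotting its own configuration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a framework-provided serializer (not Flink's TypeSerializer API). */
interface HandleSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;
    T deserialize(DataInputStream in) throws IOException;
}

/** Hypothetical transaction handle that deliberately does NOT implement java.io.Serializable. */
final class FileTxn {
    final String tmpPath;
    final long bytesWritten;

    FileTxn(String tmpPath, long bytesWritten) {
        this.tmpPath = tmpPath;
        this.bytesWritten = bytesWritten;
    }
}

/** Writes the handle field by field, so no Java serialization is involved. */
final class FileTxnSerializer implements HandleSerializer<FileTxn> {
    @Override
    public void serialize(FileTxn txn, DataOutputStream out) throws IOException {
        out.writeUTF(txn.tmpPath);
        out.writeLong(txn.bytesWritten);
    }

    @Override
    public FileTxn deserialize(DataInputStream in) throws IOException {
        return new FileTxn(in.readUTF(), in.readLong());
    }
}

/** Round-trips a handle through bytes, the way a state backend would persist it. */
final class SerializerRoundTrip {
    static <T> byte[] toBytes(HandleSerializer<T> serializer, T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            serializer.serialize(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // the stream was flushed by close()
    }

    static <T> T fromBytes(HandleSerializer<T> serializer, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return serializer.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the diff, the state descriptors are built from `TypeInformation`, which is where Flink derives the serializer used for the `TXN` values in `ListState`.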
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129340400 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129234261 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129234765 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129235825 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129235807 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129234639 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129234789 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129235367 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129234336 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129233893 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { --- End diff -- I'm not too sure we actually want to expose this specific constructor. I'm not really fond of the fact that the user leaves the decision of how to serialize the state completely to this base class. One disadvantage I can immediately think of is that `TransactionAndCheckpoint` is not a POJO, and therefore Flink will try to serialize it into state using Kryo. In general, it is a bad idea to use Kryo for this kind of persisted data. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
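To illustrate the alternative the reviewer hints at, where the sink controls its own state format instead of letting a non-POJO holder fall back to Kryo, here is a plain-Java sketch of an explicit, versioned binary encoding for a hypothetical (transaction id, checkpoint id) pair. `TxnAndCheckpoint` and `TxnAndCheckpointCodec` are illustrative names, not classes from the PR, and the transaction handle is assumed to be a simple string id. In Flink, the same read/write logic would presumably live inside a `TypeSerializer`, which could be given to a `ListStateDescriptor` and passed through the constructor that accepts descriptors directly; that wiring is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical state holder (not the PR's TransactionAndCheckpoint): a transaction id plus its checkpoint. */
final class TxnAndCheckpoint {
    final String transactionId;
    final long checkpointId;

    TxnAndCheckpoint(String transactionId, long checkpointId) {
        this.transactionId = transactionId;
        this.checkpointId = checkpointId;
    }
}

/** Explicit, versioned binary format, so the persisted layout does not depend on Kryo. */
final class TxnAndCheckpointCodec {
    private static final byte VERSION = 1;

    static byte[] serialize(TxnAndCheckpoint value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(VERSION);             // lets a future format be detected on restore
            out.writeUTF(value.transactionId);  // 2-byte length prefix + modified UTF-8
            out.writeLong(value.checkpointId);
        } catch (IOException e) {
            // In-memory streams should not fail; surface it unchecked if they ever do.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static TxnAndCheckpoint deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte version = in.readByte();
            if (version != VERSION) {
                throw new IllegalArgumentException("Unsupported state format version: " + version);
            }
            return new TxnAndCheckpoint(in.readUTF(), in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The leading version byte is the design point: unlike a generic Kryo fallback, the author decides how an old snapshot is read after the holder class changes.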
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129234426 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129233968 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
+ * + * @param Input type for {@link SinkFunction} + * @param Transaction to store all of the information required to handle a transaction (must be Serializable) + */ +@PublicEvolving +public abstract class TwoPhaseCommitSinkFunction+ extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + + protected final ListStateDescriptor pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor pendingTransactionsDescriptor; + + protected final List pendingCommitTransactions = new ArrayList<>(); + + @Nullable + protected TXN currentTransaction; + protected ListState pendingTransactionsState; + protected ListState pendingCommitTransactionsState; + + public TwoPhaseCommitSinkFunction(Class txnClass) { + this( + TypeInformation.of(txnClass), + TypeInformation.of(new TypeHint () {})); + } + + public TwoPhaseCommitSinkFunction( --- End diff -- Could we have Javadocs for these constructors?
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4368#discussion_r129217538 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -0,0 +1,342 @@
[GitHub] flink pull request #4368: [FLINK-7210] Introduce TwoPhaseCommitSinkFunction
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4368 [FLINK-7210] Introduce TwoPhaseCommitSinkFunction This is intended to be a recommended base class for implementing exactly-once sinks in Flink. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink 2phase Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4368.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4368 commit 6ea314a4abd9f609accbc9c5f450051560df43da Author: Piotr Nowojski Date: 2017-07-04T15:45:53Z [FLINK-7210] Implement TwoPhaseCommitSinkFunction This is a recommended base class for implementing exactly-once sinks in Flink.
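The PR description and the quoted class Javadoc say the base class implements two-phase commit on top of CheckpointedFunction and CheckpointListener. As a rough illustration of that bookkeeping, here is a plain-Java sketch with no Flink dependency. When a checkpoint is taken, the open transaction is pre-committed and parked together with its checkpoint id. It is committed only once that checkpoint is reported complete. All class and hook names (`TwoPhaseCommitSketch`, `PendingCommit`, `BufferingSink`, `beginTransaction`, `preCommit`, `commit`, `abort`) are hypothetical stand-ins rather than the PR's actual API. The plain `long` checkpoint ids replace Flink's context objects, and state persistence and recovery are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of two-phase-commit bookkeeping; not the PR's implementation.
abstract class TwoPhaseCommitSketch<IN, TXN> {

    /** A pre-committed transaction waiting for its checkpoint to complete. */
    static final class PendingCommit<T> {
        final T txn;
        final long checkpointId;

        PendingCommit(T txn, long checkpointId) {
            this.txn = txn;
            this.checkpointId = checkpointId;
        }
    }

    protected TXN currentTransaction;
    protected final List<PendingCommit<TXN>> pendingCommitTransactions = new ArrayList<>();

    // Hooks a concrete sink would implement (hypothetical names).
    protected abstract TXN beginTransaction();
    protected abstract void invoke(TXN txn, IN value);
    protected abstract void preCommit(TXN txn);
    protected abstract void commit(TXN txn);
    protected abstract void abort(TXN txn);

    void open() {
        currentTransaction = beginTransaction();
    }

    void write(IN value) {
        invoke(currentTransaction, value);
    }

    /** Phase 1, on checkpoint: flush the open transaction, park it, and start a new one. */
    void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.add(new PendingCommit<>(currentTransaction, checkpointId));
        currentTransaction = beginTransaction();
    }

    /** Phase 2, once the checkpoint is complete: commit everything up to that checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<PendingCommit<TXN>> it = pendingCommitTransactions.iterator();
        while (it.hasNext()) {
            PendingCommit<TXN> pending = it.next();
            if (pending.checkpointId <= checkpointId) {
                commit(pending.txn);
                it.remove();
            }
        }
    }

    void close() {
        if (currentTransaction != null) {
            abort(currentTransaction);
            currentTransaction = null;
        }
    }
}

/** Toy sink whose "transactions" are string buffers; committed output goes to a list. */
class BufferingSink extends TwoPhaseCommitSketch<String, StringBuilder> {
    final List<String> committed = new ArrayList<>();

    @Override protected StringBuilder beginTransaction() { return new StringBuilder(); }
    @Override protected void invoke(StringBuilder txn, String value) { txn.append(value); }
    @Override protected void preCommit(StringBuilder txn) { /* nothing to flush in memory */ }
    @Override protected void commit(StringBuilder txn) { committed.add(txn.toString()); }
    @Override protected void abort(StringBuilder txn) { txn.setLength(0); }
}
```

In this sketch, records written after `snapshotState(1)` stay invisible until a later checkpoint completes. Committing every pending entry whose id is at or below the completed one also lets it cope with a skipped completion notification. A real sink would additionally keep `currentTransaction` and the pending list in operator state so they survive a restore.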