By the way, here is the checkpoint-related log:

[2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
[2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: Checkpoint was declined. (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
Caused by: org.apache.flink.util.SerializedThrowable: npe
    at com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111) ~[classes/:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    ... 20 more
[2021-06-26 16:08:55,357] INFO Checkpoint 1 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
output
[2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
33
34
35
[2021-06-26 16:09:15,349] INFO Checkpoint 2 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
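
As a sanity check on the timestamps in this log: each "expired before completing" line comes about 3 seconds after the matching "Triggering checkpoint" line, which is consistent with setCheckpointTimeout(3_000) in the quoted code, and the two trigger epochs are about 20 seconds apart, which is consistent with enableCheckpointing(20_000). Below is a minimal sketch of that arithmetic. The class and method names are made up for illustration, and the timestamp pattern is an assumption based on how the log lines look.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CheckpointTimings {

    // Assumed layout of the bracketed log timestamps, e.g. "2021-06-26 16:08:52,352".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps in the layout above. */
    public static long millisBetween(String from, String to) {
        return Duration.between(
                        LocalDateTime.parse(from, LOG_TS),
                        LocalDateTime.parse(to, LOG_TS))
                .toMillis();
    }

    public static void main(String[] args) {
        // Trigger -> expiry for checkpoint 1 and checkpoint 2 (values copied from the log).
        System.out.println(millisBetween("2021-06-26 16:08:52,352", "2021-06-26 16:08:55,357"));
        System.out.println(millisBetween("2021-06-26 16:09:12,347", "2021-06-26 16:09:15,349"));

        // Gap between the two trigger epochs printed after "@" in the log.
        System.out.println(1624694952346L - 1624694932345L);
    }
}
```

This prints 3005, 3002 and 20001, so both checkpoints are expired by the 3-second timeout rather than by anything slower, and checkpoint 2 is triggered on the normal 20-second schedule.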

On Sat, Jun 26, 2021 at 4:36 PM tao xiao <xiaotao...@gmail.com> wrote:

> Hi team,
>
> I ran a simple Flink 1.12.1 job in the IDE with
> TolerableCheckpointFailureNumber set, and intentionally threw an exception
> in the source function's snapshotState to see how Flink behaves. The first
> checkpoint throws the exception and eventually times out while the main
> flow continues to work, which is expected. However, none of the subsequent
> checkpoints reach the exception anymore; each one just reports a timeout
> once the checkpoint timeout elapses. Is it expected behavior that no later
> checkpoint can finish once one checkpoint has thrown an exception?
>
> Below is the code to reproduce the behavior.
> main
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
> env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(3_000);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);
>
> env.addSource(new FromElementsFunctionT())
>     .setParallelism(1)
>     .print()
>     .setParallelism(1);
> env.execute("Demo");
>
>
> Source function
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *    http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package sample.flink;
>
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.annotation.PublicEvolving;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.typeutils.base.IntSerializer;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.util.Preconditions;
>
> /**
>  * A stream source function that returns a sequence of elements.
>  *
>  * <p>Upon construction, this source function serializes the elements using Flink's type
>  * information. That way, any object transport using Java serialization will not be affected by the
>  * serializability of the elements.
>  *
>  * <p><b>NOTE:</b> This source has a parallelism of 1.
>  *
>  */
> @PublicEvolving
> public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {
>
>     private static final long serialVersionUID = 1L;
>
>     /** The number of elements emitted already. */
>     private volatile int numElementsEmitted;
>
>     /** Flag to make the source cancelable. */
>     private volatile boolean isRunning = true;
>
>     private transient ListState<Integer> checkpointedState;
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws Exception {
>         Preconditions.checkState(
>                 this.checkpointedState == null,
>                 "The " + getClass().getSimpleName() + " has already been initialized.");
>
>         this.checkpointedState =
>                 context.getOperatorStateStore()
>                         .getListState(
>                                 new ListStateDescriptor<>(
>                                         "from-elements-state", IntSerializer.INSTANCE));
>
>         if (context.isRestored()) {
>             List<Integer> retrievedStates = new ArrayList<>();
>             for (Integer entry : this.checkpointedState.get()) {
>                 retrievedStates.add(entry);
>             }
>
>             // given that the parallelism of the function is 1, we can only have 1 state
>             Preconditions.checkArgument(
>                     retrievedStates.size() == 1,
>                     getClass().getSimpleName() + " retrieved invalid state.");
>
>             this.numElementsEmitted = retrievedStates.get(0);
>         }
>     }
>
>     @Override
>     public void run(SourceContext<Integer> ctx) throws Exception {
>         final Object lock = ctx.getCheckpointLock();
>
>         while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
>             Thread.sleep(1000);
>             synchronized (lock) {
>                 ctx.collect(numElementsEmitted++);
>             }
>         }
>     }
>
>     @Override
>     public void cancel() {
>         isRunning = false;
>     }
>
>     // ------------------------------------------------------------------------
>     //  Checkpointing
>     // ------------------------------------------------------------------------
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>         Preconditions.checkState(
>                 this.checkpointedState != null,
>                 "The " + getClass().getSimpleName() + " has not been properly initialized.");
>
>         this.checkpointedState.clear();
>         this.checkpointedState.add(this.numElementsEmitted);
>         throw new NullPointerException("npe");
>     }
> }
>
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao
