Thanks for the pointer. let me try upgrading the flink On Thu, Jul 1, 2021 at 5:29 PM Yun Tang <myas...@live.com> wrote:
> Hi Tao, > > I run your program with Flink-1.12.1 and found the problem you described > really existed. And things would go normal if switching to Flink-1.12.2 > version. > > After dig into the root cause, I found this is caused by a fixed bug > [1]: If a legacy source task fails outside of the legacy thread, the legacy > thread blocks proper cancellation (completion future never completed). As > you throw the NPE within the source operator, it will never exit and cannot > handle subsequent checkpoint requirements then. That's why you see all > subsequent checkpoints cannot finish. > > > [1] > https://github.com/apache/flink/commit/b332ce40d88be84d9cf896f446c7c6e26dbc8b6a#diff-54e9ce3b15d6badcc9376ab144df066eb46c4e516d6ee31ef8eb38e2d4359042 > > Best > Yun Tang > ------------------------------ > *From:* Matthias Pohl <matth...@ververica.com> > *Sent:* Thursday, July 1, 2021 16:41 > *To:* tao xiao <xiaotao...@gmail.com> > *Cc:* Yun Tang <myas...@live.com>; user <user@flink.apache.org>; Roman > Khachatryan <ro...@apache.org> > *Subject:* Re: Exception in snapshotState suppresses subsequent > checkpoints > > Hi Tao, > it looks like it should work considering that you have a sleep of 1 second > before each emission. I'm going to add Roman to this thread. Maybe, he has > sees something obvious which I'm missing. > Could you run the job with the log level set to debug and provide the logs > once more? Additionally, having both the TaskManager's and the > JobManager's logs available would help in understanding what's going on. > > Best, > Matthias > > On Wed, Jun 30, 2021 at 6:14 PM tao xiao <xiaotao...@gmail.com> wrote: > > Hi team, > > Does anyone have a clue? > > On Mon, Jun 28, 2021 at 3:27 PM tao xiao <xiaotao...@gmail.com> wrote: > > My job is very simple as you can see from the code I pasted. I simply > print out the number to stdout. If you look at the log the number continued > to print out after checkpoint 1 which indicated no back pressure was > happening. It is very easy to reproduce this if you run the code I > provided in IDE > > > ------------LOG---------------- > > [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ > 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator > Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: > Checkpoint was declined. > (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl) > org.apache.flink.runtime.checkpoint.CheckpointException: Could not > complete snapshot 1 for operator Source: Custom Source -> Sink: Print to > Std. Out (1/1)#0. Failure reason: Checkpoint was declined. > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > [flink-runtime_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > [flink-runtime_2.11-1.12.1.jar:1.12.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261] > Caused by: org.apache.flink.util.SerializedThrowable: npe > at > com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111) > ~[classes/:?] > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > ... 20 more > [2021-06-26 16:08:55,357] INFO Checkpoint 1 of job > afde4a82f41e8284cb0bfff20497a5cc expired before completing. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > output > [2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ > 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > 33 > 34 > 35 > [2021-06-26 16:09:15,349] INFO Checkpoint 2 of job > afde4a82f41e8284cb0bfff20497a5cc expired before completing. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > 36 > 37 > 38 > > > Main function > StreamExecutionEnvironment env = StreamExecutionEnvironment. > getExecutionEnvironment(); > env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true)); > env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE); > env.getCheckpointConfig().setCheckpointTimeout(3_000); > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000); > > env.addSource(new FromElementsFunctionT()) > .setParallelism(1) > .print() > .setParallelism(1); > env.execute("Demo"); > > source funciton > package sample.flink; > > import java.util.ArrayList; > import java.util.List; > import org.apache.flink.annotation.PublicEvolving; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.typeutils.base.IntSerializer; > import org.apache.flink.runtime.state.FunctionInitializationContext; > import org.apache.flink.runtime.state.FunctionSnapshotContext; > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.util.Preconditions; > > /** > * A stream source function that returns a sequence of elements. > * > * <p>Upon construction, this source function serializes the elements > using Flink's type > * information. That way, any object transport using Java serialization > will not be affected by the > * serializability of the elements. > * > * <p><b>NOTE:</b> This source has a parallelism of 1. > * > */ > @PublicEvolving > public class FromElementsFunctionT implements SourceFunction<Integer>, > CheckpointedFunction > { > > private static final long serialVersionUID = 1L; > > /** The number of elements emitted already. */ > private volatile int numElementsEmitted; > > /** Flag to make the source cancelable. */ > private volatile boolean isRunning = true; > > private transient ListState<Integer> checkpointedState; > > @Override > public void initializeState(FunctionInitializationContext context) throws > Exception > { > Preconditions.checkState( > this.checkpointedState == null, > "The " + getClass().getSimpleName() + " has already been initialized."); > > this.checkpointedState = > context.getOperatorStateStore() > .getListState( > new ListStateDescriptor<>( > "from-elements-state", IntSerializer.INSTANCE)); > > if (context.isRestored()) { > List<Integer> retrievedStates = new ArrayList<>(); > for (Integer entry : this.checkpointedState.get()) { > retrievedStates.add(entry); > } > > // given that the parallelism of the function is 1, we can only have 1 > state > Preconditions.checkArgument( > retrievedStates.size() == 1, > getClass().getSimpleName() + " retrieved invalid state."); > > this.numElementsEmitted = retrievedStates.get(0); > } > } > > @Override > public void run(SourceContext<Integer> ctx) throws Exception { > final Object lock = ctx.getCheckpointLock(); > > while (isRunning && numElementsEmitted < Integer.MAX_VALUE) { > Thread.sleep(1000); > synchronized (lock) { > ctx.collect(numElementsEmitted++); > } > } > } > > @Override > public void cancel() { > isRunning = false; > } > > // ------------------------------------------------------------------------ > // Checkpointing > // ------------------------------------------------------------------------ > > @Override > public void snapshotState(FunctionSnapshotContext context) throws Exception > { > Preconditions.checkState( > this.checkpointedState != null, > "The " + getClass().getSimpleName() + " has not been properly > initialized."); > > this.checkpointedState.clear(); > this.checkpointedState.add(this.numElementsEmitted); > throw new NullPointerException("npe"); > } > } > > > > > On Mon, Jun 28, 2021 at 2:36 PM Yun Tang <myas...@live.com> wrote: > > Hi Tao, > > I'm afraid that your Flink job continues to be in high backpressued and > all subsequent checkpoints did not ever run > 'FromElementsFunctionT#snapshotState' which means your code to throw > exception never be executed. You could check those expired checkpoints to > see whether your tasks containing 'FromElementsFunctionT' has ever been > completed. > > Best > Yun Tang > ------------------------------ > *From:* tao xiao <xiaotao...@gmail.com> > *Sent:* Saturday, June 26, 2021 16:40 > *To:* user <user@flink.apache.org> > *Subject:* Re: Exception in snapshotState suppresses subsequent > checkpoints > > Btw here is the checkpoint related log > > [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ > 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator > Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: > Checkpoint was declined. > (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl) > org.apache.flink.runtime.checkpoint.CheckpointException: Could not > complete snapshot 1 for operator Source: Custom Source -> Sink: Print to > Std. Out (1/1)#0. Failure reason: Checkpoint was declined. > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > [flink-runtime_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > [flink-runtime_2.11-1.12.1.jar:1.12.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261] > Caused by: org.apache.flink.util.SerializedThrowable: npe > at > com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111) > ~[classes/:?] > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > ... 20 more > [2021-06-26 16:08:55,357] INFO Checkpoint 1 of job > afde4a82f41e8284cb0bfff20497a5cc expired before completing. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > output > [2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ > 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > 33 > 34 > 35 > [2021-06-26 16:09:15,349] INFO Checkpoint 2 of job > afde4a82f41e8284cb0bfff20497a5cc expired before completing. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > > On Sat, Jun 26, 2021 at 4:36 PM tao xiao <xiaotao...@gmail.com> wrote: > > Hi team, > > I run a simple 1.12.1 Flink job in IDE with > TolerableCheckpointFailureNumber set where I throw an exception in source > function snapshotState intentionally to verify how Flink behaves. What I > find is the first checkpoint throws the exception and eventually time out > while the main flow continues to work. This is expected however all > subsequent checkpoints don't reach the exception anymore and report timeout > when timeout reaches. I want to know if this is expected behavior which all > later checkpoints cannot finish if there is one checkpoint that throws > exception. > > Below is the code the reproduce the behavior > main > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true)); > env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE); > env.getCheckpointConfig().setCheckpointTimeout(3_000); > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000); > > env.addSource(new FromElementsFunctionT()) > .setParallelism(1) > .print() > .setParallelism(1); > env.execute("Demo"); > > > Source function > > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > > package sample.flink; > > import java.util.ArrayList; > import java.util.List; > import org.apache.flink.annotation.PublicEvolving; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.typeutils.base.IntSerializer; > import org.apache.flink.runtime.state.FunctionInitializationContext; > import org.apache.flink.runtime.state.FunctionSnapshotContext; > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.util.Preconditions; > > /** > * A stream source function that returns a sequence of elements. > * > * <p>Upon construction, this source function serializes the elements using > Flink's type > * information. That way, any object transport using Java serialization will > not be affected by the > * serializability of the elements. > * > * <p><b>NOTE:</b> This source has a parallelism of 1. > * > */ > @PublicEvolving > public class FromElementsFunctionT implements SourceFunction<Integer>, > CheckpointedFunction { > > private static final long serialVersionUID = 1L; > > /** The number of elements emitted already. */ > private volatile int numElementsEmitted; > > /** Flag to make the source cancelable. */ > private volatile boolean isRunning = true; > > private transient ListState<Integer> checkpointedState; > > @Override > public void initializeState(FunctionInitializationContext context) throws > Exception { > Preconditions.checkState( > this.checkpointedState == null, > "The " + getClass().getSimpleName() + " has already been > initialized."); > > this.checkpointedState = > context.getOperatorStateStore() > .getListState( > new ListStateDescriptor<>( > "from-elements-state", > IntSerializer.INSTANCE)); > > if (context.isRestored()) { > List<Integer> retrievedStates = new ArrayList<>(); > for (Integer entry : this.checkpointedState.get()) { > retrievedStates.add(entry); > } > > // given that the parallelism of the function is 1, we can only > have 1 state > Preconditions.checkArgument( > retrievedStates.size() == 1, > getClass().getSimpleName() + " retrieved invalid state."); > > this.numElementsEmitted = retrievedStates.get(0); > } > } > > @Override > public void run(SourceContext<Integer> ctx) throws Exception { > final Object lock = ctx.getCheckpointLock(); > > while (isRunning && numElementsEmitted < Integer.MAX_VALUE) { > Thread.sleep(1000); > synchronized (lock) { > ctx.collect(numElementsEmitted++); > } > } > } > > @Override > public void cancel() { > isRunning = false; > } > > // > ------------------------------------------------------------------------ > // Checkpointing > // > ------------------------------------------------------------------------ > > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > Preconditions.checkState( > this.checkpointedState != null, > "The " + getClass().getSimpleName() + " has not been properly > initialized."); > > this.checkpointedState.clear(); > this.checkpointedState.add(this.numElementsEmitted); > throw new NullPointerException("npe"); > } > } > > > > -- > Regards, > Tao > > > > -- > Regards, > Tao > > > > -- > Regards, > Tao > > > > -- > Regards, > Tao > > > -- Regards, Tao