Hi Brian,

AFAIK, Arvid and Piotr (both in CC) have been working on the threading
model of the checkpoint coordinator.
Maybe they can help with this question.

Best, Fabian

Am Mo., 20. Juli 2020 um 03:36 Uhr schrieb <b.z...@dell.com>:

> Anyone can help us on this issue?
>
>
>
> Best Regards,
>
> Brian
>
>
>
> *From:* Zhou, Brian
> *Sent:* Wednesday, July 15, 2020 18:26
> *To:* 'user@flink.apache.org'
> *Subject:* Pravega connector cannot recover from the checkpoint due to
> "Failure to finalize checkpoint"
>
>
>
> Hi community,
>
> To give some background, https://github.com/pravega/flink-connectors is a
> Pravega connector for Flink. The ReaderCheckpointHook[1] class uses the
> Flink `MasterTriggerRestoreHook` interface to trigger the Pravega
> checkpoint during Flink checkpoints to make sure the data recovery. We
> experienced the failures in the latest Flink 1.11 upgrade with the
> checkpoint recovery, there are some timeout issues for the continuous
> checkpoint failure on some of the test cases.
> Error stacktrace:
>
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint
> acknowledgement message
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> finalize the pending checkpoint 3. Failure reason: Failure to finalize
> checkpoint.
>
>          at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>
>          at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>
>          at
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>
>          at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>          at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
>          at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>          at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint
> has not been fully acknowledged yet
>
>          at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>
>          at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>
>          at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>
>          ... 9 common frames omitted
>
> After some investigation, the main problem is found. It is about the
> checkpoint recovery. When Flink CheckpointCoordinator wants to finalize a
> checkpoint, it needs to check everything is acknowledged, but for some
> reason, the master state still has our ReaderCheckpointHook remaining
> unack-ed, hence leading the checkpoint failure in the complete stage.
> In the PendingCheckpoint::snapshotMasterState, there is an async call to
> acknowledge the master state for each hook. But it returned before the
> acknowledgement.
> I think it might be related to the latest changes of the thread model of
> the checkpoint coordinator. Can someone help to verify?
>
>
>
> *Reproduce procedure:*
> Checkout this branch
> https://github.com/crazyzhou/flink-connectors/tree/brian-1.11-test and
> run below test case:
> FlinkPravegaReaderSavepointITCase::testPravegaWithSavepoint
>
>
>
> [1]
> https://github.com/pravega/flink-connectors/blob/e15cfe5b7917431ce8672cf9f232cb4603d8143a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java
>
>
>
> Best Regards,
>
> Brian
>
>
>

Reply via email to