Hi Weihua,

I am deploying my flink job in HA application mode on a kubernetes cluster.
I am using an external nfs mount for storing checkpoints. For some reason,
whenever I deploy an updated version of my application, it uses the same
job_id for the new job as for the previous job. Thus the flink job creates
checkpoints in the same directory, and at whatever point it encounters the
same checkpoint path(already existing checkpoint from previous versions of
my job) it throws the above error, and the job restarts. I have set my job
restart count as 3. So if this happens continuously for 3 times, the
jobmanager pod restarts, and then it starts the job again from checkpoint-0
or from the last saved savepoint. Then the same story repeats.

Thanks
Regards
Amenreet Singh Sodhi

On Wed, May 10, 2023 at 9:10 AM Weihua Hu <huweihua....@gmail.com> wrote:

> Hi,
>
> if for some reason there exists a checkpoint by same name.
>>
> Could you give more details about your scenarios here?
> From your description, I guess this problem occurred when a job restart,
> does this restart is triggered personally?
>
> In common restart processing, the job will retrieve the latest checkpoint
> from a high-available service(zookeeper or kubernetes),
> and then restore from it and make a new checkpoint with a new
> checkpoint-id.
> In this case, the job does not recover from the old checkpoint, but the
> old checkpoint path already exists.
>
> Best,
> Weihua
>
>
> On Wed, May 10, 2023 at 11:07 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
>> Hi, amenreet,
>>
>> As Hangxiang said, we should use a new checkpoint dir if the new job has
>> the same jobId as the old one . Or else you should not use a fixed jobId
>> and the checkpoint dir will not conflict.
>>
>> Best,
>> Hang
>>
>> Hangxiang Yu <master...@gmail.com> 于2023年5月10日周三 10:35写道:
>>
>>> Hi,
>>> I guess you used a fixed JOB_ID, and configured the same checkpoint dir
>>> as before ?
>>> And you may also start the job without before state ?
>>> The new job cannot know anything about before checkpoints, that's why
>>> the new job will fail when it tries to generate a new checkpoint.
>>> I'd like to suggest you to use different JOB_ID for different jobs, or
>>> set a different checkpoint dir for a new job.
>>>
>>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi <amenso...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Is there any way to prevent restart of flink job, or override the
>>>> checkpoint metadata, if for some reason there exists a checkpoint by same
>>>> name. I get the following exception and my job restarts, have been trying
>>>> to find solution for a very long time but havent found anything useful yet,
>>>> other than manually cleaning.
>>>>
>>>> 2023-02-27 10:00:50,360 WARN  
>>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>>>> [] - Failed to trigger or complete checkpoint 1 for job
>>>> 000000006e6b13320000000000000000. (0 consecutive failed attempts so far)
>>>>
>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>>>> finalize checkpoint.
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> [?:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> [?:?]
>>>>
>>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>>
>>>> Caused by: java.io.IOException: Target file
>>>> file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata
>>>> already exists.
>>>>
>>>> at
>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> ... 7 more
>>>>
>>>> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>>>>                 [] - Error while processing AcknowledgeCheckpoint
>>>> message
>>>>
>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>>> finalize the pending checkpoint 1. Failure reason: Failure to finalize
>>>> checkpoint.
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> [?:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> [?:?]
>>>>
>>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>>
>>>> Caused by: java.io.IOException: Target file
>>>> file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata
>>>> already exists.
>>>>
>>>> at
>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>>
>>>> Please let me know if anyone knows how to resolve this issue.
>>>>
>>>> Thanks and Regards
>>>>
>>>> Amenreet Singh Sodhi
>>>>
>>>>
>>>>
>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>

Reply via email to