Yes, the wrong button was pushed when replying last time. -.-

Looking into the code once again [1], you're right. It looks like for
"last-state", no job is cancelled; the cluster deployment is just
deleted. I had assumed that the documentation about the JobResultStore
resource leak [2] refers only to the JobResultStoreEntry files, not to
other artifacts (e.g. JobGraphs). But yeah, if we only delete the
deployment, no Flink-internal cleanup is done.
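
To illustrate why a purely age-based cleanup policy is risky in this
situation, here is a minimal sketch. It is not operator or Flink code:
HaEntry, is_recovery_artifact and cleanup_candidates are hypothetical
names, and the prefixes are simply the entry names mentioned in this
thread, so the layout of a real HA directory may differ.

```python
from dataclasses import dataclass

# Name prefixes taken from the entries mentioned in this thread; the real
# layout of an HA directory may differ (assumption).
RECOVERY_PREFIXES = ("submittedJobGraph", "completedCheckpoint", "blob")


@dataclass
class HaEntry:
    path: str         # e.g. "job_name/submittedJobGraphX"
    age_hours: float  # hours since the entry was last modified


def is_recovery_artifact(path: str) -> bool:
    """True if any path component looks like a JobGraph, checkpoint pointer or BLOB."""
    return any(part.startswith(RECOVERY_PREFIXES) for part in path.split("/"))


def cleanup_candidates(entries, job_is_terminal, max_age_hours):
    """Return paths that an age-based policy might consider deleting.

    Recovery artifacts are skipped while the job has not reached a terminal
    state, no matter how old they are. Even for a terminal job, the HA backend
    may still hold metadata pointing at these files, so the result is a list
    of candidates to review, not a guarantee that deletion is safe.
    """
    candidates = []
    for entry in entries:
        if entry.age_hours < max_age_hours:
            continue  # touched recently, keep
        if is_recovery_artifact(entry.path) and not job_is_terminal:
            continue  # old, but still needed for failover
        candidates.append(entry.path)
    return candidates


entries = [
    HaEntry("job_name/submittedJobGraphX", 100.0),
    HaEntry("job_name/completedCheckpointXYZ", 1.0),
    HaEntry("job_name/other_file", 100.0),  # hypothetical unrelated entry
]
print(cleanup_candidates(entries, job_is_terminal=False, max_age_hours=24.0))
```

With "last-state" upgrades the job never reaches a terminal state, so
in this sketch the JobGraph entry is never offered for deletion by age
alone, even though nothing has touched it for a long time.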

I'm wondering what the reasoning behind that is.

[1]
https://github.com/apache/flink-kubernetes-operator/blob/ea01e294cf1b68d597244d0a11b3c81822a163e7/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L336
[2]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak

On Thu, Dec 8, 2022 at 11:04 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Matthias,
>
> I think you didn't include the mailing list in your response.
>
> According to my experiments, using last-state means the operator simply
> deletes the Flink pods, and I believe that doesn't count as Cancelled, so
> the artifacts for blobs and submitted job graphs are not cleaned up. I
> imagine the same logic Gyula mentioned before applies, namely keep the
> latest one and clean the older ones.
>
> Regards,
> Alexis.
>
> On Thu, Dec 8, 2022 at 10:37 AM Matthias Pohl <
> matthias.p...@aiven.io> wrote:
>
>> I see, I confused the Flink-internal recovery with what the Flink
>> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
>> an upgrade of your job, the operator will cancel the Flink job (I'm
>> assuming now that you use Flink's Application mode rather than Session
>> mode). The operator cancels your job and shuts down the cluster.
>> Checkpoints are retained and, therefore, can be used as the so-called "last
>> state" when redeploying your job using the same Job ID. In that case, the
>> corresponding jobGraph and other BLOBs should be cleaned up by Flink
>> itself. The checkpoint files are retained, i.e. survive the Flink cluster
>> shutdown.
>>
>> When redeploying the Flink cluster with the (updated) job, a new JobGraph
>> file is created by Flink internally. BLOBs are recreated as well. New
>> checkpoints are going to be created and old ones (that are not needed
>> anymore) are cleaned up.
>>
>> Just to recap what I said before (to distinguish more explicitly between
>> what the k8s operator does and what Flink does internally):
>> Removing the artifacts you were talking about in your previous post would
>> harm Flink's internal recovery mechanism. That's probably not what you want.
>>
>> @Gyula: Please correct me if I misunderstood something here.
>>
>> I hope that helped.
>> Matthias
>>
>> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> I see, thanks for the details.
>>>
>>> I do mean replacing the job without stopping it terminally.
>>> Specifically, I mean updating the container image with one that contains
>>> an updated job jar. Naturally, the new version must not break state
>>> compatibility, but as long as that is fulfilled, the job should be able to
>>> use the last checkpoint as starting point. It's my understanding that this
>>> is how the Kubernetes operator's "last-state" upgrade mode works [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Wed, Dec 7, 2022 at 3:54 PM Matthias Pohl <
>>> matthias.p...@aiven.io> wrote:
>>>
>>>> > - job_name/submittedJobGraphX
>>>> submittedJobGraph* is the persisted JobGraph that would be picked up in
>>>> case of a failover. If the job is still running and needs to be
>>>> restarted, deleting this file would break Flink's failure recovery
>>>> because the actual job definition is gone.
>>>>
>>>> > completedCheckpointXYZ
>>>> This is the persisted CompletedCheckpoint with a reference to the
>>>> actual Checkpoint directory. Deleting this file would be problematic if the
>>>> state recovery relies in some way on this specific checkpoint. The HA data
>>>> relies on this file to be present. Failover only fails if there's no newer
>>>> checkpoint or the HA data still refers to this checkpoint in some way.
>>>>
>>>> > - job_name/blob/job_uuid/blob_...
>>>> Artifacts of the BlobServer containing runtime artifacts of the jobs
>>>> (e.g. logs, libraries, ...)
>>>>
>>>> In general, you don't want to clean HA artifacts if the job hasn't
>>>> reached a terminal state yet, as it harms Flink's ability to recover the
>>>> job. Additionally, these files are connected with the HA backend, i.e. the
>>>> file path is stored in the HA backend. Removing the artifacts will likely
>>>> result in metadata becoming invalid.
>>>>
>>>> What do you mean by "testing updates *without* savepoints"? Are you
>>>> referring to replacing the job's business logic without stopping the job?
>>>>
>>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Then the explanation is likely that the job has not reached a terminal
>>>>> state. I was testing updates *without* savepoints (but with HA), so I
>>>>> guess that never triggers automatic cleanup.
>>>>>
>>>>> Since, in my case, the job will theoretically never reach a terminal
>>>>> state with this configuration, would it cause issues if I clean the
>>>>> artifacts manually?
>>>>>
>>>>> *And for completeness, I also see an artifact called
>>>>> completedCheckpointXYZ which is updated over time, and I imagine that
>>>>> should never be removed.
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>> On Wed, Dec 7, 2022 at 1:03 PM Matthias Pohl <
>>>>> matthias.p...@aiven.io> wrote:
>>>>>
>>>>>> Flink should already take care of cleaning the artifacts you
>>>>>> mentioned. Flink 1.15+ even includes retries if something went wrong.
>>>>>> There are still a few bugs that need to be fixed (e.g. FLINK-27355 [1]).
>>>>>> Checkpoint HA data is not properly cleaned up, yet, which is covered by
>>>>>> FLIP-270 [2].
>>>>>>
>>>>>> It would be interesting to know why these artifacts haven't been
>>>>>> deleted assuming that the corresponding job is actually in a final state
>>>>>> (e.g. FAILED, CANCELLED, FINISHED), i.e. there is a JobResultStoreEntry
>>>>>> file for that specific job available in the folder Gyula was referring to
>>>>>> in the linked documentation. At least for the JobGraph files, it's likely
>>>>>> that you have additional metadata still stored in your HA backend (that
>>>>>> refers to the files). That would be something you might want to clean up
>>>>>> as well, if you want to do a proper cleanup. But still, it would be good to
>>>>>> understand why these files are not cleaned up by Flink.
>>>>>>
>>>>>> Best,
>>>>>> Matthias
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-27355
>>>>>> [2]
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>>>>>>
>>>>>> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <
>>>>>> sarda.espin...@gmail.com> wrote:
>>>>>>
>>>>>>> One concrete question, under the HA folder I also see these sample
>>>>>>> entries:
>>>>>>>
>>>>>>> - job_name/blob/job_uuid/blob_...
>>>>>>> - job_name/submittedJobGraphX
>>>>>>> - job_name/submittedJobGraphY
>>>>>>>
>>>>>>> Is it safe to clean these up when the job is in a healthy state?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>> On Mon, Dec 5, 2022 at 8:09 PM Alexis Sarda-Espinosa <
>>>>>>> sarda.espin...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> That certainly helps, but to set up automatic cleanup (in my case, of
>>>>>>>> Azure Blob Storage), the ideal option would be a simple policy that
>>>>>>>> deletes blobs that haven't been updated in some time. That would assume
>>>>>>>> that anything actually relevant for the latest state is "touched" by the
>>>>>>>> JM on every checkpoint, and since I also see blobs referencing "submitted
>>>>>>>> job graphs", I imagine that might not be a safe assumption.
>>>>>>>>
>>>>>>>> I understand the life cycle of those blobs isn't directly managed by the
>>>>>>>> operator, but in that regard it could make things more cumbersome.
>>>>>>>>
>>>>>>>> Ideally, Flink itself would guarantee this sort of allowable TTL
>>>>>>>> for HA files, but I'm sure that's not trivial.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Alexis.
>>>>>>>>
>>>>>>>> On Mon, 5 Dec 2022, 19:19 Gyula Fóra, <gyula.f...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> There are some files that are not cleaned up over time in the HA
>>>>>>>>> dir that need to be cleaned up by the user:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hope this helps
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
>>>>>>>>> sarda.espin...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I see the number of entries in the directory configured for HA
>>>>>>>>>> increases over time, particularly in the context of job upgrades in a
>>>>>>>>>> Kubernetes environment managed by the operator. Would it be safe to
>>>>>>>>>> assume that any files that haven't been updated in a while can be
>>>>>>>>>> deleted, provided the checkpointing interval is much smaller than the
>>>>>>>>>> period used to determine whether files are too old?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Alexis.
>>>>>>>>>>
>>>>>>>>>>
