Yes, the wrong button was pushed when replying last time. -.- Looking into the
code once again [1], you're right: for "last-state", no job is cancelled; the
cluster deployment is just deleted. I was assuming that the artifacts referred
to in the documentation about the JobResultStore resource leak [2] are the
JobResultStoreEntry files rather than other artifacts (e.g. job graphs). But
yeah, if we only delete the deployment, no Flink-internal cleanup is done. I'm
wondering what the reasoning behind that is.
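
For anyone following along, the behavior we are discussing is configured on the
FlinkDeployment resource. A minimal sketch of such a spec (the name, image, and
storage paths are made-up placeholders):

  apiVersion: flink.apache.org/v1beta1
  kind: FlinkDeployment
  metadata:
    name: example-job                         # placeholder
  spec:
    image: registry.example.com/my-job:2.0    # placeholder; changing this triggers an upgrade
    flinkVersion: v1_15
    serviceAccount: flink
    flinkConfiguration:
      high-availability: kubernetes           # last-state requires HA to be enabled
      high-availability.storageDir: abfss://flink@myaccount.dfs.core.windows.net/ha           # placeholder
      state.checkpoints.dir: abfss://flink@myaccount.dfs.core.windows.net/checkpoints         # placeholder
    jobManager:
      resource: { memory: "2048m", cpu: 1 }
    taskManager:
      resource: { memory: "2048m", cpu: 1 }
    job:
      jarURI: local:///opt/flink/usrlib/my-job.jar  # placeholder
      upgradeMode: last-state                 # redeploy from the latest retained checkpoint

With upgradeMode: last-state, the operator redeploys the job from the latest
retained checkpoint instead of taking a savepoint first.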
[1] https://github.com/apache/flink-kubernetes-operator/blob/ea01e294cf1b68d597244d0a11b3c81822a163e7/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L336
[2] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak

On Thu, Dec 8, 2022 at 11:04 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi Matthias,
>
> I think you didn't include the mailing list in your response.
>
> According to my experiments, using last-state means the operator simply
> deletes the Flink pods, and I believe that doesn't count as Cancelled, so
> the artifacts for blobs and submitted job graphs are not cleaned up. I
> imagine the same logic Gyula mentioned before applies, namely keep the
> latest one and clean up the older ones.
>
> Regards,
> Alexis.
>
> On Thu, Dec 8, 2022 at 10:37 AM Matthias Pohl <matthias.p...@aiven.io> wrote:
>
>> I see, I confused the Flink-internal recovery with what the Flink
>> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you
>> do an upgrade of your job, the operator will cancel the Flink job (I'm
>> assuming now that you use Flink's Application mode rather than Session
>> mode). The operator cancels your job and shuts down the cluster.
>> Checkpoints are retained and can therefore be used as the so-called
>> "last state" when redeploying your job using the same Job ID. In that
>> case, the corresponding JobGraph and other BLOBs should be cleaned up by
>> Flink itself. The checkpoint files are retained, i.e. they survive the
>> Flink cluster shutdown.
>>
>> When redeploying the Flink cluster with the (updated) job, a new
>> JobGraph file is created by Flink internally. BLOBs are recreated as
>> well. New checkpoints are going to be created, and old ones (that are
>> not needed anymore) are cleaned up.
>>
>> Just to recap what I said before (to differentiate more explicitly
>> between what the k8s operator does and what Flink does internally):
>> removing the artifacts you were talking about in your previous post
>> would harm Flink's internal recovery mechanism. That's probably not what
>> you want.
>>
>> @Gyula: Please correct me if I misunderstood something here.
>>
>> I hope that helped.
>> Matthias
>>
>> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>
>>> I see, thanks for the details.
>>>
>>> I do mean replacing the job without stopping it terminally.
>>> Specifically, I mean updating the container image with one that
>>> contains an updated job jar. Naturally, the new version must not break
>>> state compatibility, but as long as that is fulfilled, the job should
>>> be able to use the last checkpoint as its starting point. It's my
>>> understanding that this is how the Kubernetes operator's "last-state"
>>> upgrade mode works [1].
>>>
>>> [1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Wed, Dec 7, 2022 at 3:54 PM Matthias Pohl <matthias.p...@aiven.io> wrote:
>>>
>>>> > - job_name/submittedJobGraphX
>>>> submittedJobGraph* is the persisted JobGraph that would be picked up
>>>> in case of a failover. Deleting this file would mean that Flink's
>>>> failure recovery no longer works properly if the job is still running
>>>> but needs to be restarted, because the actual job definition is gone.
>>>>
>>>> > completedCheckpointXYZ
>>>> This is the persisted CompletedCheckpoint with a reference to the
>>>> actual checkpoint directory. The HA data relies on this file being
>>>> present. Deleting it would be problematic if state recovery relies in
>>>> some way on this specific checkpoint; failover only fails if there is
>>>> no newer checkpoint or the HA data still refers to this checkpoint in
>>>> some way.
>>>>
>>>> > - job_name/blob/job_uuid/blob_...
>>>> These are artifacts of the BlobServer containing runtime artifacts of
>>>> the jobs (e.g. logs, libraries, ...).
>>>>
>>>> In general, you don't want to clean up HA artifacts if the job hasn't
>>>> reached a terminal state yet, as that harms Flink's ability to recover
>>>> the job. Additionally, these files are connected with the HA backend,
>>>> i.e. their paths are stored in the HA backend, so removing the
>>>> artifacts will likely result in that metadata becoming invalid.
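>>>>
>>>> To make those paths concrete: the entries you listed live under
>>>> high-availability.storageDir, so the layout looks roughly like this (a
>>>> sketch based only on what you posted; the storage path is a
>>>> placeholder):
>>>>
>>>>   high-availability.storageDir: abfss://flink@myaccount.dfs.core.windows.net/ha
>>>>   # resulting entries under the storage dir:
>>>>   #   job_name/submittedJobGraphXYZ     # serialized JobGraph, needed for failover
>>>>   #   job_name/completedCheckpointXYZ   # metadata pointing at the checkpoint directory
>>>>   #   job_name/blob/job_uuid/blob_...   # BlobServer artifacts (libraries etc.)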
>>>>
>>>> What do you mean with "testing updates *without* savepoints"? Are you
>>>> referring to replacing the job's business logic without stopping the
>>>> job?
>>>>
>>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Then the explanation is likely that the job has not reached a
>>>>> terminal state. I was testing updates *without* savepoints (but with
>>>>> HA), so I guess that never triggers automatic cleanup.
>>>>>
>>>>> Since, in my case, the job will theoretically never reach a terminal
>>>>> state with this configuration, would it cause issues if I clean up
>>>>> the artifacts manually?
>>>>>
>>>>> *And for completeness: I also see an artifact called
>>>>> completedCheckpointXYZ which is updated over time, and I imagine that
>>>>> should never be removed.
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>> On Wed, Dec 7, 2022 at 1:03 PM Matthias Pohl <matthias.p...@aiven.io> wrote:
>>>>>
>>>>>> Flink should already take care of cleaning up the artifacts you
>>>>>> mentioned. Flink 1.15+ even includes retries if something goes
>>>>>> wrong. There are still a few bugs that need to be fixed (e.g.
>>>>>> FLINK-27355 [1]). Checkpoint HA data is not properly cleaned up yet;
>>>>>> that is covered by FLIP-270 [2].
>>>>>>
>>>>>> It would be interesting to know why these artifacts haven't been
>>>>>> deleted, assuming that the corresponding job is actually in a final
>>>>>> state (e.g. FAILED, CANCELLED, FINISHED), i.e. there is a
>>>>>> JobResultStoreEntry file for that specific job available in the
>>>>>> folder Gyula was referring to in the linked documentation. At least
>>>>>> for the JobGraph files, it's likely that you still have additional
>>>>>> metadata stored in your HA backend that refers to those files. That
>>>>>> would be something you might want to clean up as well, if you want
>>>>>> to do a proper cleanup. But still, it would be good to understand
>>>>>> why these files are not cleaned up by Flink.
>>>>>>
>>>>>> Best,
>>>>>> Matthias
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-27355
>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
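>>>>>>
>>>>>> If you want to check for that entry: the JobResultStore files live
>>>>>> under the HA storage directory by default. A sketch of the relevant
>>>>>> flink-conf.yaml options (the path is a placeholder, and please
>>>>>> double-check the defaults for your Flink version; as far as I can
>>>>>> tell, the operator sets delete-on-commit to false, which is why
>>>>>> those entries accumulate):
>>>>>>
>>>>>>   # default: <high-availability.storageDir>/job-result-store/<cluster-id>
>>>>>>   job-result-store.storage-path: abfss://flink@myaccount.dfs.core.windows.net/ha/job-result-store/example-job
>>>>>>   # when true (the default), a job's entry is removed again once
>>>>>>   # cleanup completes; when false, entries are kept and must be
>>>>>>   # cleaned up externally
>>>>>>   job-result-store.delete-on-commit: "false"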
>>>>>>
>>>>>> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>>>>
>>>>>>> One concrete question: under the HA folder I also see these sample
>>>>>>> entries:
>>>>>>>
>>>>>>> - job_name/blob/job_uuid/blob_...
>>>>>>> - job_name/submittedJobGraphX
>>>>>>> - job_name/submittedJobGraphY
>>>>>>>
>>>>>>> Is it safe to clean these up when the job is in a healthy state?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>> On Mon, Dec 5, 2022 at 8:09 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> That certainly helps, but to set up automatic cleanup (in my case,
>>>>>>>> of Azure blob storage), the ideal option would be a simple policy
>>>>>>>> that deletes blobs that haven't been updated in some time. That
>>>>>>>> would assume, however, that anything actually relevant for the
>>>>>>>> latest state is "touched" by the JM on every checkpoint, and since
>>>>>>>> I also see blobs referencing "submitted job graphs", I imagine
>>>>>>>> that is not a safe assumption.
>>>>>>>>
>>>>>>>> I understand the life cycle of those blobs isn't directly managed
>>>>>>>> by the operator, but in that regard it could make things more
>>>>>>>> cumbersome.
>>>>>>>>
>>>>>>>> Ideally, Flink itself would guarantee this sort of allowable TTL
>>>>>>>> for HA files, but I'm sure that's not trivial.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Alexis.
>>>>>>>>
>>>>>>>> On Mon, 5 Dec 2022, 19:19 Gyula Fóra, <gyula.f...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> There are some files that are not cleaned up over time in the HA
>>>>>>>>> dir and need to be cleaned up by the user:
>>>>>>>>>
>>>>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>>>>>>>>
>>>>>>>>> Hope this helps,
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I see that the number of entries in the directory configured for
>>>>>>>>>> HA increases over time, particularly in the context of job
>>>>>>>>>> upgrades in a Kubernetes environment managed by the operator.
>>>>>>>>>> Would it be safe to assume that any files that haven't been
>>>>>>>>>> updated in a while can be deleted, assuming the checkpointing
>>>>>>>>>> interval is much smaller than the period used to determine
>>>>>>>>>> whether files are too old?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Alexis.
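>>>>>>>>>>
>>>>>>>>>> PS: for concreteness, the kind of policy I have in mind is an
>>>>>>>>>> Azure Storage lifecycle rule along these lines (the prefix and
>>>>>>>>>> the 30-day window are placeholders):
>>>>>>>>>>
>>>>>>>>>>   {
>>>>>>>>>>     "rules": [{
>>>>>>>>>>       "enabled": true,
>>>>>>>>>>       "name": "delete-stale-flink-ha-files",
>>>>>>>>>>       "type": "Lifecycle",
>>>>>>>>>>       "definition": {
>>>>>>>>>>         "actions": {
>>>>>>>>>>           "baseBlob": { "delete": { "daysAfterModificationGreaterThan": 30 } }
>>>>>>>>>>         },
>>>>>>>>>>         "filters": {
>>>>>>>>>>           "blobTypes": ["blockBlob"],
>>>>>>>>>>           "prefixMatch": ["flink-ha/"]
>>>>>>>>>>         }
>>>>>>>>>>       }
>>>>>>>>>>     }]
>>>>>>>>>>   }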