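Zhanghao's answer is already very thorough; let me add one small point. Deleting the Deployment while the HA ConfigMaps are retained is expected behavior, and it is described in the documentation [1]. There are two reasons for this design:

(1) The job may be restarted with the same cluster-id, and the user may want it to recover from the previous checkpoint.
(2) Deleting only the ConfigMaps would leak the HA data stored on the DFS (e.g. HDFS, S3, OSS), because nothing would reference that data any more.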
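For the case where the HA data really should be discarded, the leftover ConfigMaps can be selected by label. The sketch below only prints the kubectl commands instead of running them. It assumes that the HA ConfigMaps carry the labels app=<cluster-id> and configmap-type=high-availability (please confirm with --show-labels first). The helper names ha_selector and print_ha_cleanup are made up for illustration; the namespace and cluster-id are the ones from the original question.

```shell
#!/usr/bin/env bash
# Sketch only: prints (does not execute) kubectl commands for the HA
# ConfigMaps that remain after the Deployment has been deleted.
# ASSUMPTION: HA ConfigMaps are labeled app=<cluster-id> and
# configmap-type=high-availability; check the printed "get" command first.

ha_selector() {
  # $1 = value of kubernetes.cluster-id
  printf 'app=%s,configmap-type=high-availability' "$1"
}

print_ha_cleanup() {
  # $1 = namespace, $2 = cluster-id
  local sel
  sel="$(ha_selector "$2")"
  # First inspect what the selector matches, then delete.
  echo "kubectl get configmap -n $1 --selector='$sel' --show-labels"
  echo "kubectl delete configmap -n $1 --selector='$sel'"
}

print_ha_cleanup bdra-dev-flink-standalone flink-bdra-sql-application-job
```

This removes only the ConfigMaps. Whatever is stored under high-availability.storageDir still has to be removed separately, which is exactly reason (2) above.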
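[1]. https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up

Best,
Yang

On Mon, Jun 13, 2022 at 07:53, Zhanghao Chen <zhanghao.c...@outlook.com> wrote:

> 1. For a Flink job with K8s-based HA to behave correctly, the job's deployment must not be deleted manually; the job has to be stopped with the cancel or stop command. From this I guess that Flink k8s HA is built on top of ConfigMaps, and that from the K8s point of view their lifecycle cannot carry an ownerreference the way the job's svc does.
>
> Yes, Flink K8s HA is built on ConfigMaps, and the HA ConfigMaps have no ownerreference set. So if you want to restart the cluster while keeping the HA data, simply delete the deployment; after the restart the job recovers from the latest checkpoint.
>
> 2. Flink jobs with k8s-based HA all have the job id 00000000000000000000000000000000.
>
> In Application mode with HA enabled, the Flink job id is always 00000000000000000000000000000000, regardless of whether K8s HA is used. The job id is the unique identifier of a job; the HA services use it to name and address the HA resources that belong to a single job (such as the persisted jobgraph and checkpoints). In Application mode the jobgraph is generated on the JM. Without HA, a random job id is generated every time the jobgraph is generated and serves as the job's unique identifier. With HA, a fixed job id is required (this is where the all-zero job id comes from). Otherwise a JM failover would generate a new, different job id that cannot be associated with the previous checkpoints, and the job would start from a fresh state.
>
> 3. How does Flink k8s HA work, and what information does it store? I would like to study the implementation; if anyone has design documents or related material, please reply to this email. Thanks.
>
> Have a look at the official blog post: https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
> For more details, see the design document (FLIP-144): https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>
> Best,
> Zhanghao Chen
> ________________________________
> From: m18814122325 <m18814122...@163.com>
> Sent: Sunday, June 12, 2022 22:45
> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> Subject: Flink k8s HA: unexpected behavior after manually deleting the job deployment
>
> Flink version: 1.15.0
>
> deploy mode: Native k8s application
>
> Symptom:
>
> I deployed a Flink job with K8s-based HA in native k8s mode. After I manually deleted the job's deployment, the job's HA ConfigMaps still existed. When I then started the job again without the -s option, the startup log showed that it recovered from the information recorded in those ConfigMaps.
>
> kubectl delete deployment flink-bdra-sql-application-job -n bdra-dev-flink-standalone
>
> kubectl get configMap -n bdra-dev-flink-standalone
>
> NAME                                                                         DATA   AGE
> flink-bdra-sql-application-job-00000000000000000000000000000000-config-map   2      13m
> flink-bdra-sql-application-job-cluster-config-map                            1      13m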
>
> My questions:
>
> 1. For a Flink job with K8s-based HA to behave correctly, the job's deployment must not be deleted manually; the job has to be stopped with the cancel or stop command. From this I guess that Flink k8s HA is built on top of ConfigMaps, and that from the K8s point of view their lifecycle cannot carry an ownerreference the way the job's svc does.
>
> 2. Flink jobs with k8s-based HA all have the job id 00000000000000000000000000000000.
>
> 3. How does Flink k8s HA work, and what information does it store? I would like to study the implementation; if anyone has design documents or related material, please reply to this email. Thanks.
>
> Restart command (without the -s option, i.e. the command itself does not request recovery from any checkpoint or savepoint):
>
> flink run-application --target kubernetes-application -c CalculateUv \
>   -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p \
>   -Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 \
>   -Dkubernetes.namespace=bdra-dev-flink-standalone \
>   -Dkubernetes.service-account=bdra-dev-flink-standalone-sa \
>   -Djobmanager.memory.process.size=1024m \
>   -Dkubernetes.jobmanager.cpu=2 \
>   -Dkubernetes.taskmanager.cpu=2 \
>   -Dparallelism.default=8 \
>   -Dtaskmanager.numberOfTaskSlots=2 \
>   -Dtaskmanager.memory.process.size=2144m \
>   -Dstate.backend=filesystem \
>   -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3 \
>   -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3 \
>   -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
>   -Dhigh-availability.storageDir=file:///opt/flink/log/recovery \
>   -Ds3.access-key=* \
>   -Ds3.secret-key=* \
>   -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory \
>   -Dmetrics.reporter.influxdb.scheme=http \
>   -Dmetrics.reporter.influxdb.host=influxdb \
>   -Dmetrics.reporter.influxdb.port=8086 \
>   -Dmetrics.reporter.influxdb.db=flink_metrics \
>   -Dmetrics.reporter.influxdb.consistency=ANY \
>   -Ds3.endpoint=http://*:80 \
>   -Dkubernetes.rest-service.exposed.type=ClusterIP \
>   -Dkubernetes.config.file=kube_config \
>   -Dkubernetes.pod-template-file=pod-template.yaml \
>   local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar
>
> After the restart, the job automatically recovered from the ConfigMap:
>
> 2022-06-10 20:20:52,592 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 1 persisted job graphs.
> 2022-06-10 20:20:52,654 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
> 2022-06-10 20:20:53,552 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
> 2022-06-10 20:20:53,552 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
> 2022-06-10 20:20:55,352 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.
> 2022-06-10 20:20:55,370 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false)
> 2022-06-10 20:20:55,394 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
> 2022-06-10 20:20:55,438 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' (00000000000000000000000000000000).
> 2022-06-10 20:20:55,477 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).
> 2022-06-10 20:20:55,558 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job-00000000000000000000000000000000-config-map'}.
> 2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 1 checkpoints in KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job-00000000000000000000000000000000-config-map'}.
> 2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 1 checkpoints from storage.
> 2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to retrieve checkpoint 64.
> 2022-06-10 20:20:55,760 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).
> 2022-06-10 20:20:55,760 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
> 2022-06-10 20:20:56,254 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 2 ms, total 1 pipelined regions currently.
> 2022-06-10 20:20:56,266 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5151a2cd
> 2022-06-10 20:20:56,267 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5cfc3f54
> 2022-06-10 20:20:56,267 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
> 2022-06-10 20:20:56,270 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@f3bc6a6
> 2022-06-10 20:20:57,763 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 00000000000000000000000000000000 from Checkpoint 64 @ 1654863136682 for 00000000000000000000000000000000 located at s3p://otsp-flink-lun01/flink-checkpoints/00000000000000000000000000000000/chk-64.
> 2022-06-10 20:20:57,797 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
> 2022-06-10 20:20:57,798 INFO org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
> 2022-06-10 20:20:57,839 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: user_behavior[1].
> 2022-06-10 20:20:57,840 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: user_behavior[1] closed.
> 2022-06-10 20:20:57,847 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: user_behavior[1] from checkpoint.
> 2022-06-10 20:20:57,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@55d02910 for insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).
> 2022-06-10 20:20:57,989 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='flink-bdra-sql-application-job-cluster-config-map'}.
> 2022-06-10 20:20:57,989 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for bdra-dev-flink-standalone/flink-bdra-sql-application-job-cluster-config-map, watching id:93e9b11c-a69c-425a-b35a-e65bc53ea5b1
> 2022-06-10 20:20:57,990 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' (00000000000000000000000000000000) under job master id 92d47a56896398911c0078e0a2544608.
> 2022-06-10 20:20:57,996 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: user_behavior[1].
> 2022-06-10 20:20:57,998 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> 2022-06-10 20:20:57,999 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000) switched from state CREATED to RUNNING.
> 2022-06-10 20:20:58,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (1/8) (1b3b93d9f79b647fcc54d5253974d94f) switched from CREATED to SCHEDULED.
> 2022-06-10 20:20:58,050 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (2/8) (ebf049155617e3fc58d449eb9f3b0eb6) switched from CREATED to SCHEDULED.
> 2022-06-10 20:20:58,050 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (3/8) (118cb05f1cb95e9a569a1d99e5aef29e) switched from CREATED to SCHEDULED.