Hi, Young

Your analysis is correct. The Flink Kubernetes operator talks to the Flink cluster through the REST service, and Kubernetes routes requests sent to that Service to one of the backend JM Pods at random. Job submission is split into three API calls: uploadJar, runJob, and deleteJar, which is why you see the related errors in the operator's log.
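To make the failure mode concrete, here is a minimal sketch (not operator code; the pod names and helper functions are made up for illustration) of a Service that picks a backend JM Pod independently for every REST call, while each Pod keeps uploaded jars only in its own local flink-web-upload directory. The three calls correspond to Flink's REST endpoints POST /jars/upload, POST /jars/:jarid/run, and DELETE /jars/:jarid:

```python
import random

# Each "pod" keeps uploaded jars in its own local web-upload directory,
# mirroring /tmp/flink-web-*/flink-web-upload on a real JobManager.
pods = {"jm-0": set(), "jm-1": set()}

def route():
    # The Service picks a backend pod independently for every request.
    return random.choice(sorted(pods))

def upload_jar(jar):
    # uploadJar (POST /jars/upload): the jar lands on one pod's local disk only.
    pod = route()
    pods[pod].add(jar)
    return pod

def run_jar(jar):
    # runJob (POST /jars/:jarid/run): fails when routed to the other pod,
    # matching the "Jar file ... does not exist" error from JarRunHandler.
    pod = route()
    if jar not in pods[pod]:
        raise FileNotFoundError(f"{jar} does not exist on {pod}")
    return pod

random.seed(7)
jar = "StateMachineExample.jar"
uploaded_on = upload_jar(jar)

# Roughly half of the runJob calls land on the pod without the jar.
failures = 0
for _ in range(1000):
    try:
        run_jar(jar)
    except FileNotFoundError:
        failures += 1
print(uploaded_on, failures)
```

The same routing applies to deleteJar, which explains why a job can run successfully and yet the subsequent jar deletion fails.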
Perhaps you could create a JIRA issue to follow up on this problem.

Best,
Weihua

On Thu, Oct 27, 2022 at 6:51 PM Young Chen <niaomingj...@outlook.com> wrote:

> [Problem description]
>
> The Flink k8s operator (v1.1.0) deployed a Flink session cluster in HA mode
> (two JobManagers), and a FlinkSessionJob was then used to deploy an example
> job. Sometimes the job deploys; sometimes it does not.
>
> The following error logs can be seen in the containers.
>
> [Steps to reproduce]
>
> Deploy the cluster:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-cluster-1jm-checkpoint
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "1"
>     state.savepoints.dir: file:///flink-data/savepoints
>     state.checkpoints.dir: file:///flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: file:///flink-data/ha
>     state.checkpoints.num-retained: "10"
>   serviceAccount: flink
>   ingress:
>     template: "{{name}}.{{namespace}}.k8s.rf.io"
>   jobManager:
>     replicas: 2
>   podTemplate:
>     spec:
>       nodeSelector:
>         kubernetes.io/hostname: k8s17
>       containers:
>         - name: flink-main-container
>           volumeMounts:
>             - mountPath: /flink-data
>               name: flink-volume
>       volumes:
>         - name: flink-volume
>           hostPath:
>             # directory location on host
>             path: /tmp/flink
>             # this field is optional
>             type: Directory
>
> Deploy the job:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: flink-job-1jm-checkpoint
> spec:
>   deploymentName: flink-cluster-1jm-checkpoint
>   job:
>     # our custom-built operator image includes the examples jars
>     jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar
>     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
>     parallelism: 1
>     upgradeMode: savepoint
>
> [Relevant logs]
>
> 1. Operator log from a run where the job deployed successfully and could run:
>
> 2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService [ERROR][flink/flink-job-1jm-checkpoint]
> Failed to delete the jar: 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
> java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.handler.RestHandlerException: File
> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist
> in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
>     at org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.base/java.lang.Thread.run(Unknown Source)
> ]
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source
>
> One JobManager Pod does not have the file
> /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar,
> while the other JM's Pod does; but that JM should not be the leader, since the
> checkpoint-related logs are being written by the first JM.
>
> 2. Operator log from a run where the job failed to deploy:
>
> 2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink/flink-job-1jm-checkpoint]
> Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint',
> namespace='flink'}, version: 120505701} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.handler.RestHandlerException:
> Jar file /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar
> does not exist
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:172)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:57)
>     at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
>     at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
>     at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.base/java.util.Optional.ifPresent(Unknown Source)
>
> Similar logs appear in the leader JobManager's Pod.
>
> [Analysis]
>
> In HA mode, is there any need to set the JM replica count this high (replicas is
> configurable)? With multiple JMs, one of them is the leader; how did the other JM
> come to receive the uploaded jar?
>
> The first case looks like the operator submitted the jar to one JM, which also
> deployed it; then the second JM became leader, and the operator connected to the
> second JM when deleting, so only the jar deletion failed.
>
> The second case is that the operator submitted the jar to one JM, but the other JM
> was the leader, so the leader could not find the jar when deploying the job.
>
> If one JM is the leader, are WebUI requests guaranteed to reach that Pod, or can
> they also reach the other JM's Pod?
>
> After uploading a jar through the WebUI, as the WebUI auto-refreshes the jar is
> sometimes visible and sometimes not. It looks like the two Pods are being accessed
> at random.
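Until the routing problem itself is addressed, one possible mitigation, consistent with the analysis above, is to run a single JobManager replica: with Kubernetes HA enabled, the JM is restarted on failure and recovers from high-availability.storageDir, so a standby replica is not strictly required, and all REST requests then land on the same Pod. A sketch of the changed field only, assuming the FlinkDeployment shown above:

```yaml
# FlinkDeployment excerpt: keep Kubernetes HA for failover metadata, but
# avoid spreading REST requests across pods by running one JobManager.
spec:
  jobManager:
    replicas: 1
```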