Hi, Young

Your analysis is correct. The Flink Kubernetes operator communicates with the Flink cluster through the REST service, and Kubernetes routes requests sent to that service randomly across the JM pods behind it. Job submission is split into three API calls: uploadJar, runJob, and deleteJar, which is why you see the related errors in the operator's logs.
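
A toy simulation (hypothetical names; this is not the operator's real code) shows why random routing breaks this multi-step protocol when the state, the uploaded jar on one pod's local disk, is not shared between pods:

```python
import random

class JobManager:
    """Each JM keeps uploaded jars in its own local web-upload directory."""
    def __init__(self, name):
        self.name = name
        self.jars = set()  # stands in for /tmp/flink-web-*/flink-web-upload

class RestService:
    """Kubernetes Service in front of the JMs: every request may land on a
    different backend pod."""
    def __init__(self, pods):
        self.pods = pods

    def _route(self):
        return random.choice(self.pods)

    # The three REST calls the operator makes for a session job:
    def upload_jar(self, jar):          # POST /jars/upload
        self._route().jars.add(jar)

    def run_jar(self, jar):             # POST /jars/<jar>/run
        jm = self._route()
        if jar not in jm.jars:
            raise FileNotFoundError(f"{jar} does not exist on {jm.name}")

    def delete_jar(self, jar):          # DELETE /jars/<jar>
        jm = self._route()
        jm.jars.discard(jar)  # real Flink reports an error when missing

random.seed(42)
svc = RestService([JobManager("jm-0"), JobManager("jm-1")])
failures = 0
for i in range(100):
    try:
        svc.upload_jar(f"job-{i}.jar")
        svc.run_jar(f"job-{i}.jar")
    except FileNotFoundError:
        failures += 1
print(f"{failures}/100 submissions failed")  # roughly half fail with 2 pods
```

With two independently routed requests, the run lands on the pod that holds the jar only about half the time, which matches the intermittent failures you observed.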

Perhaps you could create a JIRA issue to track this problem.

Best,
Weihua


On Thu, Oct 27, 2022 at 6:51 PM Young Chen <niaomingj...@outlook.com> wrote:

> [Problem description]
>
> A Flink Session Cluster (two JobManagers) was deployed in HA mode with the
> Flink Kubernetes operator (v1.1.0), and an example job was then deployed with
> a FlinkSessionJob. Sometimes the job deploys successfully, sometimes it does not.
>
> The error logs below can be seen in the containers.
>
>
>
> [Steps to reproduce]
>
> Deploy the cluster:
>
>
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-cluster-1jm-checkpoint
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "1"
>     state.savepoints.dir: file:///flink-data/savepoints
>     state.checkpoints.dir: file:///flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: file:///flink-data/ha
>     state.checkpoints.num-retained: "10"
>   serviceAccount: flink
>   ingress:
>     template: "{{name}}.{{namespace}}.k8s.rf.io"
>   jobManager:
>     replicas: 2
>   podTemplate:
>     spec:
>       nodeSelector:
>         kubernetes.io/hostname: k8s17
>       containers:
>         - name: flink-main-container
>           volumeMounts:
>             - mountPath: /flink-data
>               name: flink-volume
>       volumes:
>         - name: flink-volume
>           hostPath:
>             # directory location on host
>             path: /tmp/flink
>             # this field is optional
>             type: Directory
>
>
>
> Deploy the job:
>
>
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: flink-job-1jm-checkpoint
> spec:
>   deploymentName: flink-cluster-1jm-checkpoint
>   job:
>     # our custom operator image includes the example jars
>     jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar
>     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
>     parallelism: 1
>     upgradeMode: savepoint
>
>
>
>
>
> [Relevant logs]
>
>   1.  Operator log from a run where the job deployed and ran successfully:
>
> 2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar: 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
> java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: File 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
> at org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> at java.base/java.lang.Thread.run(Unknown Source)
> ]
> at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source
>
> One JobManager pod does not have the file /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar; it is in the other JM's pod instead. But that JM should not be the leader, because the checkpoint-related logs are being printed in the first JM.
>
>
>
>
>
>   2.  Operator log from a failed deployment:
>
> 2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint', namespace='flink'}, version: 120505701} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar does not exist
> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:172)
> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:57)
> at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
> at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
> at java.base/java.util.Optional.ifPresent(Unknown Source)
>
> Similar logs can also be seen in the leader JobManager pod.
>
>
>
> [Analysis]
>
> In HA mode, is it really necessary to allow more than one JM replica (the spec does make replicas configurable)?
> When there are multiple JMs, one of them is the leader; how did the other JM end up receiving the uploaded jar?
>
> Case 1 looks like this: the operator uploaded the jar to one JM and that JM also started the job; then the second JM became the leader, and the operator's delete request was routed to the second JM, so only the jar deletion failed.
>
> Case 2: the operator uploaded the jar to one JM, but the other JM is the leader, so the leader cannot find the jar when it tries to deploy the job.
>
> If one JM is the leader, do requests through the Web UI always reach that pod, or can they also reach the other JM's pod?
>
> After uploading a jar through the Web UI, the jar appears and disappears as the UI auto-refreshes; it looks like the two pods are being hit at random.
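
Case 1 above can be sketched as a toy model (hypothetical names only, not operator or Flink code): uploadJar and runJob both happen to reach JM A, the job starts, and only the later deleteJar is routed to JM B:

```python
# Toy model of case 1 (hypothetical, not real Flink code): each JM has its
# own local flink-web-upload directory, modeled here as a set of jar names.
jm_a_uploads = {"06c5b90e_StateMachineExample.jar"}  # uploadJar + runJob hit JM A
jm_b_uploads = set()                                 # JM B never saw the upload

# The job runs fine, because runJob was served by the JM that has the jar.
job_started = "06c5b90e_StateMachineExample.jar" in jm_a_uploads

# deleteJar is later routed to JM B, which reports "File ... does not exist".
delete_ok = "06c5b90e_StateMachineExample.jar" in jm_b_uploads

print(job_started, delete_ok)  # True False
```

This is consistent with the first log: the running job is unaffected and only the jar cleanup fails.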
>
>
>
>
>
>
