Hi, We are running a flink cluster (Flink version 1.14.3) on kubernetes with high-availablity.type: kubernetes. We have 3 jobmanagers. When we send jobs to the flink cluster, we run a "flink list --jobmanager flink-jobmanager:8081" command as part of the process".
At first, we succeeded to run this command from within any of the jobmanager CLIs. But after the elected leader is deleted (For whatever reason. For example, server failure), the configmaps with the following format are updated with the new leader IP address: flink-cluster-ecb24e88d60bb06917da1c933785811a-jobmanager-leader flink-cluster-b4bef19e6481a6c42340e51b69e30923-jobmanager-leader ... But the following configmaps are not always updated with the same IP address as the others: flink-cluster-restserver-leader flink-cluster-resourcemanager-leader flink-cluster-dispatcher-leader Then, when we run the flink list command, we receive the error attached at the end of this mail. If we delete the jobmanager where the flink-cluster-restserver-leader configmap is pointing, the configmap gets modified to the IP address of the other configmaps, and the "flink list" command succeeds. Note: I can see in the log that the command attempts to connect to the IP which is set in the configmap: flink-cluster-restserver-leader' How do we fix this issue without needing any manual intervention? Thanks, Yael Error from CLI when running flink list command: root@flink-jobmanager-68b5fb748d-wwmvt:/opt/flink# flink list --jobmanager localhost:8081 2022-12-12 15:04:19,037 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for xi-env/*flink-cluster-restserver-leader*, watching id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e4 12c Waiting for response... 2022-12-12 15:04:21,231 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-restserver-leader'}. 2022-12-12 15:04:21,233 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for xi-env/flink-cluster-restserver-leader, watching id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e41 2c ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.util.FlinkException: Failed to retrieve job list. at org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449) at org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002) at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestMultipleJobDetails. at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373) at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:858) at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:876) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) at akka.dispatch.OnComplete.internal(Future.scala:300) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) Caused by: java.io.NotSerializableException: java.util.HashMap$Values at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source) at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source) at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) at org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400) ... 54 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:532) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:512) at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source)