Hi,

We are running a Flink cluster (Flink version 1.14.3) on Kubernetes with
high-availability.type: kubernetes. We have 3 JobManagers. When we submit jobs
to the Flink cluster, we run a "flink list --jobmanager
flink-jobmanager:8081" command as part of the process.
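
For reference, here is a minimal Java sketch that fetches the same job list over the REST API instead of through the CLI. It rests on two assumptions. First, that "flink list" reads the /jobs/overview endpoint; the server-side error below mentions requestMultipleJobDetails, which that endpoint appears to use. Second, that flink-jobmanager is a Kubernetes Service in front of the JobManager pods, so the request goes to whichever pod the Service picks rather than to the address in the flink-cluster-restserver-leader configmap. The class name JobsOverviewProbe is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class JobsOverviewProbe {

    // Builds the URI of the REST endpoint assumed to back "flink list".
    static URI overviewUri(String hostPort) {
        return URI.create("http://" + hostPort + "/jobs/overview");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Defaults to the host:port we pass to --jobmanager.
        String hostPort = args.length > 0 ? args[0] : "flink-jobmanager:8081";
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(hostPort))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        // A 500 status here would likely match the "Internal server error" the CLI reports.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```
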

At first, this command succeeded when run from the CLI of any of the
JobManagers.
But after the elected leader is deleted (for whatever reason, for example a
server failure), the configmaps with the following name format are updated with
the new leader's IP address:
flink-cluster-ecb24e88d60bb06917da1c933785811a-jobmanager-leader
flink-cluster-b4bef19e6481a6c42340e51b69e30923-jobmanager-leader
...

But the following configmaps are not always updated with the same IP
address as the others:
flink-cluster-restserver-leader
flink-cluster-resourcemanager-leader
flink-cluster-dispatcher-leader
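
To spot this mismatch automatically, the leader addresses stored in the configmaps can be compared. This is a minimal Java sketch built on several assumptions. Each leader configmap is assumed to store the leader address under the data key "address", which is our reading of Flink 1.14's Kubernetes HA layout and has not been verified. The REST leader address is assumed to be an http:// URL, while the other components are assumed to use akka.tcp:// RPC URLs. The values are assumed to have been fetched beforehand, for example with kubectl get configmap flink-cluster-restserver-leader -o jsonpath='{.data.address}'. The class and method names and the IP addresses are all hypothetical.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class LeaderConsistencyCheck {

    // Returns the host part of a leader address, or null if it cannot be parsed.
    // Assumed formats (hypothetical examples):
    //   REST endpoint:  "http://10.0.0.7:8081"
    //   RPC endpoints:  "akka.tcp://flink@10.0.0.5:6123/user/rpc/dispatcher_1"
    static String hostOf(String leaderAddress) {
        try {
            return URI.create(leaderAddress).getHost();
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    // Groups configmap names by the host their leader address points to.
    // More than one group means the leaders are spread over several JobManagers.
    static Map<String, TreeSet<String>> groupByHost(Map<String, String> addressByConfigMap) {
        Map<String, TreeSet<String>> byHost = new TreeMap<>();
        for (Map.Entry<String, String> entry : addressByConfigMap.entrySet()) {
            String host = hostOf(entry.getValue());
            byHost.computeIfAbsent(host == null ? "<unparseable>" : host, h -> new TreeSet<>())
                    .add(entry.getKey());
        }
        return byHost;
    }

    public static void main(String[] args) {
        // Hypothetical "address" values copied from three of the configmaps.
        Map<String, String> addresses = new LinkedHashMap<>();
        addresses.put("flink-cluster-restserver-leader", "http://10.0.0.7:8081");
        addresses.put("flink-cluster-dispatcher-leader",
                "akka.tcp://flink@10.0.0.5:6123/user/rpc/dispatcher_1");
        addresses.put("flink-cluster-resourcemanager-leader",
                "akka.tcp://flink@10.0.0.5:6123/user/rpc/resourcemanager_0");

        Map<String, TreeSet<String>> byHost = groupByHost(addresses);
        byHost.forEach((host, names) -> System.out.println(host + " -> " + names));
        if (byHost.size() > 1) {
            System.out.println("Leader addresses disagree across configmaps");
        }
    }
}
```
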

Then, when we run the flink list command, we receive the error attached at
the end of this mail.
If we delete the JobManager pod that the flink-cluster-restserver-leader
configmap points to, the configmap is updated to the IP address found in the
other configmaps, and the "flink list" command succeeds.
Note: I can see in the log that the command attempts to connect to the IP
that is set in the flink-cluster-restserver-leader configmap.

How can we fix this issue without any manual intervention?

Thanks,
Yael

Error from the CLI when running the flink list command:

root@flink-jobmanager-68b5fb748d-wwmvt:/opt/flink# flink list --jobmanager localhost:8081
2022-12-12 15:04:19,037 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for xi-env/flink-cluster-restserver-leader, watching id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e412c
Waiting for response...
2022-12-12 15:04:21,231 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-restserver-leader'}.
2022-12-12 15:04:21,233 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for xi-env/flink-cluster-restserver-leader, watching id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e412c

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Failed to retrieve job list.
        at org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449)
        at org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430)
        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
        at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestMultipleJobDetails.
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)
        at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:858)
        at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:876)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
        at akka.dispatch.OnComplete.internal(Future.scala:300)
        at akka.dispatch.OnComplete.internal(Future.scala:297)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.io.NotSerializableException: java.util.HashMap$Values
        at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)
        ... 54 more

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:532)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:512)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
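
The innermost server-side cause is java.io.NotSerializableException: java.util.HashMap$Values. It is raised while AkkaRpcActor.serializeRemoteResultAndVerifySize Java-serializes the result of requestMultipleJobDetails. As the method name suggests, this serialization presumably happens only when the RPC caller is remote. That might fit the REST server leader and the dispatcher leader sitting on different JobManagers, but we have not confirmed this. The standalone sketch below uses no Flink classes (Holder is a hypothetical stand-in for the RPC result object) and reproduces the same exception: the view returned by HashMap.values() is not Serializable, while a copy of it in an ArrayList is.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class ValuesSerializationDemo {

    // Hypothetical stand-in for an RPC result object that keeps a collection in a field.
    static final class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        final Collection<String> jobs;

        Holder(Collection<String> jobs) {
            this.jobs = jobs;
        }
    }

    // Returns true if Java serialization succeeds, false on NotSerializableException.
    static boolean serializes(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> statusByJob = new HashMap<>();
        statusByJob.put("job-1", "RUNNING");

        // values() returns a live view of type java.util.HashMap$Values, which is not Serializable.
        Collection<String> view = statusByJob.values();
        System.out.println(view.getClass().getName());
        System.out.println("view in field serializes: " + serializes(new Holder(view)));

        // Copying the view into an ArrayList yields a Serializable collection.
        Collection<String> copy = new ArrayList<>(view);
        System.out.println("copy in field serializes: " + serializes(new Holder(copy)));
    }
}
```
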
