Hi,All:

   I found some problems about on kubernates flink of 1.6.0 mentioned by Till 
in "HA for 1.6.0 job cluster with docker-compose" in the email list, but I 
found that Jira of flink-10291 in the email has been shut down in 1.7.0, and I 
also found similar errors in on kubernates flink of 1.7.2 at present. Could you 
please help me check the Settings where I have problems? Here are my Settings:

web.log.path: /var/log/flink/flinkweb.log 
taskmanager.log.pth: /var/log/flink/taskmanager/task.log 


jobmanager.rpc.address: tdh2
jobmanager.rpc.port: 16223
jobstore.cache-size: 5368709120
jobstore.expiration-time: 864000
jobmanager.heap.size: 4096m


taskmanager.heap.size:  6000m
taskmanager.numberOfTaskSlots: 6
parallelism.default: 2


high-availability: zookeeper
high-availability.storageDir: hdfs:///flink1/ha/
high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
high-availability.jobmanager.port: 62236-62239


rest.port: 18801
io.tmp.dirs: /data/disk1:/data/disk2:/data/disk3:/data/disk4:/data/disk5


security.kerberos.login.use-ticket-cache: true
security.kerberos.login.contexts: Client
security.kerberos.login.keytab: /etc/flink/conf/hdfs.keytab
security.kerberos.login.principal: hdfs


blob.server.port: 16224
query.server.port: 16225




   And the following is the new error report, the earliest error report in the 
forwarded email message:


apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not 
set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, 
LocalRpcInvocation(requestRestAddress(Time))) sent to 
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
... 14 common frames omitted
2019-07-01 17:10:40.159 [flink-rest-server-netty-worker-thread-39] ERROR 
o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not retrieve 
the redirect address.
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, 
LocalRpcInvocation(requestRestAddress(Time))) sent to 
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.actor.ActorRef.tell(ActorRef.scala:130)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendErrorIfSender(AkkaRpcActor.java:371)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:57)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
Fencing token not set: Ignoring message 
LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, 
LocalRpcInvocation(requestRestAddress(Time))) sent to 
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.








-------- Forwarding messages --------
From: "Alex.Hu" <huyc...@163.com>
Date: 2019-07-01 10:17:15
To: "Yang Wang" <danrtsey...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re:Re: About "Flink 1.7.0 HA based on zookeepers "

Hi,Wang:
    Thank you very much for answering my question!
    I hope to start multiple jobmanagers on kubernete, because according to my 
understanding in the document of jobmanager_high_availability, the standalone 
mode should be that the jobmanager can be replaced seamlessly when the fault 
node goes down through two jobmanagers nodes. 
    In my follow-up test last Friday, I did use the ability to enable only one 
jobmanager in kubernete with the jobmanager_high_availability mode turned on, 
and to set multiple nodes as jobmanager tags to allow kubernate to enable the 
failed node to automatically transition node startup. But there will still be 
some switching time. So I am not sure whether flink in kubernate can achieve 
the above setting of jobmanager seamless hot-swap?




At 2019-06-28 14:11:27, "Yang Wang" <danrtsey...@gmail.com> wrote:

Hi, hu


I am not sure why do you need to start multiple jobmanagers on kubernetes. Just 
as the manual [1], we use a deployment of 1 to make sure kubernetes detect the 
crash of jobmanager and start a new one. What we should do is to add the high 
availability configurations [2] in flink-conf.yaml. You could use the configMap 
[3] to save your flink-conf.yaml and then mount into to jobmanager pod. Also 
you could update the flink-conf.yaml in your flink image.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html
[3] 
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/


胡逸才 <huyc...@163.com> 于2019年6月28日周五 上午11:09写道:

HI Tan:
I have the same problem with you when running "flink-1.7.2 ON KUBERNATE HA" 
mode, may I ask if you have solved this problem? How? After I started the two 
jobmanagers normally, when I tried to kill one of them, he could not restart 
normally. Both jobmanagers reported this error. The specific log is as follows:








2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  
akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote connection 
to [null] failed with java.net.ConnectException: Connection refused: 
tdh2/192.168.208.55:56529
2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-14 - Association with remote system 
[akka.tcp://flink@tdh2:56529] has failed, address is now gated for [50] ms. 
Reason: [Association failed with [akka.tcp://flink@tdh2:56529]] Caused by: 
[Connection refused: tdh2/192.168.208.55:56529]
2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-14 - Association with remote system 
[akka.tcp://flink@tdh2:56529] has failed, address is now gated for [50] ms. 
Reason: [Association failed with [akka.tcp://flink@tdh2:56529]] Caused by: 
[Connection refused: tdh2/192.168.208.55:56529]
2019-06-28 09:57:57.260 [flink-rest-server-netty-worker-thread-7] ERROR 
o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not retrieve 
the redirect address.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on [Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] 
after [10000 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after [10000 
ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 common frames omitted




 





 

Reply via email to