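Thanks a lot,

It turned out that I had allocated too few resources, which made responses slow and caused the akka timeout.

I have since switched to a different k8s cluster and increased the resources, and I no longer hit the exception described in this thread.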
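
For readers who run into the same timeout, here is a minimal sketch of what "increasing the resources" could look like in the jobmanager container spec of a k8s Deployment. The numbers are illustrative assumptions, not the values actually used in this thread.

```yaml
# Fragment of the jobmanager container spec (hypothetical values)
resources:
  requests:
    cpu: "1"        # CPU reserved for the jobmanager container
    memory: 2Gi     # memory reserved for the jobmanager container
  limits:
    cpu: "2"        # upper bound before CPU throttling sets in
    memory: 4Gi     # upper bound before the container is OOM-killed
```

A jobmanager that is throttled on CPU or short on memory can take longer than the ask timeout to answer a submission, which would match the symptom reported here.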









Original message

From: "Zili Chen" <wander4...@gmail.com>
Sent: 2019/8/7 15:32
To: "王智" <ben.wa...@foxmail.com>
Cc: "user-zh" <user-zh@flink.apache.org>
Subject: Re: Re: submit jobGraph error on server side




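Judging from the stack trace, your request did reach the jobmanager, so this is not a case of an unreachable port.
Instead, some step timed out while the jobmanager was handling the job submission. Does the problem reproduce
reliably whenever the gateway is separated from the jobmanager? It could also be an occasional akka timeout.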


Best,
tison.
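
If the timeout turns out to be occasional, one possible mitigation is to raise the relevant timeouts in flink-conf.yaml. The fragment below is only a sketch: the values are arbitrary, and which of the two options governs the 10000 ms seen in this thread's stack trace is an assumption (both default to 10 seconds in the 1.8 line, as far as I know).

```yaml
# flink-conf.yaml (sketch; values are illustrative, not recommendations)
akka.ask.timeout: 60 s   # timeout for akka ask calls between Flink components
web.timeout: 60000       # timeout in ms for asynchronous operations of the REST/web handlers
```

Raising timeouts only hides slowness on the jobmanager side, so it is worth checking why the Dispatcher takes that long to respond in the first place.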










王智 <ben.wa...@foxmail.com> wrote on Wed, Aug 7, 2019 at 2:33 PM:


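Thank you for your reply and guidance.

After a quick check (the procedure is at the end of this email), the cause is clearly a network problem.

My current guess is that flink run opens a port other than the four below when it submits the job graph. Could I ask one more thing: does the flink jobmanager open additional ports for communication (or is there another port setting that I have overlooked)?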
ports:
  - containerPort: 6123
    protocol: TCP
  - containerPort: 6124
    protocol: TCP
  - containerPort: 6125
    protocol: TCP
  - containerPort: 8081
    protocol: TCP
# settings in flink conf
jobmanager.rpc.port: 6123
blob.server.port: 6124
query.server.port: 6125
# 8081 is not configured, so the default is used; the web UI is reachable
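I built the environment on k8s. The node that submits jobs (call it gateway) and the jobmanager run in two different pods, and gateway reaches the jobmanager through its jobmanager-service. My guess is that, because the service defines only the four ports above, the process on the gateway node cannot communicate with the jobmanager through jobmanager-service.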
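
For reference, a Service exposing the four ports listed above might look like the sketch below. The service name comes from this email; the selector labels and port names are hypothetical. Note that the stack trace in this thread goes through RestClusterClient, which suggests the submission itself uses the REST endpoint (8081 here), and the follow-up at the top of the thread reports that the eventual cause was too few resources rather than a missing port.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: jobmanager-service
spec:
  selector:
    app: flink              # hypothetical label on the jobmanager pod
    component: jobmanager   # hypothetical label on the jobmanager pod
  ports:
    - name: rpc
      port: 6123
      protocol: TCP
    - name: blob
      port: 6124
      protocol: TCP
    - name: query
      port: 6125
      protocol: TCP
    - name: ui
      port: 8081
      protocol: TCP
```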



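Appendix: my verification procedure. I put the submitting node and the jobmanager into the same pod and let them communicate over the loopback address (no port restrictions); the job was then submitted successfully [the flink configuration and the code were exactly the same].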












Original message

From: "Zili Chen" <wander4...@gmail.com>
Sent: 2019/8/6 19:19
To: "user-zh" <user-zh@flink.apache.org>
Subject: Re: submit jobGraph error on server side



The problem is: Ask timed out on [Actor[akka://flink/user/dispatcher#-273192824]] after
[10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

In other words, the akka ask to the Dispatcher timed out while the job was being submitted.
Please check whether the configured address and port are correct, or post the relevant configuration.

Best,
tison.


王智 wrote on Tue, Aug 6, 2019 at 7:13 PM:

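> I get an error when submitting a job to a session cluster. Could anyone help take a look and give me some hints on how to troubleshoot it? THX~
>
> Environment:
>
> blink 1.8.0
>
> The flink session cluster is started with docker and runs on its own. I submit the job from a docker
> node outside the cluster (that node's flink-conf.yaml is identical to the configuration inside the flink cluster).
>
> ----------------------
>
> Error message: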
&gt;
> ------------------------------------------------------------
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>         at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>         at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.RuntimeException: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b)
>         at com.xxxxxx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:176)
>         at com.xxxxxx.data.platform.pandora.flink.EntryPoint.main(EntryPoint.java:78)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>         ... 9 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b)
>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
>         at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>         at com.xxxxxx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:174)
>         ... 15 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>         at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
>         at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>         at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-273192824]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>         at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>         at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>         at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>         at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>         at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>         at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>         at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>         at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>         at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>         at java.lang.Thread.run(Thread.java:748)
>
> End of exception on server side>]
>         at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>         at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>         at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>         ... 4 more
