回复:Re: Re: submit jobGraph error on server side
感谢大神, 是我配置的资源太少导致响应慢,导致akka 超时。 现在我换了一个k8s 集群,调大了资源,已经不再配到邮件中的发生的异常。 原始邮件 发件人:"Zili Chen"< wander4...@gmail.com >; 发件时间:2019/8/7 15:32 收件人:"王智"< ben.wa...@foxmail.com >; 抄送人:"user-zh"< user-zh@flink.apache.org >; 主题:Re: Re: submit jobGraph error on server side 从错误堆栈上看你的请求应该是已经发到 jobmanager 上了,也就是不存在找不到端口的问题。 但是 jobmanager 在处理 submit job 的时候某个动作超时了。你这个问题是一旦把 gateway 分开就稳定复现吗?也有可能是 akka 偶然的超时。 Best, tison. 王智
Re: Flink官网barrier疑问
你好,范瑞 Barrier alignment 这里并不会涉及 output/input queue,pending 的只是用于 alignment 的一小部分数据。 如果想了解 checkpoint 的原理,建议阅读文档中引用的两篇论文。[1] [2] 如果想了解 Flink 的具体实现,这里的文档是 internal 部分的,可能需要阅读一下相关代码了。[3] [4] 1. https://arxiv.org/abs/1506.08603 2. https://www.microsoft.com/en-us/research/publication/distributed-snapshots-determining-global-states-distributed-system/?from=https%3A%2F%2Fresearch.microsoft.com%2Fen-us%2Fum%2Fpeople%2Flamport%2Fpubs%2Fchandy.pdf 3. https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java 4. https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java Thanks, Biao /'bɪ.aʊ/ On Wed, Aug 7, 2019 at 2:11 PM ❄ <836961...@qq.com> wrote: > Hi,老师: > 老师,你好flink官网这个页面( > https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/stream_checkpointing.htm)介绍barrier对齐的这里第三步 > • Once the last stream has received barrier n, the operator emits all > pending outgoing records, and then emits snapshot n barriers itself. > > 这句话,说的是一旦接受到上游所有流的barrier n,这个Operator实例会发送所有 pending的输出记录,然后发送 > 把自己的barrier n发出去到下游。这里的pending的输出记录是指什么数据?是指barrier之前的那些还在Output > Queue中的数据吗?不是barrier之后的数据吧,因为精准一次语义的话,snapshot之前,barrier之后的数据应该还没开始处理,等同步快照结束后才能开始处理。如果这里指的是barrier之前那些还在Output > Queue中的数据,那么也不能马上把这些数据发出去吧,应该还要考虑下游的Input Queue中有足够空间 > > > > 望解答,谢谢老师! > > 范瑞
Re: flink-1.8.1 yarn per job模式使用
非常感谢tison,完美的解决了我的问题,以后会多留意社区问题。 具体到自己的hadoop版本,就是在flink工程编译 flink-1.8.1/flink-shaded-hadoop/flink-shaded-hadoop2-uber/target 的jar放在lib下即可 Zili Chen 于2019年8月7日周三 下午7:33写道: > 这个问题以前邮件列表有人提过...不过现在 user-zh 没有 archive 不好引用。 > > 你看下是不是 lib 下面没有 flink-shaded-hadoop-2-uber--7.0.jar 这样一个文件。 > > 1.8.1 之后 FLINK 把 hadoop(YARN) 的 lib 分开 release 了,你要指定自己的 HADOOP_CLASSPATH > 或者下载 FLINK 官网 pre-bundle 的 hadoop。 > > 具体可以看这个页面(https://flink.apache.org/downloads.html)第一段的内容。 > > Best, > tison. > > > 李玉环 于2019年8月7日周三 下午7:15写道: > > > Hi 大家好: > > > > 在使用flink过程中,运行官网给的命令 > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn > > 报错如下: > > > > ➜ flink-1.8.1 ./bin/flink run -m yarn-cluster > > ./examples/batch/WordCount.jar > > > > The program finished with the following exception: > > > > java.lang.RuntimeException: Could not identify hostname and port in > > 'yarn-cluster'. > > at > > > > > org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47) > > at > > > > > org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83) > > at > > > > > org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60) > > at > > > > > org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35) > > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224) > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > > at > > > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > > at > > > > > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > > at > > > > > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > > > > > > 疑问: > > 1.为什么会将 yarn-clustet解析为host? > > 2.要运行single flink job on yarn的正确姿势是啥? > > > > Best, > > Yuhuan > > >
Re: flink-1.8.1 yarn per job模式使用
这个问题以前邮件列表有人提过...不过现在 user-zh 没有 archive 不好引用。 你看下是不是 lib 下面没有 flink-shaded-hadoop-2-uber--7.0.jar 这样一个文件。 1.8.1 之后 FLINK 把 hadoop(YARN) 的 lib 分开 release 了,你要指定自己的 HADOOP_CLASSPATH 或者下载 FLINK 官网 pre-bundle 的 hadoop。 具体可以看这个页面(https://flink.apache.org/downloads.html)第一段的内容。 Best, tison. 李玉环 于2019年8月7日周三 下午7:15写道: > Hi 大家好: > > 在使用flink过程中,运行官网给的命令 > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn > 报错如下: > > ➜ flink-1.8.1 ./bin/flink run -m yarn-cluster > ./examples/batch/WordCount.jar > > The program finished with the following exception: > > java.lang.RuntimeException: Could not identify hostname and port in > 'yarn-cluster'. > at > > org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47) > at > > org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83) > at > > org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60) > at > > org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35) > at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at > > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > > > 疑问: > 1.为什么会将 yarn-clustet解析为host? > 2.要运行single flink job on yarn的正确姿势是啥? > > Best, > Yuhuan >
flink-1.8.1 yarn per job模式使用
Hi 大家好: 在使用flink过程中,运行官网给的命令 https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn 报错如下: ➜ flink-1.8.1 ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar The program finished with the following exception: java.lang.RuntimeException: Could not identify hostname and port in 'yarn-cluster'. at org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47) at org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83) at org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60) at org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) 疑问: 1.为什么会将 yarn-clustet解析为host? 2.要运行single flink job on yarn的正确姿势是啥? Best, Yuhuan
Re:关于event-time的定义与产生时间戳位置的问题。
Hi, 可以看下事件时间戳的生成,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html 下面例子里时间戳都是来自element里面的时间字段。还有一个AscendingTimestampExtractor。 /** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. */publicclassBoundedOutOfOrdernessGeneratorimplementsAssignerWithPeriodicWatermarks{privatefinallongmaxOutOfOrderness=3500;// 3.5 secondsprivatelongcurrentMaxTimestamp;@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){longtimestamp=element.getCreationTime();currentMaxTimestamp=Math.max(timestamp,currentMaxTimestamp);returntimestamp;}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current highest timestamp minus the out-of-orderness boundreturnnewWatermark(currentMaxTimestamp-maxOutOfOrderness);}}/** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */publicclassTimeLagWatermarkGeneratorimplementsAssignerWithPeriodicWatermarks{privatefinallongmaxTimeLag=5000;// 5 seconds@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current time minus the maximum time lagreturnnewWatermark(System.currentTimeMillis()-maxTimeLag);}} publicclassPunctuatedAssignerimplementsAssignerWithPunctuatedWatermarks{@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkcheckAndGetNextWatermark(MyEventlastElement,longextractedTimestamp){returnlastElement.hasWatermarkMarker()?newWatermark(extractedTimestamp):null;}} 希望能有所帮助。 DataStream和kafkaSource后面都可以调用assignTimestampsAndWatermarks。 kafkaSource.assignTimestampsAndWatermarks(newAscendingTimestampExtractor(){@OverridepubliclongextractAscendingTimestamp(MyTypeelement){returnelement.eventTimestamp();}}); 在 2019-08-07 15:47:41,"xiaohei.info" 写道: > >hi,all: > event time这个时间戳是在什么时候打到数据上面去的,看api是在flink > source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka > source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。 > > 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗? > 不知道有哪里是我理解不对的地方望指教! > 祝好~
关于event-time的定义与产生时间戳位置的问题。
hi,all: event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗? 不知道有哪里是我理解不对的地方望指教! 祝好~
Re: Re: submit jobGraph error on server side
从错误堆栈上看你的请求应该是已经发到 jobmanager 上了,也就是不存在找不到端口的问题。 但是 jobmanager 在处理 submit job 的时候某个动作超时了。你这个问题是一旦把 gateway 分开就稳定复现吗?也有可能是 akka 偶然的超时。 Best, tison. 王智 于2019年8月7日周三 下午2:33写道: > 感谢您的回复与指导~ > > > 经过简单的验证(验证方案在邮件末尾),明确是网络问题。 > > > 现在我猜测是flink run 提交job graph 的时候打开了除 这四个以外的端口导致。麻烦再请教一下,flink jobmanager > 是否会打开新的端口进行通讯(或者还有其他端口配置我没有注意到) > > ports: > - containerPort: 6123 > protocol: TCP > - containerPort: 6124 > protocol: TCP > - containerPort: 6125 > protocol: TCP > - containerPort: 8081 > protocol: TCP > > > # flink conf 内的配置 > > jobmanager.rpc.port: 6123 > > jobmanager.rpc.port: 6123 > > jobmanager.rpc.port: 6123 > > blob.server.port: 6124 > > query.server.port: 6125 > > # 8081 没有配置,使用默认,web ui 可以正常访问 > > 我使用k8s 搭建的环境,提交任务的节点(命名为gateway)与jobmanager 在两个不同的pod,gateway 通过jobmanager > 对应的jobmanager-service > 找到jobmanager对应的服务。猜测是因为我在服务中仅定义了上述4个端口,所以gateway节点上的进程无法通过jobmanager-service > 与jobmanager 通讯。 > > > 附: 以下是我的验证方案: 将提交节点与jobmanager 放入同一个pod,使用回环地址(不会有端口限制)通讯,可以成功提交job【flink > 的配置和代码完全一致】 > > > > > > > > > 原始邮件 > > 发件人:"Zili Chen"< wander4...@gmail.com >; > > 发件时间:2019/8/6 19:19 > > 收件人:"user-zh"< user-zh@flink.apache.org >; > > 主题:Re: submit jobGraph error on server side > > > 问题是 Ask timed out on [Actor[akka://flink/user/dispatcher#-273192824]] after > [1 ms]. Sender[null] sent message of type > "org.apache.flink.runtime.rpc. > messages.LocalFencedMessage". > > 也就是 submit job 的时候在请求 Dispatcher 的时候 akka ask timeout > 了,可以检查一下配置的地址和端口是否正确,或者贴出你的相关配置。 > > Best, > tison. > > > 王智于2019年8月6日周二 下午7:13写道: > > > 向session cluster 提交job 出错,麻烦各位老师帮忙看下,给点排查提示 THX~ > > > > > > > > > > 环境: > > > > blink 1.8.0 > > > > 用docker 方式启动的flink session cluster,flink 集群独立,我从集群外的一个docker > > 节点提交job(该节点的flink-conf.yaml 配置与flink 集群内的配置一致) > > > > > > > > > > -- > > > > > > 报错信息: > > > > > > > > The program finished with the following exception: > > > > > > > > > > org.apache.flink.client.program.ProgramInvocationException: The main > > method caused an error: > > org.apache.flink.client.program.ProgramInvocationException: Could not > > retrieve the execution result. (JobID: 82 > > > > 3a336683f6476b3e7ee2780c33395b) > > > > at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546) > > > > at > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > > > > at > > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) > > > > at > > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > > > > at > > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > > > > at > > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > > > > at > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > > > > at > > > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > > > > at > > > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > > > > at > > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > > > > Caused by: java.lang.RuntimeException: > > org.apache.flink.client.program.ProgramInvocationException: Could not > > retrieve the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b) > > > > at > > > com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:176) > > > > at > > > com.xx.data.platform.pandora.flink.EntryPoint.main(EntryPoint.java:78) > > > > at > > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > > > at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > > > at > > java.lang.reflect.Method.invoke(Method.java:498) > > > > at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > > > > ... 9 more > > > > Caused by: org.apache.flink.client.program.ProgramInvocationException: > > Could not retrieve the execution result. (JobID: > > 823a336683f6476b3e7ee2780c33395b) > > > > at > > > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261) > > > > at > > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > > > > at > > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) > > > > at > > > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > > > > at > > > com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:174) > > > > ... 15 more > > > > > > > > Caused by: org.apache.flink.runtime