回复:Re: Re: submit jobGraph error on server side

2019-08-07 文章 王智
感谢大神,

是我配置的资源太少导致响应慢,导致akka 超时。




现在我换了一个k8s 集群,调大了资源,已经不再配到邮件中的发生的异常。









原始邮件



发件人:"Zili Chen"< wander4...@gmail.com >;

发件时间:2019/8/7 15:32

收件人:"王智"< ben.wa...@foxmail.com >;

抄送人:"user-zh"< user-zh@flink.apache.org >;

主题:Re: Re: submit jobGraph error on server side




从错误堆栈上看你的请求应该是已经发到 jobmanager 上了,也就是不存在找不到端口的问题。
但是 jobmanager 在处理 submit job 的时候某个动作超时了。你这个问题是一旦把
gateway 分开就稳定复现吗?也有可能是 akka 偶然的超时。


Best,
tison.










王智 

Re: Flink官网barrier疑问

2019-08-07 文章 Biao Liu
你好,范瑞

Barrier alignment 这里并不会涉及 output/input queue,pending 的只是用于 alignment
的一小部分数据。

如果想了解 checkpoint 的原理,建议阅读文档中引用的两篇论文。[1] [2]

如果想了解 Flink 的具体实现,这里的文档是 internal 部分的,可能需要阅读一下相关代码了。[3] [4]

1. https://arxiv.org/abs/1506.08603
2.
https://www.microsoft.com/en-us/research/publication/distributed-snapshots-determining-global-states-distributed-system/?from=https%3A%2F%2Fresearch.microsoft.com%2Fen-us%2Fum%2Fpeople%2Flamport%2Fpubs%2Fchandy.pdf
3.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
4.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java

Thanks,
Biao /'bɪ.aʊ/



On Wed, Aug 7, 2019 at 2:11 PM ❄ <836961...@qq.com> wrote:

> Hi,老师:
> 老师,你好flink官网这个页面(
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/stream_checkpointing.htm)介绍barrier对齐的这里第三步
>  •  Once the last stream has received barrier n, the operator emits all
> pending outgoing records, and then emits snapshot n barriers itself.
>
> 这句话,说的是一旦接受到上游所有流的barrier n,这个Operator实例会发送所有 pending的输出记录,然后发送
> 把自己的barrier n发出去到下游。这里的pending的输出记录是指什么数据?是指barrier之前的那些还在Output
> Queue中的数据吗?不是barrier之后的数据吧,因为精准一次语义的话,snapshot之前,barrier之后的数据应该还没开始处理,等同步快照结束后才能开始处理。如果这里指的是barrier之前那些还在Output
> Queue中的数据,那么也不能马上把这些数据发出去吧,应该还要考虑下游的Input Queue中有足够空间
>
>
>
> 望解答,谢谢老师!
>
> 范瑞


Re: flink-1.8.1 yarn per job模式使用

2019-08-07 文章 Yuhuan Li
非常感谢tison,完美的解决了我的问题,以后会多留意社区问题。

具体到自己的hadoop版本,就是在flink工程编译
flink-1.8.1/flink-shaded-hadoop/flink-shaded-hadoop2-uber/target
的jar放在lib下即可

Zili Chen  于2019年8月7日周三 下午7:33写道:

> 这个问题以前邮件列表有人提过...不过现在 user-zh 没有 archive 不好引用。
>
> 你看下是不是 lib 下面没有 flink-shaded-hadoop-2-uber--7.0.jar 这样一个文件。
>
> 1.8.1 之后 FLINK 把 hadoop(YARN) 的 lib 分开 release 了,你要指定自己的 HADOOP_CLASSPATH
> 或者下载 FLINK 官网 pre-bundle 的 hadoop。
>
> 具体可以看这个页面(https://flink.apache.org/downloads.html)第一段的内容。
>
> Best,
> tison.
>
>
> 李玉环  于2019年8月7日周三 下午7:15写道:
>
> > Hi 大家好:
> >
> > 在使用flink过程中,运行官网给的命令
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > 报错如下:
> >
> > ➜  flink-1.8.1 ./bin/flink run -m yarn-cluster
> > ./examples/batch/WordCount.jar
> > 
> >  The program finished with the following exception:
> >
> > java.lang.RuntimeException: Could not identify hostname and port in
> > 'yarn-cluster'.
> > at
> >
> >
> org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47)
> > at
> >
> >
> org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83)
> > at
> >
> >
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60)
> > at
> >
> >
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35)
> > at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> > at
> >
> >
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> >
> >
> > 疑问:
> > 1.为什么会将 yarn-clustet解析为host?
> > 2.要运行single flink job on yarn的正确姿势是啥?
> >
> > Best,
> > Yuhuan
> >
>


Re: flink-1.8.1 yarn per job模式使用

2019-08-07 文章 Zili Chen
这个问题以前邮件列表有人提过...不过现在 user-zh 没有 archive 不好引用。

你看下是不是 lib 下面没有 flink-shaded-hadoop-2-uber--7.0.jar 这样一个文件。

1.8.1 之后 FLINK 把 hadoop(YARN) 的 lib 分开 release 了,你要指定自己的 HADOOP_CLASSPATH
或者下载 FLINK 官网 pre-bundle 的 hadoop。

具体可以看这个页面(https://flink.apache.org/downloads.html)第一段的内容。

Best,
tison.


李玉环  于2019年8月7日周三 下午7:15写道:

> Hi 大家好:
>
> 在使用flink过程中,运行官网给的命令
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> 报错如下:
>
> ➜  flink-1.8.1 ./bin/flink run -m yarn-cluster
> ./examples/batch/WordCount.jar
> 
>  The program finished with the following exception:
>
> java.lang.RuntimeException: Could not identify hostname and port in
> 'yarn-cluster'.
> at
>
> org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47)
> at
>
> org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83)
> at
>
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60)
> at
>
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>
>
> 疑问:
> 1.为什么会将 yarn-clustet解析为host?
> 2.要运行single flink job on yarn的正确姿势是啥?
>
> Best,
> Yuhuan
>


flink-1.8.1 yarn per job模式使用

2019-08-07 文章 李玉环
Hi 大家好:

在使用flink过程中,运行官网给的命令
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
报错如下:

➜  flink-1.8.1 ./bin/flink run -m yarn-cluster
./examples/batch/WordCount.jar

 The program finished with the following exception:

java.lang.RuntimeException: Could not identify hostname and port in
'yarn-cluster'.
at
org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47)
at
org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83)
at
org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60)
at
org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)


疑问:
1.为什么会将 yarn-clustet解析为host?
2.要运行single flink job on yarn的正确姿势是啥?

Best,
Yuhuan


Re:关于event-time的定义与产生时间戳位置的问题。

2019-08-07 文章 邵志鹏
Hi,
可以看下事件时间戳的生成,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html
下面例子里时间戳都是来自element里面的时间字段。还有一个AscendingTimestampExtractor。
/**
 * This generator generates watermarks assuming that elements arrive out of 
order,
 * but only to a certain degree. The latest elements for a certain timestamp t 
will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 
*/publicclassBoundedOutOfOrdernessGeneratorimplementsAssignerWithPeriodicWatermarks{privatefinallongmaxOutOfOrderness=3500;//
 3.5 
secondsprivatelongcurrentMaxTimestamp;@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){longtimestamp=element.getCreationTime();currentMaxTimestamp=Math.max(timestamp,currentMaxTimestamp);returntimestamp;}@OverridepublicWatermarkgetCurrentWatermark(){//
 return the watermark as current highest timestamp minus the out-of-orderness 
boundreturnnewWatermark(currentMaxTimestamp-maxOutOfOrderness);}}/**
 * This generator generates watermarks that are lagging behind processing time 
by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 
*/publicclassTimeLagWatermarkGeneratorimplementsAssignerWithPeriodicWatermarks{privatefinallongmaxTimeLag=5000;//
 5 
seconds@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkgetCurrentWatermark(){//
 return the watermark as current time minus the maximum time 
lagreturnnewWatermark(System.currentTimeMillis()-maxTimeLag);}}
publicclassPunctuatedAssignerimplementsAssignerWithPunctuatedWatermarks{@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkcheckAndGetNextWatermark(MyEventlastElement,longextractedTimestamp){returnlastElement.hasWatermarkMarker()?newWatermark(extractedTimestamp):null;}}
希望能有所帮助。


DataStream和kafkaSource后面都可以调用assignTimestampsAndWatermarks。


kafkaSource.assignTimestampsAndWatermarks(newAscendingTimestampExtractor(){@OverridepubliclongextractAscendingTimestamp(MyTypeelement){returnelement.eventTimestamp();}});







在 2019-08-07 15:47:41,"xiaohei.info"  写道:
>
>hi,all:
>  event time这个时间戳是在什么时候打到数据上面去的,看api是在flink 
> source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka 
> source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。
>  
> 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗?
>  不知道有哪里是我理解不对的地方望指教!
>  祝好~


关于event-time的定义与产生时间戳位置的问题。

2019-08-07 文章 xiaohei.info

hi,all:
  event time这个时间戳是在什么时候打到数据上面去的,看api是在flink 
source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka 
source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。
  
那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗?
  不知道有哪里是我理解不对的地方望指教!
  祝好~


Re: Re: submit jobGraph error on server side

2019-08-07 文章 Zili Chen
从错误堆栈上看你的请求应该是已经发到 jobmanager 上了,也就是不存在找不到端口的问题。
但是 jobmanager 在处理 submit job 的时候某个动作超时了。你这个问题是一旦把
gateway 分开就稳定复现吗?也有可能是 akka 偶然的超时。

Best,
tison.


王智  于2019年8月7日周三 下午2:33写道:

> 感谢您的回复与指导~
>
>
> 经过简单的验证(验证方案在邮件末尾),明确是网络问题。
>
>
> 现在我猜测是flink run 提交job graph 的时候打开了除 这四个以外的端口导致。麻烦再请教一下,flink jobmanager
> 是否会打开新的端口进行通讯(或者还有其他端口配置我没有注意到)
>
> ports:
>   - containerPort: 6123
> protocol: TCP
>   - containerPort: 6124
> protocol: TCP
>   - containerPort: 6125
> protocol: TCP
>   - containerPort: 8081
> protocol: TCP
>
>
> # flink conf 内的配置
>
> jobmanager.rpc.port: 6123
>
> jobmanager.rpc.port: 6123
>
> jobmanager.rpc.port: 6123
>
> blob.server.port: 6124
>
> query.server.port: 6125
>
> # 8081 没有配置,使用默认,web ui 可以正常访问
>
> 我使用k8s 搭建的环境,提交任务的节点(命名为gateway)与jobmanager 在两个不同的pod,gateway 通过jobmanager
> 对应的jobmanager-service
> 找到jobmanager对应的服务。猜测是因为我在服务中仅定义了上述4个端口,所以gateway节点上的进程无法通过jobmanager-service
> 与jobmanager 通讯。
>
>
> 附: 以下是我的验证方案: 将提交节点与jobmanager 放入同一个pod,使用回环地址(不会有端口限制)通讯,可以成功提交job【flink 
> 的配置和代码完全一致】
>
>
>
>
>
>
>
>
> 原始邮件
>
> 发件人:"Zili Chen"< wander4...@gmail.com >;
>
> 发件时间:2019/8/6 19:19
>
> 收件人:"user-zh"< user-zh@flink.apache.org >;
>
> 主题:Re: submit jobGraph error on server side
>
>
> 问题是 Ask timed out on [Actor[akka://flink/user/dispatcher#-273192824]] after
> [1 ms]. Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.
> messages.LocalFencedMessage".
>
> 也就是 submit job 的时候在请求 Dispatcher 的时候 akka ask timeout
> 了,可以检查一下配置的地址和端口是否正确,或者贴出你的相关配置。
>
> Best,
> tison.
>
>
> 王智于2019年8月6日周二 下午7:13写道:
>
> > 向session cluster 提交job 出错,麻烦各位老师帮忙看下,给点排查提示 THX~
> >
> >
> >
> >
> > 环境:
> >
> > blink 1.8.0
> >
> > 用docker 方式启动的flink session cluster,flink 集群独立,我从集群外的一个docker
> > 节点提交job(该节点的flink-conf.yaml 配置与flink 集群内的配置一致)
> >
> >
> >
> >
> > --
> >
> >
> > 报错信息:
> >
> > 
> >
> >  The program finished with the following exception:
> >
> >
> >
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error:
> > org.apache.flink.client.program.ProgramInvocationException: Could not
> > retrieve the execution result. (JobID: 82
> >
> > 3a336683f6476b3e7ee2780c33395b)
> >
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> >
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> >
> > at
> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> >
> > at
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> >
> > at
> > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> >
> > at
> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >
> > at
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> >
> > at
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> >
> > at
> >
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> >
> > at
> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> >
> > Caused by: java.lang.RuntimeException:
> > org.apache.flink.client.program.ProgramInvocationException: Could not
> > retrieve the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b)
> >
> > at
> >
> com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:176)
> >
> > at
> >
> com.xx.data.platform.pandora.flink.EntryPoint.main(EntryPoint.java:78)
> >
> > at
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >
> > at
> > java.lang.reflect.Method.invoke(Method.java:498)
> >
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> >
> > ... 9 more
> >
> > Caused by: org.apache.flink.client.program.ProgramInvocationException:
> > Could not retrieve the execution result. (JobID:
> > 823a336683f6476b3e7ee2780c33395b)
> >
> > at
> >
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
> >
> > at
> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> >
> > at
> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
> >
> > at
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> >
> > at
> >
> com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:174)
> >
> > ... 15 more
> >
> >
> >
> > Caused by: org.apache.flink.runtime