回复: Flink命令提交任务时是否支持配置文件与任务jar包分离

2019-08-11 文章 venn
可以分离,客户端提交的时候,初始化是在客户端上完成的,JobGraph 提交到
JobManager 之后不需要配置文件了


-邮件原件-
发件人: user-zh-return-797-wxchunjhyy=163@flink.apache.org
 代表
jinxiaolong_al...@163.com
发送时间: Saturday, August 10, 2019 12:33 AM
收件人: user-zh 
主题: Flink命令提交任务时是否支持配置文件与任务jar包分离

各位社区大佬:
   请问使用Flink命令提交任务时是否支持配置文件与任务jar包分离。
比如我的任务自身有个配置文件job.yaml,目前该配置是打到jar包中随任务提交的,可
是有时候只是要调整下配置代码没改动也要重新打包发到环境上,感觉这样不灵活,
所以我想问下能不能单独把配置文件(可能是多个文件)放到一个目录下,然后提交任务
的时候指定配置文件或者是配置目录。
类似jobManager把这些配置分发到TaskManager的classPath下这样的逻辑,这样就不用
改下配置也要重新打包发到环境上了。
倒是有个-yt参数,但是这个是用来将指定的jar包传到容器中,不适用我说的场景吧。
各位大佬请问有没有好的办法或思路,求指导。

我用的flink版本是1.7.2



jinxiaolong_al...@163.com


答复: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

2019-08-11 文章 Yuan,Youjun
并不是没条消息会触发watermark,而是有一定时间间隔的,默认是200ms触发一次watermark。
当你的数据来的比较集中的时候,经常会发生最新的消息的时间戳已经过了window end,但是window还没fire的情况。


-邮件原件-
发件人: Ever <439674...@qq.com> 
发送时间: Sunday, July 14, 2019 5:00 PM
收件人: user-zh 
主题: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

第四条数据来的时间戳是: 03:17:55,  水印时间这时候应该是03:17:50,  
不管是大窗口的关闭时间(第一条数据(03:15:48)的大窗口关闭时间:03:16:50)还是小的滑动窗口关闭时间, 都已经过了, 都应该关闭了啊




-- 原始邮件 --
发件人: "Hequn Cheng";
发送时间: 2019年7月14日(星期天) 中午11:55
收件人: "user-zh";

主题: Re: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题



Hi,

应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time

On Fri, Jul 12, 2019 at 4:05 PM Ever <439674...@qq.com> wrote:

> 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
> 数据每隔10秒会过来一批。
> 代码如下图:
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
>
> env.setParallelism(parallel)
>
>
> env.addSource(source)
>   .map(json => {
>   new InvokingInfoWrapper(xxx)
> })
>   .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seco
> nds(5))
> {
> override def extractTimestamp(invoking: InvokingInfoWrapper): 
> Long = {
>   invoking.timestamp
> }
>   })
>   .keyBy(invokingInfo => {
> s"${invokingInfo.caller}_${invokingInfo.callee}"
>   })
>   .timeWindow(Time.seconds(60), Time.seconds(10))
>   .reduce(innerReducer).map(invokingInfo => { // ##2map
>   //some mapping code
>   invokingInfo
>   })
>   .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-s
> ink")
>
> ```
>
>
>
>
> 由于是在预发布环境上线, 流量不大,我观察到一个现场如下:
> 1. 第一条数据的时间戳为03:15:48
> 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口)
> 3. 第三条数据的时间戳为03:16:06,   触发reduce操作(同样5次)
> 4. 第四条数据的时间戳为03:17:55,
>  这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。
> 5. 第五条数据的时间戳为03:18:01,  这时候触发了跟第四条数据的reduce操作。
>
>
> 感觉前三条数据给吞了。
>
>
> 为什么呢?


Re: Re: 有一些TaskManager的slot不可用,尽管没有任务正在运行

2019-08-11 文章 pengcheng...@bonc.com.cn
你好,谢谢,图片显示确实有问题,不过没关系,图片不是很重要。



pengcheng...@bonc.com.cn
 
发件人: Xintong Song
发送时间: 2019-08-09 20:09
收件人: user-zh
主题: Re: 有一些TaskManager的slot不可用,尽管没有任务正在运行
Hi,
 
邮件中的图片显示不出来。Flink邮件列表的图片附件是有点问题的,如果是截图最好上传到其他地方然后把链接贴出来。
 
Thank you~
 
Xintong Song
 
 
 
On Fri, Aug 9, 2019 at 10:06 AM pengcheng...@bonc.com.cn <
pengcheng...@bonc.com.cn> wrote:
 
> 各位大佬:
>
> 有对这种情况比较了解的吗?任务结束后,一些slot并没有释放掉。
>
>
> 如图所示:
>
>
>
>
> --
> pengcheng...@bonc.com.cn
>


回复: flink1.10版本连接hive报错

2019-08-11 文章 athlon...@gmail.com
先问下你的hdfs上的文件是不是用lzo压缩的?



athlon...@gmail.com
 
发件人: 苏 欣
发送时间: 2019-08-09 17:40
收件人: user-zh@flink.apache.org
主题: flink1.10版本连接hive报错
使用flink版本为1.10-snapshot,连接hive版本为1.1.0-cdh5.4.7,大数据集群有kerberos认证。
我是用1.2.1的方式连接hive的。hiveCatalog可以取到表结构,但在启动作业的时候报错,Standalone模式和yarn模式都报同样的错。
请问有人遇到过这种问题吗?
 
报错信息如下:

The program finished with the following exception:
 
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 3f3033f7076c332529f3ac8250713889)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error., (JobManagerRunner.java:152)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Error in configuring object
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:270)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:89

Re: flink源码编译可以不编译scala代码吗

2019-08-11 文章 Zili Chen
Flink 编译的默认 Scala 版本是 2.11.x,你可以试着把 Scala 版本切换成 2.11.x 再编译一下。

Best,
tison.


苟刚  于2019年8月10日周六 下午11:08写道:

>
>
>
> Hi,All:
>
>
>   我再尝试编译flink 1.7的源码时,遇到如下错误,本人对scala不是很了解,不知道是不是版本问题引起,另外可以去掉sacla模块编译吗:
>  本机scala版本:2.13.0
> JDK 版本: 1.8.0_91
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
> (default-compile) on project flink-scala_2.11: Compilation failure:
> Compilation failure:
> [ERROR]
> /Users/gang.gou/work/git/github/flink/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java:[67,44]
> 不兼容的类型:
> 无法推断org.apache.flink.api.java.typeutils.runtime.EitherSerializer<>的类型参数
> [ERROR] 原因: 不存在类型变量L,R,T,T的实例,
> 以使org.apache.flink.api.java.typeutils.runtime.EitherSerializer与org.apache.flink.api.common.typeutils.TypeSerializer>一致
> [ERROR]
> /Users/gang.gou/work/git/github/flink/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java:[78,86]
> 不兼容的类型:
> org.apache.flink.api.common.typeutils.TypeSerializer>无法转换为org.apache.flink.api.java.typeutils.runtime.EitherSerializer
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :flink-scala_2.11
>
>
> Process finished with exit code 1
>
>
>
>
> --
> Best Wishes
>Galen.K


Re: need help

2019-08-11 文章 Zili Chen
异常原因如上所说是 akka ask timeout 的问题,这个问题前两天有人在部署 k8s 的时候也遇过[1]

他的情况是配置资源过少导致 JM 未能及时响应。除了调整上述参数外也可看看是不是这个问题。

Best,
tison.

[1]
https://lists.apache.org/thread.html/84db9dca2e990dd0ebc30aa35390ac75a0e9c7cbfcdbc2029986d4d7@%3Cuser-zh.flink.apache.org%3E


Biao Liu  于2019年8月8日周四 下午8:00写道:

> 你好,
>
> 异常里可以看出 AskTimeoutException, 可以调整两个参数 akka.ask.timeout 和 web.timeout
> 再试一下,默认值如下
>
> akka.ask.timeout: 10 s
> web.timeout: 1
>
> PS: 搜 “AskTimeoutException Flink” 可以搜到很多相关答案
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, Aug 8, 2019 at 7:33 PM 陈某  wrote:
>
> >
> >
> > -- Forwarded message -
> > 发件人: 陈某 
> > Date: 2019年8月8日周四 下午7:25
> > Subject: need help
> > To: 
> >
> >
> > 你好,我是一个刚接触flink的新手,在搭建完flink on
> >
> yarn集群后,依次启动zookeeper,hadoop,yarn,flkink集群,并提交认识到yarn上时运行遇到问题,网上搜索相关问题,暂未找到解决方式,希望能得到帮助,谢谢。
> >
> > 使用的运行指令为:
> > [root@flink01 logs]# flink run -m  yarn-cluster
> > ./examples/streaming/WordCount.jar
> > 查看log后错误信息如下:(附件中为完整的log文件)
> > org.apache.flink.client.program.ProgramInvocationException: Could not
> > retrieve the execution result. (JobID: 91e82fd8626bde4c901bf0b1639e12e7)
> > at
> >
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
> > at
> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > at
> >
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> > at
> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> > at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > at
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> > to submit JobGraph.
> > at
> >
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
> > at
> >
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> > at
> >
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> > at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > at
> >
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
> > at
> >
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> > at
> >
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> > at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > at
> >
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> > at
> >
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> > at
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> > [Internal server error.,  > akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka://flink/user/dispatcher#2035575525]] after [1 ms].
> > Sender[null] sent message of type
> > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> > at
>

Re: 回复:Flink命令提交任务时是否支持配置文件与任务jar包分离

2019-08-11 文章 jinxiaolong_al...@163.com
你好:
   感谢你的回答,使用配置中心托管配置是可以解决这个问题。
其实我提问的初衷是想确认下flink自身在应用或部署层面上有没有相关的能力或使用技巧解决这个问题,或者是不是后续的演进已经考虑这方面的问题,
可能我还不知道,求社区大佬们指点。
我在用的版本是1.7.2



jinxiaolong_al...@163.com
 
发件人: huanqinghappy
发送时间: 2019-08-11 00:48
收件人: user-zh
主题: 回复:Flink命令提交任务时是否支持配置文件与任务jar包分离
你好:
  是不是可以直接使用配置中心 例如把配置信息写到zookeeper上 这样还能做到当配置信息更改后不应用也可以做相应改变
 
 
--
发件人:jinxiaolong_al...@163.com 
发送时间:2019年8月10日(星期六) 00:32
收件人:user-zh 
主 题:Flink命令提交任务时是否支持配置文件与任务jar包分离
 
各位社区大佬:
   请问使用Flink命令提交任务时是否支持配置文件与任务jar包分离。
比如我的任务自身有个配置文件job.yaml,目前该配置是打到jar包中随任务提交的,可是有时候只是要调整下配置代码没改动也要重新打包发到环境上,感觉这样不灵活,
所以我想问下能不能单独把配置文件(可能是多个文件)放到一个目录下,然后提交任务的时候指定配置文件或者是配置目录。
类似jobManager把这些配置分发到TaskManager的classPath下这样的逻辑,这样就不用改下配置也要重新打包发到环境上了。
倒是有个-yt参数,但是这个是用来将指定的jar包传到容器中,不适用我说的场景吧。
各位大佬请问有没有好的办法或思路,求指导。
 
我用的flink版本是1.7.2
 
 
 
jinxiaolong_al...@163.com