Re: flink-1.11 使用 application 模式时 jobid 问题

2020-11-12 文章 zhisheng
hello, Yang Wang 这个问题目前有 Issue 在跟踪吗?在 1.12 测试发现还是有这个问题,有办法解决吗? Best, zhisheng Yang Wang 于2020年9月3日周四 上午11:15写道: > 目前HA模式下,application模式还不能支持多job,所以就固定是0了 > 主要的原因是recover的问题还没有解决好 > > > Best, > Yang > > chenkaibit 于2020年9月2日周三 下午7:29写道: > > > hi: > > 我在测试 flink-1.11 application 模式时发现 开启 HA 后

Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 JasonLee
hi 把 -d 参加加上用分离方式启动 应该就可以了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复: flink-1.11.2 执行checkpoint失败

2020-11-12 文章 hailongwang
据我所知,“超时”并不会导致 failure counter 加 1,也就是说“超时”并不是“错误”,或者说 Exception。 我觉得是否可以看下 checkpoint 抛了什么 exception 导致超过了最大可容能的数量(默认应该是有异常就会重启) 如果这个 Exception 是期望的或者因为 HDFS 等原因无法避免的话,那么可以适当加大 tolerableCpFailureNumber。 在 2020-11-13 09:13:34,"史 正超" 写道: >这是个思路,谢谢回复,我先试下。 > >发件人:

Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 amen...@163.com
不知道为什么我现在发往社区的邮件,老是会提示这种东西,原来社区都没有收到我的邮件 newxmmxszc44.qq.com rejected your message to the following email addresses: m...@zhangzuofeng.cn Your message couldn't be delivered because the recipient's email system wasn't able to confirm that your message came from a trusted location. For Email

Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 amen...@163.com
不知道为什么我现在发往社区的邮件,老是会提示这种东西,原来社区都没有收到我的邮件 newxmmxszc44.qq.com rejected your message to the following email addresses: m...@zhangzuofeng.cn Your message couldn't be delivered because the recipient's email system wasn't able to confirm that your message came from a trusted location. For Email

Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 JasonLee
hi 从你的描述看确实起的是per-job模式,per-job模式目前应该是没有这个问题的.可以再看下你的UI上execution.attached 的值是什么吗? 再有启动任务的时候是否加了 -d 参数 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-12 文章 jindy_liu
源表test: CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task',

Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 tison
detached 是另一个坑,因为你 attached 的时候需要等 client 去 request status 才会触发状态变化,但是普通的 execute 应该也是会自动的去拉结果的。 可以看下下列关键日志的打印情况 - log.info("Job {} reached globally terminal state {}.", ...) - LOG.debug("Shutting down cluster because someone retrieved the job result."); - LOG.info("Shutting {} down with

Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 amen...@163.com
hi 1.确定提交的是Per-job模式,提交命令是./bin/flink run -m yarn-cluster xxx,并且从Flink web ui中的Job Manager -> Configuration观察到execution.target值为yarn-per-job 2.整体任务状态为Failed,但是TM挂了,JM没有挂(没有挂的原因猜测是因为Yarn application还在Running的原因吧?所以还能从Job Manager -> logs查看失败日志内容) best, amenhub 发件人: JasonLee 发送时间: 2020-11-13

Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 amen...@163.com
hi 1.确定提交的是Per-job模式,提交命令是./bin/flink run -m yarn-cluster xxx,并且从Flink web ui中的Job Manager -> Configuration观察到execution.target值为yarn-per-job 2.整体任务状态为Failed,但是TM挂了,JM没有挂(没有挂的原因猜测是因为Yarn application还在Running的原因吧?所以还能从Job Manager -> logs查看失败日志内容) best, amenhub 发件人: JasonLee 发送时间: 2020-11-13

Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 JasonLee
hi 1,首先确定你提交的是per-job模式吗? 2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了? - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 kingdomad
这个问题我也遇到过。 1.11版本,提交任务如果没加-d参数,flink程序挂掉了,但是yarn的application还一直是running状态,就相当于一个常驻的yarn session。 加上-d的话才能把flink程序和yarn application的生命周期捆绑到一起。 -- kingdomad 在 2020-11-13 11:16:02,"amen...@163.com" 写道: 当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。

Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 amen...@163.com
>>>当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。 按照这个说法,应当是偶发性行为,然而我一直等待Flink上报,大概几个小时过去了Yarn状态仍然处于Running.. >>>你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢? 这个话没有看懂,我的提交方式是./bin/flink run -m yarn-cluster xxx,Flink版本是1.11.1 昨天在社区邮件里发现了Flink-1.10以前可以通过-d参数解决Per-job模式下Flink web

Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 tison
PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。 当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。 你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢? Best, tison. zhisheng 于2020年11月12日周四 下午8:17写道: > 同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态 > > hdxg1101300123

Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 文章 Danny Chan
如果 gap 溢出只是少数 record 满足,用 window 性能确实不好,可以考虑用传统的 kv 记录状态 Lei Wang 于2020年11月12日周四 下午9:17写道: > 用 session windown 确实能满足功能: > > > robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, > y) -> y); > > 按照这种写法, 我理解 window state 中只保存了最近的一条记录。 > > > 正常情况下 robot

回复: flink-1.11.2 执行checkpoint失败

2020-11-12 文章 史 正超
这是个思路,谢谢回复,我先试下。 发件人: 赵一旦 发送时间: 2020年11月13日 2:05 收件人: user-zh@flink.apache.org 主题: Re: flink-1.11.2 执行checkpoint失败 如果超时也是错误的话,推进你设置容忍数量为INT最大值。因为有时候检查点失败或超时并不真正代表反压,至少我的生产中是这样。 有部分情况,压力高,但刚刚好的情况下,会出现部分检查点失败的case,但实际压力刚刚能顶住。没必要因此导致任务失败。 史 正超 于2020年11月13日周五

Re: flink-1.11.2 执行checkpoint失败

2020-11-12 文章 赵一旦
如果超时也是错误的话,推进你设置容忍数量为INT最大值。因为有时候检查点失败或超时并不真正代表反压,至少我的生产中是这样。 有部分情况,压力高,但刚刚好的情况下,会出现部分检查点失败的case,但实际压力刚刚能顶住。没必要因此导致任务失败。 史 正超 于2020年11月13日周五 上午10:01写道: > 从上面看是的。 > > public void handleJobLevelCheckpointException(CheckpointException > exception, long checkpointId) { >

回复: flink-1.11.2 执行checkpoint失败

2020-11-12 文章 史 正超
从上面看是的。 public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) { checkFailureCounter(exception, checkpointId); if (continuousFailureCounter.get() > tolerableCpFailureNumber) { clearCount(); failureCallback.failJob(new

Re: flink-1.11.2 执行checkpoint失败

2020-11-12 文章 赵一旦
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. 顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢? 史 正超 于2020年11月12日周四 下午9:23写道: > 执行checkpoint失败,报下面的错。 > 2020-11-12 21:04:56 > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable >

??????????: ??????flink-1.11.2 ????checkpoint????

2020-11-12 文章 ?n?e?m?a ????????
??1.11.2??debug?? ?? ---- ??:

回复: 回复:flink-1.11.2 执行checkpoint失败

2020-11-12 文章 史 正超
sql简化后类似这样, 做checkpoint超时 CREATE VIEW fcbox_send_fat_view AS SELECT REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate, disCode, IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName, dislv, IF(express_company_id IS NOT NULL, express_company_id, 'NA') AS expressCompanyId,

回复: 回复:flink-1.11.2 执行checkpoint失败

2020-11-12 文章 史 正超
没有,用的是jdbc sink,先是 三张change log的 left join,然后 再分别与两张mysql维表的join,执行checkpoint超时。sql太复杂了,我越写越没信心。。。 ``` CREATE VIEW fcbox_send_fat_view AS SELECT REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate, disCode, IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName, dislv,

回复:flink-1.11.2 执行checkpoint失败

2020-11-12 文章 601049502
首先,我想知道你是否使用了kafka sink? 在 2020年11月12日 21:16,史 正超 写道: 执行checkpoint失败,报下面的错。 2020-11-12 21:04:56 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at

Re:Re: flink sql 函数FIRST_VALUE调用报错

2020-11-12 文章 hailongwang
已经建了支持 first_value 和 last_value 的 merge 方法 issue[1]。 同时也建了个 issue 来修改 自定义 UDAF 需要 merge 方法的描述文档,可以增加一个 hop window [2]。 @李世钰 感兴趣的话可以认领下哈。 [1] https://issues.apache.org/jira/browse/FLINK-20110 [2] https://issues.apache.org/jira/browse/FLINK-20111 At 2020-11-12 19:51:29, "Jark Wu" wrote:

flink-1.11.2 执行checkpoint失败

2020-11-12 文章 史 正超
执行checkpoint失败,报下面的错。 2020-11-12 21:04:56 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) at

Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 文章 Lei Wang
用 session windown 确实能满足功能: robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, y) -> y); 按照这种写法, 我理解 window state 中只保存了最近的一条记录。 正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。 On Thu, Nov 12, 2020 at 5:25 PM hailongwang

Re: ElasticsearchApiCallBridge相关类构造函数问题

2020-11-12 文章 zhisheng
目前在 master 分支已经支持了,可以去看看 flink-connector-es7 的源码 Luna Wong 于2020年11月11日周三 下午9:16写道: > 为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。 > 我还想继承Elasticsearch6ApiCallBridge类。在new > RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。 > > 不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?

Re: flink sql 函数FIRST_VALUE调用报错

2020-11-12 文章 Jark Wu
可以建个 issue 支持下 first_value 和 last_value 的 merge 方法。 On Thu, 12 Nov 2020 at 20:37, hailongwang <18868816...@163.com> wrote: > Hi me, > > > HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane > 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value > 不支持 merge。 >

Re:flink sql 函数FIRST_VALUE调用报错

2020-11-12 文章 hailongwang
Hi me, HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。 Best, hailong 在 2020-11-12 17:07:58,"李世钰" 写道: >1. FLINK版本 flink1.11 > > > > >2. 使用的是useBlinkPlanner > > > > >3.执行sql > >SELECT

Re: Re: slot数量与并行度的大小关系

2020-11-12 文章 zhisheng
可以参考 http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ 文章理解一下 hl9...@126.com 于2020年11月12日周四 下午4:47写道: > 是flink standalone 集群。 > job并行度是在job的java代码中通过 streamExecutionEnvironment.setParallelism(15) 来指定的。 > > > > hl9...@126.com > > 发件人: Xintong Song > 发送时间: 2020-11-12 13:18 > 收件人:

Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 zhisheng
同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态 hdxg1101300123 于2020年11月12日周四 下午8:07写道: > 可以设置检查点失败任务也失败 > > > > 发自vivo智能手机 > > hi everyone, > > > > 最近在使用Flink-1.11.1 On Yarn Per > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn > application仍处于运行状态 > > > >

Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 文章 zhisheng
hi 可以看看 Timer 的机制,能不能解决你的问题 Best zhisheng hailongwang <18868816...@163.com> 于2020年11月12日周四 下午5:25写道: > > > > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 > > > > > 在 2020-11-12 14:34:59,"Danny Chan" 写道: > >感觉你这个应该是一个 session

回复: Flink与Yarn的状态一致性问题

2020-11-12 文章 hdxg1101300123
可以设置检查点失败任务也失败 发自vivo智能手机 > hi everyone, > > 最近在使用Flink-1.11.1 On Yarn Per > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn > application仍处于运行状态 > > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢 > > best, > amenhub

Re: Re: flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running

2020-11-12 文章 amen...@163.com
hi, 我现在的版本是flink-1.11.1没有加-d参数,也遇见了同样的问题,不知道是什么情况呢? best, amenhub 发件人: Yang Wang 发送时间: 2020-08-05 10:28 收件人: user-zh 主题: Re: flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running 你的Flink任务应该是用attach的方式起的,也就是没有加-d,这种情况在1.10之前起的任务本质上是一个session,

flink sql 函数FIRST_VALUE调用报错

2020-11-12 文章 李世钰
1. FLINK版本 flink1.11 2. 使用的是useBlinkPlanner 3.执行sql SELECT FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS

flink sql 函数FIRST_VALUE调用报错

2020-11-12 文章 me
1. FLINK版本 flink1.11 2. 使用的是useBlinkPlanner 3.执行sql SELECT FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS

Re: 关于filesystem connector的一点疑问

2020-11-12 文章 Jingsong Li
尽早的可查,直接把delay设为0即可 (其它默认值) On Thu, Nov 12, 2020 at 5:17 PM admin <17626017...@163.com> wrote: > Hi,jingsong > 所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。 > 所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的, > 比如sink.partition-commit.trigger = partition-time > sink.partition-commit.delay = 10 min > > >

Re:Re: Re:flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-12 文章 hailongwang
这个应该就是长时间没有数据传输导致的 链接不可用,其中可能是: 1、kakfa 的数据和稀疏,数据达到的时间间隔大于 “wait_timeout“ 2、一直没有 join 上 mysql 的数据导致的。 可以设置下 数据库的 wait_timeout 看下 PS,如果这个场景,自动恢复应该是没问题的,但是需要确定下根本原因,看是正常的还是异常的,怎么去避免。 最好设置下 checkpoint,这个 kafka 的 offset 是在checkpoint 成功的时候才 ack的,这样就不会导致 这条数据被自动ack而丢弃的。 如果开启 checkpoint 的话,下游支持 upsert

Re:Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 文章 hailongwang
这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 在 2020-11-12 14:34:59,"Danny Chan" 写道: >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 > >Lei Wang 于2020年11月11日周三 下午2:03写道: > >> 有很多边缘机器人设备(我们称为 robot)往

Re: 关于filesystem connector的一点疑问

2020-11-12 文章 admin
Hi,jingsong 所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。 所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的, 比如sink.partition-commit.trigger = partition-time sink.partition-commit.delay = 10 min > 2020年11月12日 下午3:22,Jingsong Li 写道: > > Hi admin, > > 不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作) > > On Thu,

Re: Re: slot数量与并行度的大小关系

2020-11-12 文章 hl9...@126.com
是flink standalone 集群。 job并行度是在job的java代码中通过 streamExecutionEnvironment.setParallelism(15) 来指定的。 hl9...@126.com 发件人: Xintong Song 发送时间: 2020-11-12 13:18 收件人: user-zh 主题: Re: slot数量与并行度的大小关系 你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的? 流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3]