hello, Yang Wang
这个问题目前有 Issue 在跟踪吗?在 1.12 测试发现还是有这个问题,有办法解决吗?
Best,
zhisheng
Yang Wang 于2020年9月3日周四 上午11:15写道:
> 目前HA模式下,application模式还不能支持多job,所以就固定是0了
> 主要的原因是recover的问题还没有解决好
>
>
> Best,
> Yang
>
> chenkaibit 于2020年9月2日周三 下午7:29写道:
>
> > hi:
> > 我在测试 flink-1.11 application 模式时发现 开启 HA 后
hi
把 -d 参加加上用分离方式启动 应该就可以了
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
据我所知,“超时”并不会导致 failure counter 加 1,也就是说“超时”并不是“错误”,或者说 Exception。
我觉得是否可以看下 checkpoint 抛了什么 exception 导致超过了最大可容能的数量(默认应该是有异常就会重启)
如果这个 Exception 是期望的或者因为 HDFS 等原因无法避免的话,那么可以适当加大 tolerableCpFailureNumber。
在 2020-11-13 09:13:34,"史 正超" 写道:
>这是个思路,谢谢回复,我先试下。
>
>发件人:
不知道为什么我现在发往社区的邮件,老是会提示这种东西,原来社区都没有收到我的邮件
newxmmxszc44.qq.com rejected your message to the following email addresses:
m...@zhangzuofeng.cn
Your message couldn't be delivered because the recipient's email system wasn't
able to confirm that your message came from a trusted location.
For Email
不知道为什么我现在发往社区的邮件,老是会提示这种东西,原来社区都没有收到我的邮件
newxmmxszc44.qq.com rejected your message to the following email addresses:
m...@zhangzuofeng.cn
Your message couldn't be delivered because the recipient's email system wasn't
able to confirm that your message came from a trusted location.
For Email
hi
从你的描述看确实起的是per-job模式,per-job模式目前应该是没有这个问题的.可以再看下你的UI上execution.attached
的值是什么吗? 再有启动任务的时候是否加了 -d 参数
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
源表test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '1',
'database-name' = 'ai_audio_lyric_task',
detached 是另一个坑,因为你 attached 的时候需要等 client 去 request status 才会触发状态变化,但是普通的
execute 应该也是会自动的去拉结果的。
可以看下下列关键日志的打印情况
- log.info("Job {} reached globally terminal state {}.", ...)
- LOG.debug("Shutting down cluster because someone retrieved the job
result.");
- LOG.info("Shutting {} down with
hi
1.确定提交的是Per-job模式,提交命令是./bin/flink run -m yarn-cluster xxx,并且从Flink web ui中的Job
Manager -> Configuration观察到execution.target值为yarn-per-job
2.整体任务状态为Failed,但是TM挂了,JM没有挂(没有挂的原因猜测是因为Yarn application还在Running的原因吧?所以还能从Job
Manager -> logs查看失败日志内容)
best,
amenhub
发件人: JasonLee
发送时间: 2020-11-13
hi
1.确定提交的是Per-job模式,提交命令是./bin/flink run -m yarn-cluster xxx,并且从Flink web ui中的Job
Manager -> Configuration观察到execution.target值为yarn-per-job
2.整体任务状态为Failed,但是TM挂了,JM没有挂(没有挂的原因猜测是因为Yarn application还在Running的原因吧?所以还能从Job
Manager -> logs查看失败日志内容)
best,
amenhub
发件人: JasonLee
发送时间: 2020-11-13
hi
1,首先确定你提交的是per-job模式吗?
2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了?
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
这个问题我也遇到过。
1.11版本,提交任务如果没加-d参数,flink程序挂掉了,但是yarn的application还一直是running状态,就相当于一个常驻的yarn
session。
加上-d的话才能把flink程序和yarn application的生命周期捆绑到一起。
--
kingdomad
在 2020-11-13 11:16:02,"amen...@163.com" 写道:
当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。
>>>当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。
按照这个说法,应当是偶发性行为,然而我一直等待Flink上报,大概几个小时过去了Yarn状态仍然处于Running..
>>>你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?
这个话没有看懂,我的提交方式是./bin/flink run -m yarn-cluster xxx,Flink版本是1.11.1
昨天在社区邮件里发现了Flink-1.10以前可以通过-d参数解决Per-job模式下Flink web
PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。
当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。
你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?
Best,
tison.
zhisheng 于2020年11月12日周四 下午8:17写道:
> 同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态
>
> hdxg1101300123
如果 gap 溢出只是少数 record 满足,用 window 性能确实不好,可以考虑用传统的 kv 记录状态
Lei Wang 于2020年11月12日周四 下午9:17写道:
> 用 session windown 确实能满足功能:
>
>
> robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
> y) -> y);
>
> 按照这种写法, 我理解 window state 中只保存了最近的一条记录。
>
>
> 正常情况下 robot
这是个思路,谢谢回复,我先试下。
发件人: 赵一旦
发送时间: 2020年11月13日 2:05
收件人: user-zh@flink.apache.org
主题: Re: flink-1.11.2 执行checkpoint失败
如果超时也是错误的话,推进你设置容忍数量为INT最大值。因为有时候检查点失败或超时并不真正代表反压,至少我的生产中是这样。
有部分情况,压力高,但刚刚好的情况下,会出现部分检查点失败的case,但实际压力刚刚能顶住。没必要因此导致任务失败。
史 正超 于2020年11月13日周五
如果超时也是错误的话,推进你设置容忍数量为INT最大值。因为有时候检查点失败或超时并不真正代表反压,至少我的生产中是这样。
有部分情况,压力高,但刚刚好的情况下,会出现部分检查点失败的case,但实际压力刚刚能顶住。没必要因此导致任务失败。
史 正超 于2020年11月13日周五 上午10:01写道:
> 从上面看是的。
>
> public void handleJobLevelCheckpointException(CheckpointException
> exception, long checkpointId) {
>
从上面看是的。
public void handleJobLevelCheckpointException(CheckpointException exception,
long checkpointId) {
checkFailureCounter(exception, checkpointId);
if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
clearCount();
failureCallback.failJob(new
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢?
史 正超 于2020年11月12日周四 下午9:23写道:
> 执行checkpoint失败,报下面的错。
> 2020-11-12 21:04:56
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
>
??1.11.2??debug??
??
----
??:
sql简化后类似这样, 做checkpoint超时
CREATE VIEW fcbox_send_fat_view AS
SELECT
REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate,
disCode,
IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
dislv,
IF(express_company_id IS NOT NULL, express_company_id, 'NA') AS
expressCompanyId,
没有,用的是jdbc sink,先是 三张change log的 left join,然后
再分别与两张mysql维表的join,执行checkpoint超时。sql太复杂了,我越写越没信心。。。
```
CREATE VIEW fcbox_send_fat_view AS
SELECT
REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate,
disCode,
IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
dislv,
首先,我想知道你是否使用了kafka sink?
在 2020年11月12日 21:16,史 正超 写道:
执行checkpoint失败,报下面的错。 2020-11-12 21:04:56
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold. at
已经建了支持 first_value 和 last_value 的 merge 方法 issue[1]。
同时也建了个 issue 来修改 自定义 UDAF 需要 merge 方法的描述文档,可以增加一个 hop window [2]。
@李世钰 感兴趣的话可以认领下哈。
[1] https://issues.apache.org/jira/browse/FLINK-20110
[2] https://issues.apache.org/jira/browse/FLINK-20111
At 2020-11-12 19:51:29, "Jark Wu" wrote:
执行checkpoint失败,报下面的错。
2020-11-12 21:04:56
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at
用 session windown 确实能满足功能:
robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
y) -> y);
按照这种写法, 我理解 window state 中只保存了最近的一条记录。
正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。
On Thu, Nov 12, 2020 at 5:25 PM hailongwang
目前在 master 分支已经支持了,可以去看看 flink-connector-es7 的源码
Luna Wong 于2020年11月11日周三 下午9:16写道:
> 为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。
> 我还想继承Elasticsearch6ApiCallBridge类。在new
> RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。
>
> 不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?
可以建个 issue 支持下 first_value 和 last_value 的 merge 方法。
On Thu, 12 Nov 2020 at 20:37, hailongwang <18868816...@163.com> wrote:
> Hi me,
>
>
> HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane
> 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value
> 不支持 merge。
>
Hi me,
HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用
UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。
Best,
hailong
在 2020-11-12 17:07:58,"李世钰" 写道:
>1. FLINK版本 flink1.11
>
>
>
>
>2. 使用的是useBlinkPlanner
>
>
>
>
>3.执行sql
>
>SELECT
可以参考 http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ 文章理解一下
hl9...@126.com 于2020年11月12日周四 下午4:47写道:
> 是flink standalone 集群。
> job并行度是在job的java代码中通过 streamExecutionEnvironment.setParallelism(15) 来指定的。
>
>
>
> hl9...@126.com
>
> 发件人: Xintong Song
> 发送时间: 2020-11-12 13:18
> 收件人:
同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态
hdxg1101300123 于2020年11月12日周四 下午8:07写道:
> 可以设置检查点失败任务也失败
>
>
>
> 发自vivo智能手机
> > hi everyone,
> >
> > 最近在使用Flink-1.11.1 On Yarn Per
> Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> application仍处于运行状态
> >
> >
hi
可以看看 Timer 的机制,能不能解决你的问题
Best zhisheng
hailongwang <18868816...@163.com> 于2020年11月12日周四 下午5:25写道:
>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan" 写道:
> >感觉你这个应该是一个 session
可以设置检查点失败任务也失败
发自vivo智能手机
> hi everyone,
>
> 最近在使用Flink-1.11.1 On Yarn Per
> Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> application仍处于运行状态
>
> 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
>
> best,
> amenhub
hi,
我现在的版本是flink-1.11.1没有加-d参数,也遇见了同样的问题,不知道是什么情况呢?
best,
amenhub
发件人: Yang Wang
发送时间: 2020-08-05 10:28
收件人: user-zh
主题: Re: flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running
你的Flink任务应该是用attach的方式起的,也就是没有加-d,这种情况在1.10之前起的任务本质上是一个session,
1. FLINK版本 flink1.11
2. 使用的是useBlinkPlanner
3.执行sql
SELECT FIRST_VALUE(kafka_table.src_ip) AS
kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS
kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30'
SECOND(2), INTERVAL '2' MINUTE(1)) AS
1. FLINK版本 flink1.11
2. 使用的是useBlinkPlanner
3.执行sql
SELECT FIRST_VALUE(kafka_table.src_ip) AS
kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS
kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30'
SECOND(2), INTERVAL '2' MINUTE(1)) AS
尽早的可查,直接把delay设为0即可 (其它默认值)
On Thu, Nov 12, 2020 at 5:17 PM admin <17626017...@163.com> wrote:
> Hi,jingsong
> 所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。
> 所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的,
> 比如sink.partition-commit.trigger = partition-time
> sink.partition-commit.delay = 10 min
>
> >
这个应该就是长时间没有数据传输导致的 链接不可用,其中可能是:
1、kakfa 的数据和稀疏,数据达到的时间间隔大于 “wait_timeout“
2、一直没有 join 上 mysql 的数据导致的。
可以设置下 数据库的 wait_timeout 看下
PS,如果这个场景,自动恢复应该是没问题的,但是需要确定下根本原因,看是正常的还是异常的,怎么去避免。
最好设置下 checkpoint,这个 kafka 的 offset 是在checkpoint 成功的时候才 ack的,这样就不会导致
这条数据被自动ack而丢弃的。
如果开启 checkpoint 的话,下游支持 upsert
这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
在 2020-11-12 14:34:59,"Danny Chan" 写道:
>感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
>
>Lei Wang 于2020年11月11日周三 下午2:03写道:
>
>> 有很多边缘机器人设备(我们称为 robot)往
Hi,jingsong
所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。
所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的,
比如sink.partition-commit.trigger = partition-time
sink.partition-commit.delay = 10 min
> 2020年11月12日 下午3:22,Jingsong Li 写道:
>
> Hi admin,
>
> 不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作)
>
> On Thu,
是flink standalone 集群。
job并行度是在job的java代码中通过 streamExecutionEnvironment.setParallelism(15) 来指定的。
hl9...@126.com
发件人: Xintong Song
发送时间: 2020-11-12 13:18
收件人: user-zh
主题: Re: slot数量与并行度的大小关系
你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的?
流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3]
41 matches
Mail list logo