Re: jobid issue when using application mode in flink-1.11

2020-11-12 post by zhisheng
hello, Yang Wang

Is there an issue tracking this yet? Testing on 1.12 shows the problem is still there; is there a way to work around it?

Best,
zhisheng

Yang Wang wrote on Thu, Sep 3, 2020, 11:15 AM:

> At the moment, application mode with HA enabled does not yet support multiple jobs, so the jobID is fixed to 0
> The main reason is that the recovery problem has not been fully solved yet
>
>
> Best,
> Yang
>
> chenkaibit wrote on Wed, Sep 2, 2020, 7:29 PM:
>
> > hi:
> >  While testing flink-1.11 application mode I noticed that with HA enabled the jobID is always
> > ; with HA disabled it is a random string (same as in previous versions). Is this a bug
> or is it just designed this way?
> >  Any guidance would be appreciated.
> >
> > --
> >
> > Best, yuchuan
>


Re: Re: State consistency issue between Flink and Yarn

2020-11-12 post by JasonLee
hi

Add the -d parameter and start the job in detached mode; that should fix it.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Reply: flink-1.11.2 checkpoint execution failure

2020-11-12 post by hailongwang
As far as I know, a "timeout" does not increment the failure counter by 1; in other words, a timeout is not counted as an "error", i.e. an Exception.
I would suggest checking which exception the checkpoint threw that pushed it over the maximum tolerable number (by default, any exception should trigger a restart).
If that Exception is expected, or unavoidable because of HDFS or similar reasons, you can raise tolerableCpFailureNumber accordingly.
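For reference, a minimal sketch of where these knobs sit in the DataStream API (the interval and limits below are illustrative values, not recommendations):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTolerance {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // trigger a checkpoint every 60s

        CheckpointConfig conf = env.getCheckpointConfig();
        // Consecutive checkpoint failures tolerated before the job itself fails;
        // this is the tolerableCpFailureNumber discussed above.
        conf.setTolerableCheckpointFailureNumber(10);
        // How long a checkpoint may run before it is declared expired ("timeout").
        conf.setCheckpointTimeout(10 * 60 * 1000L);
    }
}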

On 2020-11-13 09:13:34, "史 正超" wrote:
>That's a good idea, thanks for the reply. I'll give it a try.
>
>From: 赵一旦
>Sent: November 13, 2020 2:05
>To: user-zh@flink.apache.org
>Subject: Re: flink-1.11.2 checkpoint execution failure
>
>If timeouts also count as failures, I suggest setting the tolerable number to Integer.MAX_VALUE, because checkpoint failures or timeouts do not necessarily indicate backpressure; at least that is the case in my production jobs.
>In some situations the load is high but just barely manageable: some checkpoints fail even though the pipeline can actually keep up, and there is no need to fail the job because of that.
>
>史 正超 wrote on Fri, Nov 13, 2020, 10:01 AM:
>
>> Judging from the code, yes.
>>
>> public void handleJobLevelCheckpointException(CheckpointException
>> exception, long checkpointId) {
>>checkFailureCounter(exception, checkpointId);
>>if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
>>   clearCount();
>>   failureCallback.failJob(new FlinkRuntimeException("Exceeded
>> checkpoint tolerable failure threshold."));
>>}
>> }
>>
>> Once it exceeds the threshold, that error is thrown.
>> 
>> From: 赵一旦
>> Sent: November 13, 2020 1:56
>> To: user-zh@flink.apache.org
>> Subject: Re: flink-1.11.2 checkpoint execution failure
>>
>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
>> failure threshold.
>> A follow-up question on this: does a checkpoint "timeout" count towards checkpoint failures?
>>
>> 史 正超 wrote on Thu, Nov 12, 2020, 9:23 PM:
>>
>> > Checkpoint execution failed with the error below.
>> > 2020-11-12 21:04:56
>> > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>> tolerable
>> > failure threshold.
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
>> > at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> > at
>> >
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> > at
>> >
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> > at
>> >
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> > at
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>>


Re: Re: State consistency issue between Flink and Yarn

2020-11-12 post by amen...@163.com
Not sure why, but every mail I send to the community now triggers this kind of bounce, so it turns out the community never received my mails at all

newxmmxszc44.qq.com rejected your message to the following email addresses:
m...@zhangzuofeng.cn
Your message couldn't be delivered because the recipient's email system wasn't 
able to confirm that your message came from a trusted location.
For Email Administrators
This error is related to the Sender Policy Framework (SPF). The destination 
email system's evaluation of the SPF record for the message resulted in an 
error. Please work with your domain registrar to ensure your SPF records are 
correctly configured.


Back to the problem: I did not add the -d parameter when starting the job; after startup, execution.attached is true and execution.target is yarn-per-job

best,
amenhub



 
From: amen...@163.com
Sent: 2020-11-13 11:30
To: user-zh
Subject: Re: Re: State consistency issue between Flink and Yarn
hi
1. Confirmed it is submitted in per-job mode; the submit command is ./bin/flink run -m yarn-cluster xxx, and in the Flink web ui under Job
Manager -> Configuration, execution.target shows yarn-per-job
2. The overall job state is Failed; the TM is gone but the JM is not (my guess is the JM survives because the Yarn application is still Running? which is why the failure logs can still be viewed under Job
Manager -> logs)
 
best,
amenhub
 
 
 
From: JasonLee
Sent: 2020-11-13 11:22
To: user-zh
Subject: Re: State consistency issue between Flink and Yarn
hi
1. First, can you confirm that you submitted in per-job mode?
2. By "job state", do you mean the jm is still up and the job is failing over, or that the job is really gone and the jm has exited?
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Re: State consistency issue between Flink and Yarn

2020-11-12 post by JasonLee
hi

From your description it does look like per-job mode, and per-job mode should not have this problem. Could you check what the value of execution.attached
is on your UI? Also, did you add the -d parameter when starting the job?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism produces wrong sink results!!

2020-11-12 post by jindy_liu
Source table test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
Source table status
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

Output table
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '10',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);


Insert statement:
INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

The mysql tables already contain data
test:
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.

status
0, status0
1, status1
2, status2
.

Steps and reproduction:
1. Start the job with parallelism set to 40.
After the data already in the tables has been processed, take a savepoint with /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
savepoint, then cancel the job from the web ui.
  ==> the data in test_status is correct:
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1
2, name2, 2020-07-06 00:00:00 , 1, status1

2. In mysql, change the status row with id=1 to status1_modify

3. Then restart the job above from the savepoint with different parallelism, 1 and greater than 1; with parallelism greater than 1 the result does not match expectations.
With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
job,
  ==> the data in test_status is correct:
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1_modify
2, name2, 2020-07-06 00:00:00 , 1, status1_modify
With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
job,
  ==> the data in test_status is wrong; the two rows with id = 1 and 2 are missing:
0, name0, 2020-07-06 00:00:00 , 0, status0


I suspect that when the cdc changelog stream is written out by the sink with multiple parallel subtasks, the insert for id=1 is executed before the delete for id=1, which corrupts the data!!!

Is this a bug, or is the operator state broken when restoring from the savepoint?
If so, is it possible to set the parallelism to 1 for the sink alone??







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: State consistency issue between Flink and Yarn

2020-11-12 post by tison
detached is another pitfall, because in attached mode the state transition is only triggered once the client requests the status; that said, a plain
execute should also pull the result automatically.

You can check whether the following key log lines show up:

- log.info("Job {} reached globally terminal state {}.", ...)
- LOG.debug("Shutting down cluster because someone retrieved the job
result.");
- LOG.info("Shutting {} down with application status {}. Diagnostics {}.",
...)

Best,
tison.


JasonLee <17610775...@163.com> wrote on Fri, Nov 13, 2020, 11:22 AM:

> hi
> 1,首先确定你提交的是per-job模式吗?
> 2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了?
>
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: State consistency issue between Flink and Yarn

2020-11-12 post by amen...@163.com
hi
1. Confirmed it is submitted in per-job mode; the submit command is ./bin/flink run -m yarn-cluster xxx, and in the Flink web ui under Job
Manager -> Configuration, execution.target shows yarn-per-job
2. The overall job state is Failed; the TM is gone but the JM is not (my guess is the JM survives because the Yarn application is still Running? which is why the failure logs can still be viewed under Job
Manager -> logs)

best,
amenhub



 
发件人: JasonLee
发送时间: 2020-11-13 11:22
收件人: user-zh
主题: Re: Flink与Yarn的状态一致性问题
hi
1,首先确定你提交的是per-job模式吗?
2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了?
 
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/




Re: State consistency issue between Flink and Yarn

2020-11-12 post by JasonLee
hi
1. First, can you confirm that you submitted in per-job mode?
2. By "job state", do you mean the jm is still up and the job is failing over, or that the job is really gone and the jm has exited?




-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: State consistency issue between Flink and Yarn

2020-11-12 post by kingdomad
I have run into this problem as well.
On 1.11, if the job is submitted without the -d parameter and the flink program dies, the yarn application stays in running state, effectively a long-lived yarn
session.
Only with -d are the lifecycles of the flink program and the yarn application tied together.




--

kingdomad







在 2020-11-13 11:16:02,"amen...@163.com"  写道:
当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。
>按照这个说法,应当是偶发性行为,然而我一直等待Flink上报,大概几个小时过去了Yarn状态仍然处于Running..
>
你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?
>这个话没有看懂,我的提交方式是./bin/flink run -m yarn-cluster xxx,Flink版本是1.11.1
>
>昨天在社区邮件里发现了Flink-1.10以前可以通过-d参数解决Per-job模式下Flink web 
>ui状态为Failed的时候,实时反馈Failed状态给Yarn,从而解决Yarn仍为Running的问题,
>也提到说Flink-1.10及以后的Per-job模式是YarnJobClusterEntrypoint,这个确实没错,但是我面临的问题仍然和Flink-1.10以前的问题一致,
>就是Flink web ui观察任务已经Fail掉了,但Yarn application仍然在Running
>
>另外,发现Flink web ui观察任务如果是Finished的话,也会处于Running,这个算属于正常吗?(以上描述的作业都是Streaming 
>job)
>
>best,
>amenhub
>
>
> 
>发件人: tison
>发送时间: 2020-11-13 11:01
>收件人: user-zh
>主题: Re: Flink与Yarn的状态一致性问题
>PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。
> 
>当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。
> 
>你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?
> 
>Best,
>tison.
> 
> 
>zhisheng  于2020年11月12日周四 下午8:17写道:
> 
>> 同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态
>>
>> hdxg1101300123  于2020年11月12日周四 下午8:07写道:
>>
>> > 可以设置检查点失败任务也失败
>> >
>> >
>> >
>> > 发自vivo智能手机
>> > > hi everyone,
>> > >
>> > > 最近在使用Flink-1.11.1 On Yarn Per
>> > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
>> > application仍处于运行状态
>> > >
>> > > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
>> > >
>> > > best,
>> > > amenhub
>>


Re: Re: State consistency issue between Flink and Yarn

2020-11-12 post by amen...@163.com
>>>Of course, there is some delay between FLINK noticing it has FAILED and reporting that to YARN, and the report can also fail because of network issues etc.
By that logic it should be an occasional thing, but I have kept waiting for Flink to report; several hours have passed and the Yarn state is still Running..

>>>Is this a brief window of inconsistency, or has the FLINK cluster already exited while the YARN state has not changed?
I did not quite follow that; my submit command is ./bin/flink run -m yarn-cluster xxx, and the Flink version is 1.11.1

Yesterday I found in the community mailing list that before Flink-1.10 the -d parameter could make per-job mode report the Failed state to Yarn in real time when the Flink web
ui shows Failed, which fixes Yarn staying in Running;
it also mentioned that from Flink-1.10 onwards per-job mode uses YarnJobClusterEntrypoint, which is indeed true, but the problem I am facing is still the same as the pre-Flink-1.10 one:
the Flink web ui shows the job has already Failed, yet the Yarn application is still Running

Also, I noticed that when the Flink web ui shows a job as Finished, Yarn also stays in Running; is that considered normal? (All the jobs described above are Streaming
jobs)
best,
amenhub


 
发件人: tison
发送时间: 2020-11-13 11:01
收件人: user-zh
主题: Re: Flink与Yarn的状态一致性问题
PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。
 
当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。
 
你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?
 
Best,
tison.
 
 
zhisheng  于2020年11月12日周四 下午8:17写道:
 
> 同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态
>
> hdxg1101300123  于2020年11月12日周四 下午8:07写道:
>
> > 可以设置检查点失败任务也失败
> >
> >
> >
> > 发自vivo智能手机
> > > hi everyone,
> > >
> > > 最近在使用Flink-1.11.1 On Yarn Per
> > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> > application仍处于运行状态
> > >
> > > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
> > >
> > > best,
> > > amenhub
>


Re: State consistency issue between Flink and Yarn

2020-11-12 post by tison
In per-job mode, once the job has completely failed, the application does report its own failed state to the YARN RM.

Of course, there is some delay between FLINK noticing it has FAILED and reporting that to YARN, and the report can also fail because of network issues etc.

Is this a brief window of inconsistency in your case, or has the FLINK cluster already exited while the YARN state still has not changed?

Best,
tison.


zhisheng  于2020年11月12日周四 下午8:17写道:

> 同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态
>
> hdxg1101300123  于2020年11月12日周四 下午8:07写道:
>
> > 可以设置检查点失败任务也失败
> >
> >
> >
> > 发自vivo智能手机
> > > hi everyone,
> > >
> > > 最近在使用Flink-1.11.1 On Yarn Per
> > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> > application仍处于运行状态
> > >
> > > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
> > >
> > > best,
> > > amenhub
>


Re: Re: How to raise an alert when no message has been received for a certain period of time?

2020-11-12 post by Danny Chan
If only a small fraction of records ever exceed the gap, a window is indeed not great performance-wise; you could consider keeping the state in plain keyed (kv) state instead
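For what it's worth, a minimal sketch of that keyed-state approach with a KeyedProcessFunction and processing-time timers (class and field names are made up for illustration; it assumes the stream is keyed by robotId, carries plain string messages, and uses the 20-minute timeout from the quoted thread below). It would be applied as something like stream.keyBy(...).process(new OfflineAlertFunction()).

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits an alert string when a robot has been silent for too long.
public class OfflineAlertFunction extends KeyedProcessFunction<String, String, String> {

    private static final long TIMEOUT_MS = 20 * 60 * 1000L;

    // Only the timestamp of the currently registered timer is kept per key, not the message history.
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("offline-timer", Long.class));
    }

    @Override
    public void processElement(String msg, Context ctx, Collector<String> out) throws Exception {
        Long previous = timerState.value();
        if (previous != null) {
            // A new message arrived in time: cancel the pending alert timer.
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long fireAt = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        timerState.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // No message for TIMEOUT_MS: raise the alert and clear the state for this robot.
        out.collect("robot " + ctx.getCurrentKey() + " appears to be offline");
        timerState.clear();
    }
}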

Lei Wang wrote on Thu, Nov 12, 2020, 9:17 PM:

> A session window can indeed provide the functionality:
>
>
> robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
> y) -> y);
>
> Written this way, my understanding is that the window state only keeps the most recent record.
>
>
> Under normal conditions the robots keep reporting logs, which means this window would normally be kept around forever. I am not sure whether that has a performance impact.
>
>
>
> On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote:
>
> >
> >
> >
> > This scenario does resemble the session characteristics a bit, but a session window does not feel like the right fit.
> > Because if the alert is never triggered, all the historical data stays in the window, i.e. in state, even though only the latest record actually needs to be kept.
> >
> >
> >
> >
> > On 2020-11-12 14:34:59, "Danny Chan" wrote:
> > >This sounds like a session window requirement: the timeout is the session gap, and the moment the session fires is where the alerting logic goes
> > >
> > >Lei Wang wrote on Wed, Nov 11, 2020, 2:03 PM:
> > >
> > >> There are many edge robot devices (we call them robots) that send messages to Kafka; if no message is received for a certain period of time we consider the robot offline.
> > >>
> > >> For example
> > >> robot1   2020-11-11 12:00:00 msginfo
> > >> After that, no message from robot1 arrives for 20 mins; how can this be implemented in flink so that an alert is raised at
> > 2020-11-11 12:10:00?
> > >>
> > >> flink is message driven; if no message arrives, nothing is triggered. How can an action be triggered when no follow-up message has been received?
> > >>
> > >> I tried the example at https://juejin.im/post/6844904193052901384, but it does not fit my use case.
> > >>
> > >> That example effectively has all orders share a single timerService and iterates over all the orders each time.
> > >> We have to keyBy robotId.
> > >>
> > >> Any advice would be appreciated.
> > >>
> > >> Thanks,
> > >> 王磊
> > >>
> >
>


Reply: flink-1.11.2 checkpoint execution failure

2020-11-12 post by 史 正超
That's a good idea, thanks for the reply. I'll give it a try.

发件人: 赵一旦 
发送时间: 2020年11月13日 2:05
收件人: user-zh@flink.apache.org 
主题: Re: flink-1.11.2 执行checkpoint失败

如果超时也是错误的话,推进你设置容忍数量为INT最大值。因为有时候检查点失败或超时并不真正代表反压,至少我的生产中是这样。
有部分情况,压力高,但刚刚好的情况下,会出现部分检查点失败的case,但实际压力刚刚能顶住。没必要因此导致任务失败。

史 正超  于2020年11月13日周五 上午10:01写道:

> 从上面看是的。
>
> public void handleJobLevelCheckpointException(CheckpointException
> exception, long checkpointId) {
>checkFailureCounter(exception, checkpointId);
>if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
>   clearCount();
>   failureCallback.failJob(new FlinkRuntimeException("Exceeded
> checkpoint tolerable failure threshold."));
>}
> }
>
> 大于阈值就报那个错了。
> 
> 发件人: 赵一旦 
> 发送时间: 2020年11月13日 1:56
> 收件人: user-zh@flink.apache.org 
> 主题: Re: flink-1.11.2 执行checkpoint失败
>
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> 顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢?
>
> 史 正超  于2020年11月12日周四 下午9:23写道:
>
> > 执行checkpoint失败,报下面的错。
> > 2020-11-12 21:04:56
> > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable
> > failure threshold.
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> > at
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
>


Re: flink-1.11.2 checkpoint execution failure

2020-11-12 post by 赵一旦
If timeouts also count as failures, I suggest setting the tolerable number to Integer.MAX_VALUE, because checkpoint failures or timeouts do not necessarily indicate backpressure; at least that is the case in my production jobs.
In some situations the load is high but just barely manageable: some checkpoints fail even though the pipeline can actually keep up, and there is no need to fail the job because of that.

史 正超  于2020年11月13日周五 上午10:01写道:

> 从上面看是的。
>
> public void handleJobLevelCheckpointException(CheckpointException
> exception, long checkpointId) {
>checkFailureCounter(exception, checkpointId);
>if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
>   clearCount();
>   failureCallback.failJob(new FlinkRuntimeException("Exceeded
> checkpoint tolerable failure threshold."));
>}
> }
>
> 大于阈值就报那个错了。
> 
> 发件人: 赵一旦 
> 发送时间: 2020年11月13日 1:56
> 收件人: user-zh@flink.apache.org 
> 主题: Re: flink-1.11.2 执行checkpoint失败
>
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> 顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢?
>
> 史 正超  于2020年11月12日周四 下午9:23写道:
>
> > 执行checkpoint失败,报下面的错。
> > 2020-11-12 21:04:56
> > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable
> > failure threshold.
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> > at
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
>


Reply: flink-1.11.2 checkpoint execution failure

2020-11-12 post by 史 正超
Judging from the code, yes.

public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) {
    checkFailureCounter(exception, checkpointId);
    if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
        clearCount();
        failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
    }
}

Once the counter exceeds the threshold, that error is thrown.

发件人: 赵一旦 
发送时间: 2020年11月13日 1:56
收件人: user-zh@flink.apache.org 
主题: Re: flink-1.11.2 执行checkpoint失败

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢?

史 正超  于2020年11月12日周四 下午9:23写道:

> 执行checkpoint失败,报下面的错。
> 2020-11-12 21:04:56
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>


Re: flink-1.11.2 checkpoint execution failure

2020-11-12 post by 赵一旦
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
A follow-up question on this: does a checkpoint "timeout" count towards checkpoint failures?

史 正超  于2020年11月12日周四 下午9:23写道:

> 执行checkpoint失败,报下面的错。
> 2020-11-12 21:04:56
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>



Reply: Re: flink-1.11.2 checkpoint execution failure

2020-11-12 post by 史 正超
The simplified sql looks roughly like this; the checkpoint times out


CREATE VIEW fcbox_send_fat_view AS
SELECT
  REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate,
  disCode,
  IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
  dislv,
  IF(express_company_id IS NOT NULL, express_company_id, 'NA') AS 
expressCompanyId,
  IF(express_company_name IS NOT NULL, express_company_name, 'NA') AS 
expressCompanyName,
  IF(throw_area_name IS NOT NULL, throw_area_name, 'NA') AS throwAreaName,
  IF(channel_source IS NOT NULL, channel_source, -99) channelSource,
  IF(pay_type IS NOT NULL, pay_type, -99) AS payType,
  direction,
  IF(box_type IS NOT NULL, box_type, -99) AS boxType,
  FIRST_VALUE(0) AS send_freight_collect_revenue,
  count(xxx)
FROM (
  SELECT
t3.*,
dis_name
  FROM (
SELECT
  t2.*,
  disCode,
  dislv
FROM (
  SELECT
t1.*,
dis_code,
throw_area_name
  FROM (
SELECT
  s.*,
  CASE
WHEN sender_province_id = receiver_province_id AND sender_city_id = 
receiver_city_id THEN 1
WHEN sender_province_id = receiver_province_id AND sender_city_id 
<> sender_city_id THEN 2
WHEN sender_province_id <> receiver_province_id THEN 3
ELSE -99
  END AS direction,
  i.transport_time,
  i.send_type,
  o.pay_type
FROM fcbox_send_binlog s
LEFT JOIN fcbox_send_item_binlog i ON s.send_id = i.send_id
LEFT JOIN fcbox_order_binlog o ON s.send_id = o.send_id
WHERE s.send_from = 1 AND s.send_status > 100 AND s.client_post_time IS 
NOT NULL AND TimeOutOfRange(TO_TIMESTAMP(s.client_post_time)) <= 1
  ) t1 JOIN edbasic.basic_edinfo_16 FOR SYSTEM_TIME AS OF t1.proctime d1 ON 
t1.point_code = d1.ed_code
) t2, LATERAL TABLE(RecursionDiscode(dis_code)) AS T(disCode, dislv)
  ) t3 JOIN edbasic.basic_district_16 FOR SYSTEM_TIME AS OF t3.proctime d2 ON 
t3.disCode = d2.dis_code
)
GROUP BY
  REPLACE(SUBSTR(client_post_time, 1, 10), '-', ''),
  disCode,
  IF(dis_name IS NOT NULL, dis_name, 'NA'),
  dislv,
  IF(express_company_id IS NOT NULL, express_company_id, 'NA'),
  IF(express_company_name IS NOT NULL, express_company_name, 'NA'),
  IF(throw_area_name IS NOT NULL, throw_area_name, 'NA'),
  IF(channel_source IS NOT NULL, channel_source, -99),
  IF(pay_type IS NOT NULL, pay_type, -99),
  direction,
  IF(box_type IS NOT NULL, box_type, -99)
UNION ALL
SELECT
  REPLACE(SUBSTR(user_create_time, 1, 10), '-', '') AS staDate,
  disCode,
  IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
  dislv,
  IF(express_company_id IS NOT NULL, express_company_id, 'NA') AS 
expressCompanyId,
  IF(express_company_name IS NOT NULL, express_company_name, 'NA') AS 
expressCompanyName,
  IF(throw_area_name IS NOT NULL, throw_area_name, 'NA') AS throwAreaName,
  IF(channel_source IS NOT NULL, channel_source, -99) channelSource,
  IF(pay_type IS NOT NULL, pay_type, -99) AS payType,
  direction,
  IF(box_type IS NOT NULL, box_type, -99) AS boxType,
  count(xxx)
FROM (
  SELECT
t3.*,
dis_name
  FROM (
  SELECT
t2.*,
disCode,
dislv
  FROM (
SELECT
  t1.*,
  dis_code,
  'NA' AS throw_area_name
FROM (
  SELECT
s.*,
CASE
  WHEN sender_province_id = receiver_province_id AND sender_city_id 
= receiver_city_id THEN 1
  WHEN sender_province_id = receiver_province_id AND sender_city_id 
<> sender_city_id THEN 2
  WHEN sender_province_id <> receiver_province_id THEN 3
  ELSE -99
END AS direction,
i.transport_time,
i.send_type,
o.pay_type
  FROM fcbox_send_binlog s
  LEFT JOIN fcbox_send_item_binlog i ON s.send_id = i.send_id
  LEFT JOIN fcbox_order_binlog o ON s.send_id = o.send_id
  WHERE s.send_from = 1 AND s.send_status >= 100 AND (s.change_flag <> 
3 or s.change_flag is null) AND s.user_create_time IS NOT NULL AND 
TimeOutOfRange(TO_TIMESTAMP(s.user_create_time)) <= 1
) t1 JOIN edbasic.basic_district_name_16 FOR SYSTEM_TIME AS OF 
t1.proctime d1 ON t1.sender_city_name = d1.dis_name
  ) t2, LATERAL TABLE(RecursionDiscode(dis_code)) AS T(disCode, dislv)
  ) t3 JOIN edbasic.basic_district_16 FOR SYSTEM_TIME AS OF t3.proctime d2 ON 
t3.disCode = d2.dis_code
)
GROUP BY
  REPLACE(SUBSTR(user_create_time, 1, 10), '-', ''),
  disCode,
  IF(dis_name IS NOT NULL, dis_name, 'NA'),
  dislv,
  IF(express_company_id IS NOT NULL, express_company_id, 'NA'),
  IF(express_company_name IS NOT NULL, express_company_name, 'NA'),
  IF(throw_area_name IS NOT NULL, throw_area_name, 'NA'),
  IF(channel_source IS NOT NULL, channel_source, -99),
  IF(pay_type IS NOT NULL, pay_type, -99),
  direction,
  IF(box_type IS NOT NULL, box_type, -99)
UNION ALL
SELECT
  REPLACE(SUBSTR(closeTime, 1, 10), '-', '') AS staDate,
  disCode,
  IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
  

Reply: Re: flink-1.11.2 checkpoint execution failure

2020-11-12 post by 史 正超
No, it uses a jdbc sink: first a left join of three changelog tables, then
separate joins against two mysql dimension tables, and the checkpoint times out. The sql is so complex that I'm losing confidence in it...
```

CREATE VIEW fcbox_send_fat_view AS
SELECT
  REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate,
  disCode,
  IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
  dislv,
  IF(express_company_id IS NOT NULL, express_company_id, 'NA') AS 
expressCompanyId,
  IF(express_company_name IS NOT NULL, express_company_name, 'NA') AS 
expressCompanyName,
  IF(throw_area_name IS NOT NULL, throw_area_name, 'NA') AS throwAreaName,
  IF(channel_source IS NOT NULL, channel_source, -99) channelSource,
  IF(pay_type IS NOT NULL, pay_type, -99) AS payType,
  direction,
  IF(box_type IS NOT NULL, box_type, -99) AS boxType,
  FIRST_VALUE(0) AS send_freight_collect_revenue,
  SUM(CASE WHEN send_status IN (101, 102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) THEN SendRevenue(express_company_id, COALESCE(freight, 0)) 
ELSE CAST(0 AS BIGINT) END) AS send_donation_revenue,
  SUM(CASE WHEN send_status IN (101, 102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) THEN total_amount ELSE CAST(0 AS BIGINT) END) AS 
send_amount,
  FIRST_VALUE(0) AS send_order_count,
  COUNT(CASE WHEN send_status IN (103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) THEN send_id END) AS in_transit_send_count,
  COUNT(send_id) AS drop_send_count,
  COUNT(CASE WHEN send_status IN (101, 102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) THEN send_id END) AS effect_drop_send_count,
  COUNT(CASE WHEN send_status in (105,106,109) AND (change_flag <> 3 OR 
change_flag IS NULL) THEN send_id END) AS drop_cancel_send_count,
  COUNT(DISTINCT CASE WHEN send_status IN (101, 102, 103, 104) AND (change_flag 
<> 3 OR change_flag IS NULL) THEN point_code END) AS drop_send_ed_count,
  COUNT(DISTINCT CASE WHEN send_status IN (101, 102, 103, 104) AND (change_flag 
<> 3 OR change_flag IS NULL) THEN user_id END) AS drop_send_psn_count,
  COUNT(CASE WHEN send_status IN (101, 102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND send_type = 2 THEN send_id END) AS 
wechat_scan_drop_send_count,
  COUNT(CASE WHEN send_status IN (101, 102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND send_type = 3 THEN send_id END) AS 
alipay_scan_drop_send_count,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
(UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 >= 0 
AND (UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 
< 2 THEN send_id END) AS courier_get_count_02,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
(UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 >= 2 
AND (UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 
< 6 THEN send_id END) AS courier_get_count_02_06,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
(UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 >= 6 
AND (UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 
< 12 THEN send_id END) AS courier_get_count_06_12,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
(UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 >= 
12 AND (UNIX_TIMESTAMP(courier_get_time) - 
UNIX_TIMESTAMP(client_post_time))/3600 < 24 THEN send_id END) AS 
courier_get_count_12_24,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
(UNIX_TIMESTAMP(courier_get_time) - UNIX_TIMESTAMP(client_post_time))/3600 >= 
24 THEN send_id END) AS courier_get_count_24,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
SUBSTR(client_post_time, 1, 10) = SUBSTR(courier_get_time, 1, 10) THEN send_id 
END) AS courier_get_count_day,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
TimeDayDiff(client_post_time, courier_get_time) = 1 AND 
CAST(SUBSTR(courier_get_time, 12, 2) AS INT) <= 12 THEN send_id END) AS 
courier_get_count_morrow_12,
  COUNT(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
((TimeDayDiff(client_post_time, courier_get_time) > 1) OR 
(TimeDayDiff(client_post_time, courier_get_time) = 1 AND 
CAST(SUBSTR(courier_get_time, 12, 2) AS INT) > 12)) THEN send_id END) AS 
courier_get_count_morrow_gt_12,
  SUM(CASE WHEN send_status IN (102, 103, 104) AND (change_flag <> 3 OR 
change_flag IS NULL) AND courier_get_time IS NOT NULL AND 
SUBSTR(client_post_time, 1, 10) = 

Reply: flink-1.11.2 checkpoint execution failure

2020-11-12 post by 601049502
First of all, I'd like to know whether you are using a kafka sink?


On November 12, 2020 at 21:16, 史 正超 wrote:


Checkpoint execution failed with the error below.
2020-11-12 21:04:56
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Re: Re: Error calling the FIRST_VALUE function in flink sql

2020-11-12 post by hailongwang


I have created an issue for supporting the merge method in first_value and last_value [1].
I also created an issue to update the documentation describing that a custom UDAF needs a merge method, so a hop window example can be added [2].
@李世钰 feel free to pick them up if you are interested.


[1] https://issues.apache.org/jira/browse/FLINK-20110
[2] https://issues.apache.org/jira/browse/FLINK-20111



At 2020-11-12 19:51:29, "Jark Wu"  wrote:
>可以建个 issue 支持下 first_value 和 last_value 的 merge 方法。
>
>On Thu, 12 Nov 2020 at 20:37, hailongwang <18868816...@163.com> wrote:
>
>> Hi me,
>>
>>
>> HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane
>> 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value
>> 不支持 merge。
>>
>>
>> Best,
>> hailong
>>
>> 在 2020-11-12 17:07:58,"李世钰"  写道:
>> >1. FLINK版本 flink1.11
>> >
>> >
>> >
>> >
>> >2. 使用的是useBlinkPlanner
>> >
>> >
>> >
>> >
>> >3.执行sql
>> >
>> >SELECT FIRST_VALUE(kafka_table.src_ip) AS
>> kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS
>> kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30'
>> SECOND(2), INTERVAL '2' MINUTE(1)) AS
>> __window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2),
>> INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',')
>> AS __origin_log_id FROM kafka_table WHERE kafka_table.dest_ip =
>> '25.25.205.14' GROUP BY HOP(kafka_table.process_time, INTERVAL '30'
>> SECOND(2), INTERVAL '2' MINUTE(1)),kafka_table.src_ip
>> >
>> >
>> >
>> >
>> >4.报错信息
>> >
>> >Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Function class
>> 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction'
>> does not implement at least one method named 'merge' which is public, not
>> abstract and (in case of table functions) not static.
>> >
>> >   at
>> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)
>> >
>> >   at
>> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
>> >
>> >   at
>> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
>> >
>> >   at
>> org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
>> >
>> >   at
>> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
>> >
>> >   at
>> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
>> >
>> >   at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >
>> >   at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> >
>> >   at
>> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)
>> >
>> >   at
>> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
>> >
>> >   at
>> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
>> >
>> >   at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
>> >
>> > 

flink-1.11.2 checkpoint execution failure

2020-11-12 post by 史 正超
Checkpoint execution failed with the error below.
2020-11-12 21:04:56
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: Re: How to raise an alert when no message has been received for a certain period of time?

2020-11-12 post by Lei Wang
A session window can indeed provide the functionality:

robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
y) -> y);

Written this way, my understanding is that the window state only keeps the most recent record.


Under normal conditions the robots keep reporting logs, which means this window would normally be kept around forever. I am not sure whether that has a performance impact.



On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote:

>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan"  写道:
> >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> >
> >Lei Wang  于2020年11月11日周三 下午2:03写道:
> >
> >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> >>
> >> 比如
> >> robot1   2020-11-11 12:00:00 msginfo
> >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> 就发出报警呢?
> >>
> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> >>
> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> >>
> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> >> 我们必须 按 robotId 做 keyBy
> >>
> >> 求大神指教。
> >>
> >> 谢谢,
> >> 王磊
> >>
>


Re: Constructor issue with the ElasticsearchApiCallBridge classes

2020-11-12 post by zhisheng
This is already supported on the master branch; you can take a look at the flink-connector-es7 source code
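For the DataStream Elasticsearch sink on released versions, one way to add basic auth without subclassing the call bridge is the setRestClientFactory hook on the sink builder. A rough sketch (host, user and password are placeholders, and sinkFunction is whatever ElasticsearchSinkFunction you already have):

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;

import java.util.Collections;

public class AuthedEsSinkFactory {

    public static <T> ElasticsearchSink<T> build(ElasticsearchSinkFunction<T> sinkFunction,
                                                 String user, String password) {
        ElasticsearchSink.Builder<T> builder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("localhost", 9200, "http")), sinkFunction);

        // The factory customizes the RestClientBuilder before the RestHighLevelClient is created,
        // so credentials can be injected without touching the ApiCallBridge classes.
        builder.setRestClientFactory(restClientBuilder ->
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                    BasicCredentialsProvider credentials = new BasicCredentialsProvider();
                    credentials.setCredentials(AuthScope.ANY,
                            new UsernamePasswordCredentials(user, password));
                    return httpClientBuilder.setDefaultCredentialsProvider(credentials);
                }));

        return builder.build();
    }
}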

Luna Wong wrote on Wed, Nov 11, 2020, 9:16 PM:

> Why are the constructors of the ElasticsearchApiCallBridge implementation classes not public?
> I wanted to extend the Elasticsearch6ApiCallBridge class and add username/password authentication before new
> RestHighLevelClient is called, i.e. implement a subclass that supports credentials.
>
> Without public, the subclass has to live in the same package as the parent. Why are the ElasticsearchApiCallBridge implementations designed this way?
>


Re: Error calling the FIRST_VALUE function in flink sql

2020-11-12 post by Jark Wu
Feel free to open an issue to support the merge method for first_value and last_value.

On Thu, 12 Nov 2020 at 20:37, hailongwang <18868816...@163.com> wrote:

> Hi me,
>
>
> HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane
> 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value
> 不支持 merge。
>
>
> Best,
> hailong
>
> 在 2020-11-12 17:07:58,"李世钰"  写道:
> >1. FLINK版本 flink1.11
> >
> >
> >
> >
> >2. 使用的是useBlinkPlanner
> >
> >
> >
> >
> >3.执行sql
> >
> >SELECT FIRST_VALUE(kafka_table.src_ip) AS
> kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS
> kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30'
> SECOND(2), INTERVAL '2' MINUTE(1)) AS
> __window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2),
> INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',')
> AS __origin_log_id FROM kafka_table WHERE kafka_table.dest_ip =
> '25.25.205.14' GROUP BY HOP(kafka_table.process_time, INTERVAL '30'
> SECOND(2), INTERVAL '2' MINUTE(1)),kafka_table.src_ip
> >
> >
> >
> >
> >4.报错信息
> >
> >Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Function class
> 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction'
> does not implement at least one method named 'merge' which is public, not
> abstract and (in case of table functions) not static.
> >
> >   at
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)
> >
> >   at
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
> >
> >   at
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
> >
> >   at
> org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
> >
> >   at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
> >
> >   at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
> >
> >   at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >
> >   at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> >
> >   at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)
> >
> >   at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
> >
> >   at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> >
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
> >
> >   

Re: Error calling the FIRST_VALUE function in flink sql

2020-11-12 post by hailongwang
Hi me,


A UDAF used on a HOP window must implement the merge method, because when a HOP window fires it merges multiple panes and therefore needs to call
the UDAF's merge method to combine multiple accumulators into one. first_value and last_value do not support merge.
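For reference, a minimal sketch of a custom AggregateFunction that does implement merge and can therefore run on a HOP window (a simplified first-non-null-value aggregate; it is an illustration, not the built-in FIRST_VALUE implementation, and "first" across merged panes simply means the first pane that saw a value). On 1.11 it can be registered with StreamTableEnvironment#registerFunction and then used in SQL.

import org.apache.flink.table.functions.AggregateFunction;

public class MyFirstValue extends AggregateFunction<String, MyFirstValue.Acc> {

    public static class Acc {
        public String value;   // first non-null value seen, null if none yet
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called per input row; keeps the first non-null value.
    public void accumulate(Acc acc, String value) {
        if (acc.value == null && value != null) {
            acc.value = value;
        }
    }

    // Called when window panes are merged, which is what HOP windows require.
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            if (acc.value == null && other.value != null) {
                acc.value = other.value;
            }
        }
    }

    @Override
    public String getValue(Acc acc) {
        return acc.value;
    }
}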


Best,
hailong

在 2020-11-12 17:07:58,"李世钰"  写道:
>1. FLINK版本 flink1.11
>
>
>
>
>2. 使用的是useBlinkPlanner
>
>
>
>
>3.执行sql
>
>SELECT FIRST_VALUE(kafka_table.src_ip) AS 
>kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS 
>kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' 
>SECOND(2), INTERVAL '2' MINUTE(1)) AS 
>__window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), 
>INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') AS 
>__origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = '25.25.205.14' 
>GROUP BY HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' 
>MINUTE(1)),kafka_table.src_ip
>
>
>
>
>4.报错信息
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Function class 
>'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction'
> does not implement at least one method named 'merge' which is public, not 
>abstract and (in case of table functions) not static.
>
>   at 
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)
>
>   at 
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
>
>   at 
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
>
>   at 
> org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
>
>   at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
>
>   at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
>
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>
>   at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)
>
>   at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
>
>   at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
>
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
>
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>
>   at 

Re: Re: Relationship between the number of slots and parallelism

2020-11-12 post by zhisheng
You can read http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ to get a better understanding

hl9...@126.com  于2020年11月12日周四 下午4:47写道:

> 是flink standalone 集群。
> job并行度是在job的java代码中通过  streamExecutionEnvironment.setParallelism(15) 来指定的。
>
>
>
> hl9...@126.com
>
> 发件人: Xintong Song
> 发送时间: 2020-11-12 13:18
> 收件人: user-zh
> 主题: Re: slot数量与并行度的大小关系
> 你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的?
>
> 流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3]
> 中的几种方式更改作业的并行度。另外,也可以通过配置 `taskmanager.numberOfTaskSlots` 来增加 flink 集群的
> slot 数量。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Nov 11, 2020 at 7:54 PM Shawn Huang  wrote:
>
> > Hi,
> >
> > Flink 的调度策略会保证一个job需要的slot数恰好等于该job所有算子的最大并行度。
> > 如果slot数量小于算子的最大并行度,则该job无法执行。可以参考[1][2]中的文档描述。
> >
> > 目前没有方法让flink自动选择可用slot数量作为并行度,但可以通过[3]中的几种方法来设置。
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html
> > [2]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/concepts/flink-architecture.html#task-slots-and-resources
> > [3]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/parallel.html
> >
> > Best,
> > Shawn Huang
> >
> >
> > hl9...@126.com  于2020年11月11日周三 下午2:58写道:
> >
> > > Hi,all:
> > > 我在flink
> > > web面板上提交了1个job,job的并行度为15,flink集群slot总数为12,发现任务一直在created阶段等待,一段时间后报错:
> > > Caused by:
> > >
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > > Could not allocate the required slot within slot request timeout.
> > > Please make sure that the cluster has enough resources.
> > >
> > > 是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度?
> > >
> > >
> > >
> > > hl9...@126.com
> > >
> >
>


Re: State consistency issue between Flink and Yarn

2020-11-12 post by zhisheng
I have run into the same problem, so for job monitoring and alerting we usually look at the status of all of the job's tasks instead of just the yarn status
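A rough sketch of that kind of task-level check against the JobManager REST API (the address and job id are placeholders; the returned JSON carries the overall "state" plus a "vertices" array with each task's "status"):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JobStatusProbe {
    public static void main(String[] args) throws Exception {
        String restBase = "http://localhost:8081";               // JobManager REST address (placeholder)
        String jobId = "00000000000000000000000000000000";       // job id from /jobs (placeholder)

        HttpURLConnection conn =
                (HttpURLConnection) new URL(restBase + "/jobs/" + jobId).openConnection();
        conn.setRequestMethod("GET");

        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        // Alert when the job "state" or any vertex "status" is FAILED, instead of trusting the YARN app state.
        System.out.println(body);
    }
}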

hdxg1101300123  于2020年11月12日周四 下午8:07写道:

> 可以设置检查点失败任务也失败
>
>
>
> 发自vivo智能手机
> > hi everyone,
> >
> > 最近在使用Flink-1.11.1 On Yarn Per
> Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> application仍处于运行状态
> >
> > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
> >
> > best,
> > amenhub


Re: Re: How to raise an alert when no message has been received for a certain period of time?

2020-11-12 post by zhisheng
hi

Take a look at the Timer mechanism and see whether it can solve your problem

Best zhisheng

hailongwang <18868816...@163.com> 于2020年11月12日周四 下午5:25写道:

>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan"  写道:
> >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> >
> >Lei Wang  于2020年11月11日周三 下午2:03写道:
> >
> >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> >>
> >> 比如
> >> robot1   2020-11-11 12:00:00 msginfo
> >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> 就发出报警呢?
> >>
> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> >>
> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> >>
> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> >> 我们必须 按 robotId 做 keyBy
> >>
> >> 求大神指教。
> >>
> >> 谢谢,
> >> 王磊
> >>
>


Reply: State consistency issue between Flink and Yarn

2020-11-12 post by hdxg1101300123
You can configure the job to fail when checkpoints fail



Sent from a vivo smartphone
> hi everyone,
>
> Recently, when submitting a simple kafka->mysql job with Flink-1.11.1 On Yarn Per
> Job mode, I found that when dirty data or a mysql primary key constraint (among other reasons) causes the Flink job state to become Failed, the Yarn
> application remains in running state
>
> My question is: when a Flink job is Failed or Finished, does it not report its own state back to Yarn? Hoping someone can clarify, thanks
>
> best,
> amenhub

Re: Re: flink 1.9.1 job has already failed, but the application is still running on yarn

2020-11-12 post by amen...@163.com
hi,

My current version is flink-1.11.1 without the -d parameter, and I am seeing the same problem. Any idea what is going on?

best,
amenhub



 
From: Yang Wang
Sent: 2020-08-05 10:28
To: user-zh
Subject: Re: flink 1.9.1 job has already failed, but the application is still running on yarn
Your Flink job was probably started in attached mode, i.e. without -d. Before 1.10, a job started this way is essentially a session;
it only exits after the result has been retrieved by the client, so if the client dies or you stop it manually, an empty session is left behind
 
You can confirm the session mode it was started with from the following log:
 
2020-08-04 10:45:36,868 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Starting YarnSessionClusterEntrypoint (Version: 1.9.1, Rev:f23f82a,
Date:01.11.2019 @ 11:20:33 CST)
 
 
你可以flink run -d ...就是perjob模式了,或者升级到1.10及以后版本attach/detach都是真正的perjob
 
 
Best,
Yang
 
bradyMk  于2020年8月4日周二 下午8:04写道:
 
> 您好:
> 请问这是flink这个版本自身的bug么?那就意味着没有办法解决了吧,只能手动kill掉?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql: error when calling the FIRST_VALUE function

2020-11-12 post by 李世钰
1. Flink version: flink1.11




2. Planner: useBlinkPlanner




3. SQL executed

SELECT FIRST_VALUE(kafka_table.src_ip) AS 
kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS 
kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' 
SECOND(2), INTERVAL '2' MINUTE(1)) AS 
__window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), 
INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') AS 
__origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = '25.25.205.14' 
GROUP BY HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' 
MINUTE(1)),kafka_table.src_ip




4. Error message

Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Function class 
'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction'
 does not implement at least one method named 'merge' which is public, not 
abstract and (in case of table functions) not static.

at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)

at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)

at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)

at 
org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)


flink sql: error when calling the FIRST_VALUE function

2020-11-12 post by me
1. Flink version: flink1.11


2. Planner: useBlinkPlanner


3. SQL executed
SELECT FIRST_VALUE(kafka_table.src_ip) AS 
kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS 
kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' 
SECOND(2), INTERVAL '2' MINUTE(1)) AS 
__window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), 
INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') AS 
__origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = '25.25.205.14' 
GROUP BY HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' 
MINUTE(1)),kafka_table.src_ip


4. Error message
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Function class 
'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction'
 does not implement at least one method named 'merge' which is public, not 
abstract and (in case of table functions) not static.
at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)
at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
at 
org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)
at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 

Re: A question about the filesystem connector

2020-11-12 post by Jingsong Li
To make partitions queryable as early as possible, just set the delay to 0 (leave everything else at its defaults).

On Thu, Nov 12, 2020 at 5:17 PM admin <17626017...@163.com> wrote:

> Hi, jingsong
> So with partition-time, partitions can be re-committed even for very late data, and no data is lost, right?
> Then for an hourly-partitioned setup, what is the best configuration to make partitions queryable as early as possible,
> for example sink.partition-commit.trigger = partition-time
> sink.partition-commit.delay = 10 min
>
> > On Nov 12, 2020, at 3:22 PM, Jingsong Li  wrote:
> >
> > Hi admin,
> >
> > Data is not dropped; the partition is committed again (which is why partition commits are all idempotent operations now).
> >
> > On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:
> >
> >> To add the reason for not using the partition-time trigger: partition
> >> time is based on watermarks, and when data is badly delayed it would be dropped, which is not acceptable.
> >>
> >>> On Nov 12, 2020, at 2:15 PM, admin <17626017...@163.com> wrote:
> >>>
> >>> Hi, kandy
> >>> I am not committing partitions based on partition time; I am using the default process
> >> time, so partitions can be committed multiple times. I know out-of-order data within the current partition can be committed, but can data that is delayed for a long time (like the example above) still be committed to the matching partition?
> >>>
> >>>> On Nov 12, 2020, at 12:46 PM, kandy.wang  wrote:
> >>>>
> >>>> hi:
> >>>> As I understand it, committing partitions by partition time triggers the commit when current watermark > partition time +
> >> commit delay; it depends on how long your sink.partition-commit.delay
> >>>> is set to, and anything later than that should be dropped by default, I think.
> >>>>
> >>>>
> >>>> https://cloud.tencent.com/developer/article/1707182
> >>>>
> >>>> You can take a look at this link.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On 2020-11-12 11:58:22, "admin" <17626017...@163.com> wrote:
> > Hi, all
> > With the Flink 1.11 filesystem connector, the partition
> trigger [1] is left at its default values, so a partition can be committed multiple times.
> > The scenario is this:
> > Kafka data is consumed and written to hdfs; the partition fields are day + hour,
> >> extracted from the event time. If data is late, say it is 19:00 now and a record for 17:00 arrives,
> > will that record still be written correctly into the 17:00 partition? Or into the 19:00 partition? Or will it be dropped?
> > Does anyone know? Has anyone actually verified it?
> > Thanks
> >
> > The simple SQL is attached:
> > CREATE TABLE kafka (
> > a STRING,
> > b STRING,
> > c BIGINT,
> > process_time BIGINT,
> > e STRING,
> > f STRING,
> > g STRING,
> > h INT,
> > i STRING
> > ) WITH (
> > 'connector' = 'kafka',
> > 'topic' = 'topic',
> > 'properties.bootstrap.servers' = 'x',
> > 'properties.group.id' = 'test-1',
> > 'scan.startup.mode' = 'latest-offset',
> > 'format' = 'json',
> > 'properties.flink.partition-discovery.interval-millis' = '30'
> > );
> >
> > CREATE TABLE filesystem (
> > `day` STRING,
> > `hour` STRING,
> > a STRING,
> > b STRING,
> > c BIGINT,
> > d BIGINT,
> > e STRING,
> > f STRING,
> > g STRING,
> > h INT,
> > i STRING
> > ) PARTITIONED BY (`day`, `hour`) WITH (
> > 'connector' = 'filesystem',
> > 'format' = 'parquet',
> > 'path' = 'hdfs://xx',
> > 'parquet.compression'='SNAPPY',
> > 'sink.partition-commit.policy.kind' = 'success-file'
> > );
> >
> > insert into filesystem
> > select
> > from_unixtime(process_time,'-MM-dd') as `day`,
> > from_unixtime(process_time,'HH') as `hour`,
> > a,
> > b,
> > c,
> > d,
> > e,
> > f,
> > g,
> > h,
> > i
> > from kafka;
> >
> >
> >
> > [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> >>>
> >>
> >>
> >
> > --
> > Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee
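
To make the "set delay to 0" suggestion concrete, here is an illustrative sketch (table names, topics and paths are placeholders, not the thread author's job) of an hourly-partitioned filesystem sink committed by partition time with zero delay, so a partition is committed as soon as the watermark passes the partition's end. Note that the partition-time trigger relies on the source defining an event-time attribute and watermark.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PartitionTimeCommitSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // source with an event-time attribute + watermark (required by partition-time commits)
        tEnv.executeSql(
                "CREATE TABLE src (" +
                "  a STRING," +
                "  ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'topic'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'properties.group.id' = 'test-group'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");

        // hourly partitions, committed when the watermark passes the partition end (delay = 0)
        tEnv.executeSql(
                "CREATE TABLE fs_sink (" +
                "  a STRING," +
                "  `day` STRING," +
                "  `hour` STRING" +
                ") PARTITIONED BY (`day`, `hour`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs://path/to/table'," +
                "  'format' = 'parquet'," +
                "  'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00'," +
                "  'sink.partition-commit.trigger' = 'partition-time'," +
                "  'sink.partition-commit.delay' = '0s'," +
                "  'sink.partition-commit.policy.kind' = 'success-file'" +
                ")");

        tEnv.executeSql(
                "INSERT INTO fs_sink " +
                "SELECT a, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') FROM src");
    }
}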


Re: Re: Re: flink 1.11.1: occasional timeout exceptions during temporal joins against a mysql dimension table cause updates to fail; looking for a solution

2020-11-12 post by hailongwang


This is most likely the connection becoming unusable after a long period with no data transferred. Possible causes:
1. The kafka data is sparse, and the gap between arriving records exceeds "wait_timeout".
2. The records never actually join against the mysql data.
You can try adjusting the database's wait_timeout and see.
PS: in this scenario automatic recovery should be fine, but you need to pin down the root cause, decide whether it is expected or abnormal, and how to avoid it.
It is best to enable checkpointing: kafka offsets are only acked when a checkpoint succeeds, so a record will not be
auto-acked and then dropped.
With checkpointing enabled, if the downstream supports upsert or exactly-once semantics you get end-to-end exactly once; otherwise you get at least once, i.e. duplicates. (A configuration sketch follows below.)
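
As an illustrative sketch of the two suggestions (names, URLs and values below are placeholders, not taken from the thread): enable checkpointing so kafka offsets are only committed on successful checkpoints, and declare the mysql dimension table with a lookup cache and retries so that idle connections and transient drops matter less.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlLookupJobSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // offsets are committed to kafka only on successful checkpoints
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE dim_mysql (" +
                "  id BIGINT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://mysql-host:3306/db'," +
                "  'table-name' = 'dim_table'," +
                "  'username' = 'user'," +
                "  'password' = 'pass'," +
                // cached lookups are served locally for the TTL, so far fewer round trips
                // keep idle connections from running into the server-side wait_timeout
                "  'lookup.cache.max-rows' = '10000'," +
                "  'lookup.cache.ttl' = '10min'," +
                "  'lookup.max-retries' = '3'" +
                ")");
        // ... the temporal join (FOR SYSTEM_TIME AS OF) against dim_mysql would follow here ...
    }
}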

At 2020-11-12 13:36:11, "xiexinyuan341"  wrote:
>The source is kafka, and JdbcRowDataLookupFunction is used for the dimension table. The exception looks like this; from the logs, it shows up roughly every ten-plus minutes.
>2020-11-12 01:00:09.028 ERROR JdbcRowDataLookupFunction.java:170 JDBC
>executeBatch error, retry times = 1
>com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
>failure
>
>The last packet successfully received from the server was 815,816
>milliseconds ago.  The last packet sent successfully to the server was 1
>milliseconds ago.
>   at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown 
> Source)
>   at
>sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
>   at
>com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
>   at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
>   at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
>   at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
>   at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
>   at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
>   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
>   at
>com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
>   at
>com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
>   at
>org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
>   at LookupFunction$10.flatMap(Unknown Source)
>   at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>   at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>   at
>org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>   at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at StreamExecCalc$7.processElement(Unknown Source)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>   at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at
>org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>   at
>org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:75)
>   at JoinTableFuncCollector$6.collect(Unknown Source)
>   at
>org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:203)
>   at
>org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:162)
>   at LookupFunction$2.flatMap(Unknown Source)
>   at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>   at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>   at
>org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>   at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at

Re: Re: How can I raise an alert when no message has been received for a certain amount of time?

2020-11-12 post by hailongwang



This scenario does look a bit like session semantics, but using a session window does not feel right.
If the alert never fires, all the historical data stays in the window, that is, in state, whereas you really only need to keep the most recent record.




On 2020-11-12 14:34:59, "Danny Chan"  wrote:
>This looks like a session window requirement: the timeout is the session gap, and the moment the session fires is where the alerting logic goes.
>
>Lei Wang  wrote on Wed, Nov 11, 2020 at 2:03 PM:
>
>> We have many edge robot devices (we call them robots) sending messages to Kafka. If no message is received for a certain amount of time, we consider the robot offline.
>>
>> For example
>> robot1   2020-11-11 12:00:00 msginfo
>> If no further message arrives from robot1 for the next 20 mins, how can I make Flink raise the alert at 2020-11-11 12:10:00?
>>
>> Flink is message-driven; if no message arrives, nothing is triggered. How can an action be triggered precisely when no follow-up message has arrived?
>>
>> I tried the example at https://juejin.im/post/6844904193052901384 , but it does not fit my use case.
>>
>> That example effectively shares one timeService across all orders and iterates over every order each time.
>> We need to keyBy robotId.
>>
>> Any guidance would be much appreciated.
>>
>> Thanks,
>> Lei Wang
>>


Re: A question about the filesystem connector

2020-11-12 post by admin
Hi, jingsong
So with partition-time, partitions can be re-committed even for very late data, and no data is lost, right?
Then for an hourly-partitioned setup, what is the best configuration to make partitions queryable as early as possible,
for example sink.partition-commit.trigger = partition-time
sink.partition-commit.delay = 10 min

> On Nov 12, 2020, at 3:22 PM, Jingsong Li  wrote:
> 
> Hi admin,
> 
> Data is not dropped; the partition is committed again (which is why partition commits are all idempotent operations now).
> 
> On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:
> 
>> To add the reason for not using the partition-time trigger: partition
>> time is based on watermarks, and when data is badly delayed it would be dropped, which is not acceptable.
>> 
>>> On Nov 12, 2020, at 2:15 PM, admin <17626017...@163.com> wrote:
>>> 
>>> Hi, kandy
>>> I am not committing partitions based on partition time; I am using the default process
>> time, so partitions can be committed multiple times. I know out-of-order data within the current partition can be committed, but can data that is delayed for a long time (like the example above) still be committed to the matching partition?
>>> 
>>>> On Nov 12, 2020, at 12:46 PM, kandy.wang  wrote:
>>>> 
>>>> hi:
>>>> As I understand it, committing partitions by partition time triggers the commit when current watermark > partition time +
>> commit delay; it depends on how long your sink.partition-commit.delay
>>>> is set to, and anything later than that should be dropped by default, I think.
>>>> 
>>>> 
>>>> https://cloud.tencent.com/developer/article/1707182
>>>> 
>>>> You can take a look at this link.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On 2020-11-12 11:58:22, "admin" <17626017...@163.com> wrote:
> Hi, all
> With the Flink 1.11 filesystem connector, the partition trigger [1] is left at its default values, so a partition can be committed multiple times.
> The scenario is this:
> Kafka data is consumed and written to hdfs; the partition fields are day + hour,
>> extracted from the event time. If data is late, say it is 19:00 now and a record for 17:00 arrives,
> will that record still be written correctly into the 17:00 partition? Or into the 19:00 partition? Or will it be dropped?
> Does anyone know? Has anyone actually verified it?
> Thanks
> 
> The simple SQL is attached:
> CREATE TABLE kafka (
> a STRING,
> b STRING,
> c BIGINT,
> process_time BIGINT,
> e STRING,
> f STRING,
> g STRING,
> h INT,
> i STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'topic',
> 'properties.bootstrap.servers' = 'x',
> 'properties.group.id' = 'test-1',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json',
> 'properties.flink.partition-discovery.interval-millis' = '30'
> );
> 
> CREATE TABLE filesystem (
> `day` STRING,
> `hour` STRING,
> a STRING,
> b STRING,
> c BIGINT,
> d BIGINT,
> e STRING,
> f STRING,
> g STRING,
> h INT,
> i STRING
> ) PARTITIONED BY (`day`, `hour`) WITH (
> 'connector' = 'filesystem',
> 'format' = 'parquet',
> 'path' = 'hdfs://xx',
> 'parquet.compression'='SNAPPY',
> 'sink.partition-commit.policy.kind' = 'success-file'
> );
> 
> insert into filesystem
> select
> from_unixtime(process_time,'-MM-dd') as `day`,
> from_unixtime(process_time,'HH') as `hour`,
> a,
> b,
> c,
> d,
> e,
> f,
> g,
> h,
> i
> from kafka;
> 
> 
> 
> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
>>> 
>> 
>> 
> 
> -- 
> Best, Jingsong Lee



Re: Re: Relationship between slot count and parallelism

2020-11-12 post by hl9...@126.com
It is a flink standalone cluster.
The job parallelism is specified in the job's java code via streamExecutionEnvironment.setParallelism(15). (A configuration sketch is attached at the end of this thread.)



hl9...@126.com
 
From: Xintong Song
Sent: 2020-11-12 13:18
To: user-zh
Subject: Re: Relationship between slot count and parallelism
Are you running a Flink standalone cluster? And how is the job parallelism of 15 currently being specified?
 
By default, a streaming job must acquire at least as many slots as its parallelism before it can run. You can change the job's
parallelism through any of the approaches in [3] that Shawn mentioned. Alternatively, you can increase the number of slots in the
Flink cluster by configuring `taskmanager.numberOfTaskSlots`.
 
Thank you~
 
Xintong Song
 
 
 
On Wed, Nov 11, 2020 at 7:54 PM Shawn Huang  wrote:
 
> Hi,
>
> Flink's scheduling strategy guarantees that the number of slots a job needs is exactly the maximum parallelism across all of its operators.
> If the number of slots is smaller than that maximum parallelism, the job cannot run. See the documentation in [1][2].
>
> There is currently no way to have Flink automatically pick the number of available slots as the parallelism, but it can be set with any of the methods in [3].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/concepts/flink-architecture.html#task-slots-and-resources
> [3]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/parallel.html
>
> Best,
> Shawn Huang
>
>
> hl9...@126.com  wrote on Wed, Nov 11, 2020 at 2:58 PM:
>
> > Hi, all:
> > I submitted a job through the Flink web UI with a parallelism of 15, while the Flink cluster has 12 slots in total.
> > The job sat in the CREATED state for a while and then failed with:
> > Caused by:
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Could not allocate the required slot within slot request timeout.
> > Please make sure that the cluster has enough resources.
> >
> > Does the slot count have to be at least the parallelism? Is there a parameter that lets Flink automatically use the number of available slots as the job's parallelism?
> >
> >
> >
> > hl9...@126.com
> >
>
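
As a closing sketch of the two knobs discussed in this thread (the values are examples, not a recommendation): the cluster-side slot count lives in flink-conf.yaml as `taskmanager.numberOfTaskSlots`, and the job-side parallelism should stay at or below the total number of slots the cluster offers.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Cluster side (flink-conf.yaml), e.g. with 4 TaskManagers:
//   taskmanager.numberOfTaskSlots: 4   -> 4 TMs * 4 slots = 16 slots in total
public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // default parallelism for every operator of this job; must not exceed available slots
        env.setParallelism(12);

        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                // individual operators can still be scaled down (or up) explicitly
                .setParallelism(6)
                .print();

        env.execute("parallelism-sketch");
    }
}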