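Re: Job ID issue when using application mode in flink-1.11

2020-11-12 Thread zhisheng
hello, Yang Wang, is there an issue tracking this problem at the moment? Testing on 1.12 shows the problem is still there. Is there a way to solve it? Best, zhisheng Yang Wang wrote on Thu, Sep 3, 2020 at 11:15 AM: > Currently, in HA mode, application mode cannot support multiple jobs yet, so the job ID is fixed at 0. > The main reason is that recovery has not been properly solved yet. > > > Best, > Yang > > chenkaibit wrote on Wed, Sep 2, 2020 at 7:29 PM: > > > hi: > > While testing flink-1.11 application mode, I found that after enabling HA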

Re: Re: State consistency between Flink and Yarn

2020-11-12 Thread JasonLee
hi, add the -d parameter so the job starts in detached mode; that should fix it. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink AutoScaling EMR

2020-11-12 Thread Robert Metzger
Hi, it seems that YARN has a feature for targeting specific hardware: https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html In any case, you'll need enough spare resources for some time to be able to run your job twice for this kind of "zero downtime

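Re: Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread hailongwang
As far as I know, a "timeout" does not increment the failure counter. In other words, a "timeout" is not treated as an "error", or an Exception. I suggest checking which exception the checkpoint threw that pushed it past the maximum tolerable number (by default, I believe any such exception triggers a restart). If that exception is expected, or unavoidable because of HDFS or similar causes, you can raise tolerableCpFailureNumber accordingly. On 2020-11-13 09:13:34, "史 正超" wrote: >That's an idea, thanks for the reply, I'll try it first. > >From: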

Re: Re: State consistency between Flink and Yarn

2020-11-12 Thread amen...@163.com
I don't know why, but the emails I send to the community now keep triggering this kind of notice, and it turns out the community never received them: newxmmxszc44.qq.com rejected your message to the following email addresses: m...@zhangzuofeng.cn Your message couldn't be delivered because the recipient's email system wasn't able to confirm that your message came from a trusted location. For Email

Re: Re: State consistency between Flink and Yarn

2020-11-12 Thread JasonLee
hi, from your description you did start it in per-job mode, and per-job mode should not have this problem at the moment. Could you check the value of execution.attached in your UI? Also, did you add the -d parameter when starting the job? - Best Wishes JasonLee

flink 1.11.2 cdc: sink ordering problem for a CDC SQL result table; restoring a job from a savepoint with a different parallelism produces incorrect sink results!!

2020-11-12 Thread jindy_liu
Source table test: CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task',

Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Thank you. I looked into that, but it does not initialize any values in keyed state. Instead, it uses keyed state, and lines 407-412 show that it does not set keyed state values in advance; it handles null values when they are not set in advance. public void processElement(String value, Context ctx,

Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Guowei Ma
Hi, Marco I think you could look at testScalingUp() at flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java Best, Guowei On Fri, Nov 13, 2020 at 10:36 AM Marco Villalobos wrote: > Hi, > > I would

Re: State consistency between Flink and Yarn

2020-11-12 Thread tison
detached is a separate pitfall: in attached mode, the state change is only triggered once the client requests the status, but a normal execute should also pull the result automatically. You can check whether the following key log lines are printed: - log.info("Job {} reached globally terminal state {}.", ...) - LOG.debug("Shutting down cluster because someone retrieved the job result."); - LOG.info("Shutting {} down with

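Re: Re: State consistency between Flink and Yarn

2020-11-12 Thread amen...@163.com
hi 1. I'm sure it was submitted in per-job mode. The submit command was ./bin/flink run -m yarn-cluster xxx, and in the Flink web UI under Job Manager -> Configuration, execution.target shows yarn-per-job. 2. The overall job state is Failed. The TM went down but the JM did not (my guess is that the JM is still up because the Yarn application is still Running? That is why I can still view the failure logs under Job Manager -> logs). best, amenhub From: JasonLee Sent: 2020-11-13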

Re: State consistency between Flink and Yarn

2020-11-12 Thread JasonLee
hi 1. First, are you sure you submitted in per-job mode? 2. By job state, do you mean that the JM is still up and the job is failing over, or that the job really died and the JM has already exited? - Best Wishes JasonLee

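Re: Re: Re: State consistency between Flink and Yarn

2020-11-12 Thread kingdomad
I have run into this problem too. On 1.11, if you submit a job without the -d parameter and the Flink program dies, the Yarn application stays in the running state, which effectively makes it a long-running yarn session. Only with -d is the lifecycle of the Flink program tied to that of the Yarn application. -- kingdomad On 2020-11-13 11:16:02, "amen...@163.com" wrote: Of course, there is some delay between FLINK detecting that it has FAILED and reporting this to YARN, and the report may also fail because of network problems or similar.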

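Re: Re: State consistency between Flink and Yarn

2020-11-12 Thread amen...@163.com
>>>Of course, there is some delay between FLINK detecting that it has FAILED and reporting this to YARN, and the report may also fail because of network problems or similar. By that account it should only happen occasionally, yet I kept waiting for Flink to report, and several hours later the Yarn state was still Running.. >>>Is this a brief window of inconsistency, or has the FLINK cluster already exited while the YARN state has not changed? I didn't understand this question. My submit command is ./bin/flink run -m yarn-cluster xxx, and the Flink version is 1.11.1. Yesterday I found in the community mailing list that, before Flink-1.10, the -d parameter could solve, in Per-job mode, the Flink web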

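Re: State consistency between Flink and Yarn

2020-11-12 Thread tison
In PerJob mode, once the job has completely died, the application does report its failed state to the YARN RM. Of course, there is some delay between FLINK detecting that it has FAILED and reporting this to YARN, and the report may also fail because of network problems or similar. Is this a brief window of inconsistency, or has the FLINK cluster already exited while the YARN state has not changed? Best, tison. zhisheng wrote on Thu, Nov 12, 2020 at 8:17 PM: > I have run into this problem too, so for job monitoring and alerting we usually look at the state of all the job's tasks rather than just the yarn state > > hdxg1101300123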

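Re: Re: How to raise an alert when no message has been received for a certain time?

2020-11-12 Thread Danny Chan
If only a few records ever exceed the gap, a window does indeed perform poorly; you could consider keeping the state in a traditional KV store instead. Lei Wang wrote on Thu, Nov 12, 2020 at 9:17 PM: > A session window does meet the requirement: > > > robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, > y) -> y); > > Written this way, as I understand it, the window state only keeps the most recent record. > > > Normally a robot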

Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Hi, I would like to add keyed state to the test harness before calling the process function. I am using the OneInputStreamOperatorTestHarness. I can't find any examples online on how to do that, and I am struggling to figure this out. Can somebody please provide guidance? My test case has keyed

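Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread 史 正超
That's an idea, thanks for the reply, I'll try it first. From: 赵一旦 Sent: 2020-11-13 2:05 To: user-zh@flink.apache.org Subject: Re: flink-1.11.2 checkpoint execution failed If timeouts also count as errors, I recommend setting the tolerable number to the INT maximum, because a failed or timed-out checkpoint does not always really mean backpressure, at least not in my production environment. In some cases the load is high but only just manageable: some checkpoints fail even though the job can actually just keep up. There is no need to fail the job because of that. 史 正超 wrote on Fri, Nov 13, 2020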

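Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread 赵一旦
If timeouts also count as errors, I recommend setting the tolerable number to the INT maximum, because a failed or timed-out checkpoint does not always really mean backpressure, at least not in my production environment. In some cases the load is high but only just manageable: some checkpoints fail even though the job can actually just keep up. There is no need to fail the job because of that. 史 正超 wrote on Fri, Nov 13, 2020 at 10:01 AM: > From the code above, it seems so. > > public void handleJobLevelCheckpointException(CheckpointException > exception, long checkpointId) { >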

Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread 史 正超
From the code above, it seems so. public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) { checkFailureCounter(exception, checkpointId); if (continuousFailureCounter.get() > tolerableCpFailureNumber) { clearCount(); failureCallback.failJob(new
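
The check quoted above compares a counter of consecutive failed checkpoints with `tolerableCpFailureNumber`. The following is a minimal plain-Java model of that logic, not Flink's actual `CheckpointFailureManager`. The class name is hypothetical. The caller decides whether a given failure reason (for example a timeout) is counted, because this thread disagrees on that point and the behavior may differ between versions. In a real job, the threshold is set with `env.getCheckpointConfig().setTolerableCheckpointFailureNumber(n)`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of a "consecutive checkpoint failures" counter.
// Not Flink's CheckpointFailureManager: whether a failure reason counts is
// decided by the caller via countedReason.
class CheckpointFailureCounterSketch {
    private final int tolerableCpFailureNumber;
    // Each checkpoint id is counted at most once.
    private final Set<Long> countedCheckpointIds = new HashSet<>();
    private int continuousFailureCounter = 0;

    CheckpointFailureCounterSketch(int tolerableCpFailureNumber) {
        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
    }

    /** Returns true if the job would be failed because of this checkpoint failure. */
    boolean onFailure(long checkpointId, boolean countedReason) {
        if (countedReason && countedCheckpointIds.add(checkpointId)) {
            continuousFailureCounter++;
        }
        if (continuousFailureCounter > tolerableCpFailureNumber) {
            clearCount();
            return true; // where the real code calls failureCallback.failJob(...)
        }
        return false;
    }

    /** A successful checkpoint resets the consecutive-failure count. */
    void onSuccess() {
        clearCount();
    }

    private void clearCount() {
        continuousFailureCounter = 0;
        countedCheckpointIds.clear();
    }
}
```

With a threshold of 2, the third consecutive counted failure fails the job, and any successful checkpoint in between resets the count. A threshold of `Integer.MAX_VALUE`, as suggested in this thread, effectively means checkpoint failures never fail the job.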

Re: flink-1.11.2 执行checkpoint失败

2020-11-12 Thread 赵一旦
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. A follow-up question on this: does a checkpoint "timeout" count as a checkpoint failure? 史 正超 wrote on Thu, Nov 12, 2020 at 9:23 PM: > Checkpoint execution failed with the following error. > 2020-11-12 21:04:56 > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable >

Re: Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread
… 1.11.2 … debug … ----

Re: Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread 史 正超
The simplified SQL looks like this, and the checkpoint times out: CREATE VIEW fcbox_send_fat_view AS SELECT REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate, disCode, IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName, dislv, IF(express_company_id IS NOT NULL, express_company_id, 'NA') AS expressCompanyId,

Re: Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread 史 正超
No, I'm using the jdbc sink. First there is a left join of three changelog tables, then joins with two mysql dimension tables, and the checkpoint times out. The SQL is so complex that the more I write, the less confident I get... ``` CREATE VIEW fcbox_send_fat_view AS SELECT REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate, disCode, IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName, dislv,

Re: flink-1.11.2 checkpoint execution failed

2020-11-12 Thread 601049502
First, I'd like to know: are you using a kafka sink? On 2020-11-12 21:16, 史 正超 wrote: Checkpoint execution failed with the following error. 2020-11-12 21:04:56 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at

Re: Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
I have some updates. Some weird behaviours were found. Please refer to the attached photo. All requests were sent via REST API The status of the savepoint triggered by that stop request (ID 11018) is "COMPLETED [Savepoint]", however, no checkpoint data has been persisted (in S3). The folder

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-12 Thread fuyao . li
The test workflow attachment was not added to the previous email, sorry for the confusion; please refer to the workflow described in the text. Thanks. On 11/12/20 16:17, fuyao...@oracle.com wrote: Hi All, Just to add a little more context to the problem. I have a full outer join operation before

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-12 Thread fuyao . li
Hi All, Just to add a little more context to the problem. I have a full outer join operation before this stage. The source data stream for full outer join is a Kafka Source. I also added timestamp and watermarks to the FlinkKafkaConsumer. After that, it makes no difference to the result,

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I'm now trying with a MATCH_RECOGNIZE: SELECT * FROM customers MATCH_RECOGNIZE ( PARTITION BY client_number ORDER BY proctime() MEASURES LAST(B.client_number) as client_number, LAST(B.address) as address PATTERN (A* B) DEFINE B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1) )

Re: Logs of JobExecutionListener

2020-11-12 Thread Flavio Pompermaier
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient.. Am I doing something wrong? My main class ends

Re: DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Yuval Itzchakov
Hi Aljoscha, You're right, I had a misunderstanding about how unions without window operations work. Thanks! On Thu, Nov 12, 2020, 18:37 Aljoscha Krettek wrote: > Hi, > > I think if you don't do any operations that are sensitive to event-time > then just using a UNION/UNION ALL should work

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Simone Cavallarin
Hi Aljoscha, Yes, correct, I would like to have more windows when there are more events for a given time frame, that is, when the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically

Re: Flink AutoScaling EMR

2020-11-12 Thread Rex Fenley
Awesome, thanks! Is there a way to make the new yarn job only on the new hardware? Or would the two jobs have to run on intersecting hardware and then would be switched on/off, which means we'll need a buffer of resources for our orchestration? Also, good point on recovery. I'll spend some time

Re: DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Aljoscha Krettek
Hi, I think if you don't do any operations that are sensitive to event-time then just using a UNION/UNION ALL should work because then there won't be any buffering by event time which could delay your output. Have you tried this and have you seen an actual delay in your output? Best,

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Aljoscha Krettek
Hi, I'm not sure that what you want is possible. You say you want more windows when there are more events for a given time frame? That is when the events are more dense in time? Also, using the event timestamp as the gap doesn't look correct. The gap basically specifies the timeout for a
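
To make the point about the gap concrete, here is a plain-Java simulation of how session windows with a per-element gap are formed. It is not Flink code, and all names are hypothetical. Each element opens a window `[timestamp, timestamp + gap)`, and windows that overlap or touch are merged. The `gapOf` function plays the role of the `SessionWindowTimeGapExtractor` passed to `EventTimeSessionWindows.withDynamicGap()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical plain-Java simulation of event-time session windows with a
// per-element gap (not Flink code). Each element opens [ts, ts + gap);
// overlapping or touching windows are merged into one session.
class DynamicGapSessionSketch {
    static final class Event {
        final long timestamp;
        final long value;
        Event(long timestamp, long value) { this.timestamp = timestamp; this.value = value; }
    }

    static final class Window {
        final long start; // inclusive
        final long end;   // exclusive
        Window(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    static List<Window> sessions(List<Event> events, ToLongFunction<Event> gapOf) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        List<Window> result = new ArrayList<>();
        Window current = null;
        for (Event e : sorted) {
            Window w = new Window(e.timestamp, e.timestamp + gapOf.applyAsLong(e));
            if (current != null && w.start <= current.end) {
                // The element arrives before the current session timed out: extend it.
                current = new Window(current.start, Math.max(current.end, w.end));
            } else {
                if (current != null) result.add(current);
                current = w;
            }
        }
        if (current != null) result.add(current);
        return result;
    }
}
```

With a constant gap of 10, events at 0, 5 and 20 yield the sessions [0, 15) and [20, 30). The gap acts as an inactivity timeout after each element, so events that arrive closer together than the gap end up in one longer session rather than in more windows.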

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid, Thank you so much for your detailed reply. I think I will go with one schema per topic using GenericRecordAvroTypeInfo for genericRecords and not do any custom magic. Approach of sending records as byte array also seems quite interesting. Right now I am deserializing avro records so

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I see what my mistake was: I was using a field in my ORDER BY, while it only supports proctime() for now. That allows me to create an append only stream, thanks a lot! However, it still does not allow me to do what I need: *If I use both my primary key and changing column in PARTITION BY, then

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
You need to differentiate two serialization abstractions (which I guess you already know). One is coming from reading the source, where the DeserializationSchema is used, and it translates the bytes of Kafka into something that Flink can handle. The second serialization occurs within Flink

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
So in this case, flink will fall back to the default Kryo serialiser, right? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Arvid Heise
Hi Tim, afaik we are confusing two things here, there is a transaction timeout = how long the transaction lasts until aborted. And what you see here is some timeout while creating the transaction in the first place. A quick google search turned up [1], from which I'd infer that you need to set

How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Simone Cavallarin
Hi All, I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Tim Josefsson
To further add to this problem, I've now got our ops team to set transaction.max.timeout.ms on our Kafka brokers to 1 hour (as suggested by the Flink docs). However the problem persists and I'm still getting the same error message. I've confirmed that this config setting is actually set on the

Data loss exception using hash join in batch mode

2020-11-12 Thread
Data loss exception using hash join in batch mode

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Tim Josefsson
Also realized I had a typo in the config dump I did in the previous email (the one from the 10th). If I don't do Properties producerProps = new Properties(); producerProps.setProperty("transaction.timeout.ms", "90"); Then the value reported from the ProducerConfig is 360 and not 6 as I

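Re: Re: flink sql: error when calling the FIRST_VALUE function

2020-11-12 Thread hailongwang
I have created an issue [1] to support the merge method for first_value and last_value. I have also created an issue [2] to update the documentation on which custom UDAFs need a merge method, so that hop window can be added to it. @李世钰, feel free to pick these up if you're interested. [1] https://issues.apache.org/jira/browse/FLINK-20110 [2] https://issues.apache.org/jira/browse/FLINK-20111 At 2020-11-12 19:51:29, "Jark Wu" wrote: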

flink-1.11.2 checkpoint execution failed

2020-11-12 Thread 史 正超
Checkpoint execution failed with the following error. 2020-11-12 21:04:56 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) at

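Re: Re: How to raise an alert when no message has been received for a certain time?

2020-11-12 Thread Lei Wang
A session window does meet the requirement: robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, y) -> y); Written this way, as I understand it, the window state only keeps the most recent record. Normally every robot keeps reporting logs, which means this window will normally be kept around indefinitely. I'm not sure whether that has a performance impact. On Thu, Nov 12, 2020 at 5:25 PM hailongwang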

Logs of JobExecutionListener

2020-11-12 Thread Flavio Pompermaier
Hello everybody, I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal? Best, Flavio

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
If you follow the best practices, then topics should never have different schemas as you can't enforce schema compatibility. You also have very limited processing capabilities and clumsy workflows attached to it. If you want to encode different kinds of events, then the common approach is to use

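Re: Question about the constructors of the ElasticsearchApiCallBridge classes

2020-11-12 Thread zhisheng
This is already supported on the master branch; take a look at the flink-connector-es7 source code. Luna Wong wrote on Wed, Nov 11, 2020 at 9:16 PM: > Why are none of the constructors of the ElasticsearchApiCallBridge implementation classes public? > I'd like to extend the Elasticsearch6ApiCallBridge class and add username/password authentication before new > RestHighLevelClient, i.e. implement a subclass that supports username and password. > > Without public, the subclass has to be in the same package as the parent class. Why are the ElasticsearchApiCallBridge implementation classes designed this way?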

Re: flink sql: error when calling the FIRST_VALUE function

2020-11-12 Thread Jark Wu
We could create an issue to support the merge method for first_value and last_value. On Thu, 12 Nov 2020 at 20:37, hailongwang <18868816...@163.com> wrote: > Hi me, > > > A UDAF used on a HOP window must implement the merge method, because a HOP window merges multiple panes > when it fires, so it also has to call the UDAF's merge method to combine multiple accumulators into one. first_value and last_value > do not support merge. >

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid, Thanks a lot for your reply. And yes, we do use confluent schema registry extensively. But the `ConfluentRegistryAvroDeserializationSchema` expects reader schema to be provided. That means it reads the message using writer schema and converts to reader schema. But this is not what I want

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Tim Josefsson
Sure, I've attached it to this email. The process seems to restart once the TimeoutException happens so it's repeated a couple of times. Thanks for looking at it! /Tim On Wed, 11 Nov 2020 at 10:37, Aljoscha Krettek wrote: > Hmm, could you please post the full stack trace that leads to the >

Re: flink sql: error when calling the FIRST_VALUE function

2020-11-12 Thread hailongwang
Hi me, A UDAF used on a HOP window must implement the merge method, because a HOP window merges multiple panes when it fires, so it also has to call the UDAF's merge method to combine multiple accumulators into one. first_value and last_value do not support merge. Best, hailong On 2020-11-12 17:07:58, "李世钰" wrote: >1. FLINK version: flink1.11 > > > > >2. Using useBlinkPlanner > > > > >3. SQL executed: > >SELECT
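
The need for `merge` can be sketched in plain Java. A HOP window with a size of 2 minutes and a slide of 30 seconds, as in the original FIRST_VALUE query, is assembled from 4 panes of 30 seconds, so the accumulators of those panes have to be merged when the window fires. The accumulator below is a hypothetical FIRST_VALUE variant, not Flink's built-in function. It stores an ordering key (assumed here to be the event timestamp) next to the value, so merging can pick the earliest value. Its `merge(acc, Iterable<acc>)` shape follows the convention for user-defined `AggregateFunction`s in the Table API.

```java
import java.util.List;

// Hypothetical FIRST_VALUE-style accumulator that supports merge (not Flink's
// built-in FIRST_VALUE). It keeps an ordering key next to the value, assumed
// here to be the event timestamp, so pane accumulators can be combined.
class FirstValueMergeSketch {
    static final class Acc {
        long orderKey = Long.MAX_VALUE;
        String value = null;
    }

    static void accumulate(Acc acc, long orderKey, String value) {
        if (value != null && orderKey < acc.orderKey) {
            acc.orderKey = orderKey;
            acc.value = value;
        }
    }

    // Same shape as a UDAF merge(ACC, Iterable<ACC>): keep the earliest value.
    static void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            accumulate(acc, other.orderKey, other.value); // empty panes are skipped
        }
    }

    // Firing a HOP window means merging the accumulators of all panes it covers
    // (4 panes of 30 s for a 2 min window sliding every 30 s).
    static String fireWindow(List<Acc> panes) {
        Acc result = new Acc();
        merge(result, panes);
        return result.value;
    }
}
```

Suppose pane 0 saw values at 10 s and 20 s and pane 1 saw a value at 40 s. A window covering both panes returns the value from 10 s. If the accumulator did not store an ordering key, there would be no reliable way to tell which pane's value came first.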

Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
Hi, I'm on 1.11.0, with a streaming job running on a YARN session, reading from Kinesis. I tried to stop the job using REST, with "drain=false". After that POST request, I got back a request_id (not sure what I should use that for). Checked the job in GUI, I could see that a savepoint has been

Re: Re: Relationship between the number of slots and the parallelism

2020-11-12 Thread zhisheng
You can read the article http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ to understand this. hl9...@126.com wrote on Thu, Nov 12, 2020 at 4:47 PM: > It is a flink standalone cluster. > The job parallelism is specified in the job's Java code via streamExecutionEnvironment.setParallelism(15). > > > > hl9...@126.com > > From: Xintong Song > Sent: 2020-11-12 13:18 > To:

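Re: State consistency between Flink and Yarn

2020-11-12 Thread zhisheng
I have run into this problem too, so for job monitoring and alerting we usually look at the state of all the job's tasks rather than just the yarn state. hdxg1101300123 wrote on Thu, Nov 12, 2020 at 8:07 PM: > You can configure the job to also fail when a checkpoint fails > > > > Sent from my vivo smartphone > > hi everyone, > > > > Recently, while submitting a simple kafka->mysql job with Flink-1.11.1 On Yarn Per > Job mode, I found that when the Flink job state was set to Failed because of dirty data, a mysql primary key constraint or similar causes, the Yarn > application was still in the running state > > > >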

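Re: Re: How to raise an alert when no message has been received for a certain time?

2020-11-12 Thread zhisheng
hi, take a look at the Timer mechanism and see whether it solves your problem. Best zhisheng hailongwang <18868816...@163.com> wrote on Thu, Nov 12, 2020 at 5:25 PM: > > > > This scenario is somewhat similar to session semantics, but a session window does not feel like the right fit. > If the alert is never triggered, all the historical data stays in the window, or rather in state, whereas only the latest record actually needs to be kept. > > > > > On 2020-11-12 14:34:59, "Danny Chan" wrote: > >I think what you need is a session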
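
The "keep only the latest record and use a timer" idea from this thread can be sketched in plain Java. All names are hypothetical. In Flink, the same pattern would typically be a `KeyedProcessFunction` keyed by robot id. It would hold the last-seen time in `ValueState` and register a processing-time timer via `ctx.timerService().registerProcessingTimeTimer(...)`. Here, timers are modeled with a sorted map. A timer only raises an alert if no newer event arrived for that robot in the meantime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of "latest timestamp per key + timer"
// inactivity alerting (not Flink code).
class InactivityAlertSketch {
    private final long timeoutMs;
    // Only the latest event time per robot is kept.
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Pending timers: fire time -> robot ids (models per-key timer registration).
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    InactivityAlertSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    void onEvent(String robotId, long nowMs) {
        lastSeen.put(robotId, nowMs);
        timers.computeIfAbsent(nowMs + timeoutMs, t -> new ArrayList<>()).add(robotId);
    }

    /** Advances time, fires due timers and returns the robots to alert on. */
    List<String> advanceTo(long nowMs) {
        List<String> alerts = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= nowMs) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String robotId : due.getValue()) {
                Long seen = lastSeen.get(robotId);
                // A timer from an older event is stale: the robot reported again since.
                if (seen != null && seen + timeoutMs == due.getKey()) {
                    alerts.add(robotId);
                    lastSeen.remove(robotId); // alert once until the robot reports again
                }
            }
        }
        return alerts;
    }
}
```

Unlike a session window, the stored state per robot is a single timestamp (plus pending timers), no matter how long the robot keeps reporting.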

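Re: State consistency between Flink and Yarn

2020-11-12 Thread hdxg1101300123
You can configure the job to also fail when a checkpoint fails. Sent from my vivo smartphone > hi everyone, > > Recently, while submitting a simple kafka->mysql job with Flink-1.11.1 On Yarn Per > Job mode, I found that when the Flink job state was set to Failed because of dirty data, a mysql primary key constraint or similar causes, the Yarn > application was still in the running state > > My question is: when a Flink job is Failed or Finished, doesn't it report its own state back to Yarn? I'd appreciate it if an expert could explain, thanks > > best, > amenhub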

DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Yuval Itzchakov
Hi, I want to create an abstraction over N source tables (streams), and unify them into 1 table. I know UNION and UNION ALL exist, but I'm looking for DataStream.connect like semantics in regards to watermark control. I don't want to take the minimum watermark over all N streams, as I know for
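
A small model shows why UNION ends up with the minimum watermark. The plain-Java class below is hypothetical and ignores idle inputs. It tracks the latest watermark of each input and forwards the minimum, which is how a multi-input operator derives its event-time progress. One slow input therefore holds back the combined watermark. That is the behavior Yuval wants to avoid, and as Aljoscha points out, it only matters for operations that are sensitive to event time.

```java
import java.util.Arrays;

// Hypothetical model of how an operator with several inputs combines their
// watermarks: it forwards the minimum (idle-input handling is ignored here).
class UnionWatermarkSketch {
    private final long[] inputWatermarks;

    UnionWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // no watermark seen yet
    }

    /** Records a watermark from one input and returns the combined watermark. */
    long onWatermark(int input, long watermark) {
        // A watermark never moves backwards on an input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long combined = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            combined = Math.min(combined, w);
        }
        return combined;
    }
}
```

After three inputs report 100, 50 and 80, the combined watermark is 50. When the second input advances to 200, it becomes 80.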

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Felipe Gutierrez
I see. now it has different query plans. It was documented on another page so I got confused. Thanks! -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Thu, Nov 12, 2020 at 12:41 PM Jark Wu wrote: > > Hi Felipe, > > The default value of

Re: PyFlink Table API and UDF Limitations

2020-11-12 Thread Dian Fu
Hi Niklas, Python DataStream API will also be supported in coming release of 1.12.0 [1]. However, the functionalities are still limited for the time being compared to the Java DataStream API, e.g. it will only support the stateless operations, such as map, flat_map, etc. [1]

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Jark Wu
Hi Laurent, 1. Currently, it's impossible to convert deduplicate with last row into an append-only stream. 2. Yes, I think Ververica platform doesn't support 'changelog-json' format natively. However, regarding your case, I think you can use keep first row on client_number+address key. SELECT *
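
Jark's suggestion of keeping the first row on `client_number` + `address` can be simulated in plain Java to see why it yields an append-only stream. Every key is emitted at most once, so no earlier output ever has to be retracted or updated. The class is hypothetical and only models the semantics, not Flink's deduplication operator.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of "keep first row" deduplication on the composite
// key (client_number, address). Each key is emitted at most once, so earlier
// output never needs to be retracted: the result is append-only.
class KeepFirstRowSketch {
    private final Set<List<String>> seenKeys = new HashSet<>();

    /** Returns the row to emit, or null if this key was already seen. */
    List<String> process(String clientNumber, String address) {
        List<String> key = Arrays.asList(clientNumber, address);
        return seenKeys.add(key) ? key : null;
    }
}
```

One consequence is worth checking against the requirement. A customer who moves from address A to B and then back to A produces only two rows, because (client, A) was already seen.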

Re: PyFlink Table API and UDF Limitations

2020-11-12 Thread Niklas Wilcke
Hi Dian, thank you very much for this valuable response. I already read about the UDAF, but I wasn't aware of the fact that it is possible to return and UNNEST an array. I will definitely have a try and hopefully this will solve my issue. Another question that came up to my mind is whether

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Jark Wu
Hi Felipe, The default value of `table.optimizer.agg-phase-strategy` is AUTO. If mini-batch is enabled, it will use TWO-PHASE, otherwise ONE-PHASE. https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy On Thu, 12 Nov 2020 at 17:52,
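
The following plain-Java sketch models the configuration described above. The option keys are real Flink table options; the values and the class are illustrative. The decision logic encodes Jark's description of AUTO, plus one assumption: in streaming mode, the two-phase (local-global) plan is only used when mini-batch is enabled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the aggregation phase strategy resolves.
// Keys are real Flink table options; values and logic are illustrative.
class AggPhaseStrategySketch {
    static Map<String, String> localGlobalConfig() {
        Map<String, String> conf = new HashMap<>();
        conf.put("table.exec.mini-batch.enabled", "true");
        conf.put("table.exec.mini-batch.allow-latency", "5 s");
        conf.put("table.exec.mini-batch.size", "5000");
        conf.put("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        return conf;
    }

    static String effectiveStrategy(Map<String, String> conf) {
        boolean miniBatch = Boolean.parseBoolean(
                conf.getOrDefault("table.exec.mini-batch.enabled", "false"));
        String configured = conf.getOrDefault("table.optimizer.agg-phase-strategy", "AUTO");
        // Assumption: streaming local-global aggregation requires mini-batch.
        if (!miniBatch) {
            return "ONE_PHASE";
        }
        switch (configured) {
            case "AUTO":      // per the reply above: AUTO + mini-batch -> TWO_PHASE
            case "TWO_PHASE":
                return "TWO_PHASE";
            case "ONE_PHASE":
                return "ONE_PHASE";
            default:
                throw new IllegalArgumentException("Unknown strategy: " + configured);
        }
    }
}
```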

Re: Password usage in ssl configuration

2020-11-12 Thread Nico Kruber
Hi Suchithra, I'm not sure you can actually pass passwords in any other way. I'm also not sure this is needed if these are job-/cluster-specific because then, an attacker would have to have access to that first in order to get these credentials. And if the attacker has access to the

Re: Long blocking call in UserFunction triggers HA leader lost?

2020-11-12 Thread Maxim Parkachov
Hi Theo, We had a very similar problem with one of our spark streaming jobs. Best solution was to create a custom source having all external records in cache, periodically reading external data and comparing it to cache. All changed records were then broadcasted to task managers. We tried to

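Re: Re: flink1.9.1 job has already failed, but the application is still running on yarn

2020-11-12 Thread amen...@163.com
hi, I'm on flink-1.11.1 without the -d parameter and have run into the same problem. Any idea what is going on? best, amenhub From: Yang Wang Sent: 2020-08-05 10:28 To: user-zh Subject: Re: flink1.9.1 job has already failed, but the application is still running on yarn Your Flink job was probably started in attached mode, i.e. without -d. Before 1.10, a job started this way is essentially a session,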

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
The common solution is to use a schema registry, like Confluent schema registry [1]. All records have a small 5 byte prefix that identifies the schema and that gets fetched by deserializer [2]. Here are some resources on how to properly secure communication if needed [3]. [1]
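
The 5-byte prefix Arvid mentions is the Confluent wire format. It consists of a magic byte `0`, followed by the schema ID as a 4-byte big-endian integer, followed by the serialized payload. Below is a minimal plain-Java sketch of reading that header. The class name is hypothetical; in a Flink job, `ConfluentRegistryAvroDeserializationSchema` takes care of this.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch of reading the Confluent Schema Registry wire-format header.
// Layout: [magic byte 0][schema id: 4-byte big-endian int][payload ...]
class ConfluentWireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    static void checkHeader(byte[] message) {
        if (message.length < HEADER_LENGTH || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
    }

    /** The schema id used to look up the writer schema in the registry. */
    static int schemaId(byte[] message) {
        checkHeader(message);
        // ByteBuffer reads big-endian by default.
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** The serialized record (e.g. Avro binary) that follows the header. */
    static byte[] payload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }
}
```

For example, the bytes `0, 0, 0, 1, 2, ...` carry schema ID 258 (0x00000102), and everything after the fifth byte is the record itself.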

Re: Flink 1.8.3 GC issues

2020-11-12 Thread Aljoscha Krettek
Created an issue for this: https://issues.apache.org/jira/browse/BEAM-11251 On 11.11.20 19:09, Aljoscha Krettek wrote: Hi, nice work on debugging this! We need the synchronized block in the source because the call to reader.advance() (via the invoker) and reader.getCurrent() (via

flink sql: error when calling the FIRST_VALUE function

2020-11-12 Thread 李世钰
1. FLINK version: flink1.11 2. Using useBlinkPlanner 3. SQL executed: SELECT FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS

flink sql: error when calling the FIRST_VALUE function

2020-11-12 Thread me
1. FLINK version: flink1.11 2. Using useBlinkPlanner 3. SQL executed: SELECT FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Felipe Gutierrez
Hi Jark, I don't get the difference between "MiniBatch Aggregation" and "Local-Global Aggregation". The web page [1] says that I have to enable the TWO_PHASE parameter. So I compared the query plans with and without the TWO_PHASE parameter, and they are the same.

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
Hi Jark, thanks again for your quick response! I tried multiple variants of my query by: - specifying only the primary key in the PARTITION BY clause - changing the order to DESC to keep the last row --> I unfortunately always get the same error message. If I try to make a simple select on the

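Re: A question about the filesystem connector

2020-11-12 Thread Jingsong Li
To make partitions queryable as early as possible, just set the delay to 0 (keep the other defaults). On Thu, Nov 12, 2020 at 5:17 PM admin <17626017...@163.com> wrote: > Hi, jingsong > So with partition-time, even if data arrives very late, the partition can be committed again and no data is lost, right? > Then for hourly partitions, what is the best configuration to make partitions queryable as early as possible, > e.g. sink.partition-commit.trigger = partition-time > sink.partition-commit.delay = 10 min > > >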
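
The partition-time trigger discussed here can be modeled in a few lines. This is a plain-Java sketch with a hypothetical class name. It assumes the partition time is the start of the hour extracted from the partition values and, for simplicity, interprets it in UTC. Under this model, a partition becomes committable once the watermark passes the partition time plus `sink.partition-commit.delay`. Jingsong notes that commits are idempotent and that a late record simply causes the partition to be committed again. So with a delay of 0, an hourly partition becomes queryable almost immediately, while a delay of 1 h waits until the hour is over.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical model of the partition-time commit condition (not Flink code).
// Simplification: the partition time is interpreted in UTC.
class PartitionCommitSketch {
    /**
     * @param partitionTime   time extracted from the partition values, e.g. the start of the hour
     * @param delay           value of sink.partition-commit.delay
     * @param watermarkMillis current watermark in epoch milliseconds
     */
    static boolean shouldCommit(LocalDateTime partitionTime, Duration delay, long watermarkMillis) {
        long partitionMillis = partitionTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        return watermarkMillis > partitionMillis + delay.toMillis();
    }
}
```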

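Re: Re: Re: flink 1.11.1: when using mysql as a dimension table in a temporal join, occasional timeout exceptions cause data updates to fail; looking for a solution

2020-11-12 Thread hailongwang
This is most likely the connection becoming unusable because no data has been transmitted for a long time. Possible causes: 1. the kafka data is very sparse, so the interval between arriving records exceeds "wait_timeout"; 2. no record has been joined with the mysql data for a long time. You can try adjusting the database's wait_timeout. PS: in this scenario, automatic recovery should work, but you need to find the root cause, decide whether it is normal or abnormal, and work out how to avoid it. It is best to enable checkpointing: the kafka offset is only acked when a checkpoint succeeds, so the record won't be auto-acked and dropped. If checkpointing is enabled and the downstream supports upsert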

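Re: Re: How to raise an alert when no message has been received for a certain time?

2020-11-12 Thread hailongwang
This scenario is somewhat similar to session semantics, but a session window does not feel like the right fit: if the alert is never triggered, all the historical data stays in the window, or rather in state, whereas only the latest record actually needs to be kept. On 2020-11-12 14:34:59, "Danny Chan" wrote: >I think what you need is a session window: the timeout is the session gap, and the moment the session fires is where the alerting logic goes > >Lei Wang wrote on Wed, Nov 11, 2020 at 2:03 PM: > >> There are many edge robot devices (we call them robots) sending to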

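Re: A question about the filesystem connector

2020-11-12 Thread admin
Hi, jingsong So with partition-time, even if data arrives very late, the partition can be committed again and no data is lost, right? Then for hourly partitions, what is the best configuration to make partitions queryable as early as possible, e.g. sink.partition-commit.trigger = partition-time sink.partition-commit.delay = 10 min > On 2020-11-12 at 3:22 PM, Jingsong Li wrote: > > Hi admin, > > No data is dropped; the partition is committed again (which is why partition commits are all idempotent operations now) > > On Thu,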

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi, Thanks a lot for the reply. And you both are right. Serializing GenericRecord without specifying a schema was indeed a HUGE bottleneck in my app. I found it through JFR analysis and then read the blog post you mentioned. Now I am able to pump in a lot more data per second. (In my test setup

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Robert Metzger
Hi, from my experience serialization contributes a lot to the maximum achievable throughput. I can strongly recommend checking out this blog post, which has a lot of details on the topic: https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html On Tue, Nov 10, 2020 at 9:46

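Re: Re: Relationship between the number of slots and the parallelism

2020-11-12 Thread hl9...@126.com
It is a flink standalone cluster. The job parallelism is specified in the job's Java code via streamExecutionEnvironment.setParallelism(15). hl9...@126.com From: Xintong Song Sent: 2020-11-12 13:18 To: user-zh Subject: Re: Relationship between the number of slots and the parallelism Are you deploying a flink standalone cluster? How is the current job parallelism of 15 specified? By default, a streaming job needs to obtain at least as many slots as its parallelism before it can run. You can use the [3] that Shawn mentioned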
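
Xintong's rule of thumb can be turned into a rough estimate. The plain-Java sketch below uses hypothetical names and assumes slot sharing. Subtasks of different operators in the same slot-sharing group can share a slot, so a group needs as many slots as its highest operator parallelism, and separate groups add up. With `setParallelism(15)` and every operator in the default group, the job needs 15 slots. The available slots are the number of TaskManagers times `taskmanager.numberOfTaskSlots`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical estimate of the slots a streaming job needs with slot sharing.
class SlotRequirementSketch {
    static final class Operator {
        final String name;
        final String slotSharingGroup;
        final int parallelism;

        Operator(String name, String slotSharingGroup, int parallelism) {
            this.name = name;
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
        }
    }

    /** Per group, the highest operator parallelism; groups are summed. */
    static int requiredSlots(List<Operator> operators) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Operator op : operators) {
            maxPerGroup.merge(op.slotSharingGroup, op.parallelism, Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    static boolean canRun(List<Operator> operators, int taskManagers, int slotsPerTaskManager) {
        return requiredSlots(operators) <= taskManagers * slotsPerTaskManager;
    }
}
```

For instance, 3 TaskManagers with 4 slots each offer 12 slots, which is not enough for a job whose default group needs 15.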

Re: Job crash in job cluster mode

2020-11-12 Thread Robert Metzger
Hey Tim, what Is your Flink job doing? Is it restarting from time to time? Is the JobManager crashing, or the TaskManager? On Tue, Nov 10, 2020 at 6:01 PM Matthias Pohl wrote: > Hi Tim, > I'm not aware of any memory-related issues being related to the deployment > mode used. Have you checked

Re: why not flink delete the checkpoint directory recursively?

2020-11-12 Thread Robert Metzger
Hey Josh, As far as I understand the code CompletedCheckpoint.discard(), Flink is removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then deleting the directory. Which files are left over in your case? Do you see any exceptions on the TaskManagers? Best, Robert On Wed, Nov

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-12 Thread Robert Metzger
Hi Jiahui, using the yarn.container-start-command-template is indeed a good idea. I was also wondering whether the Flink YARN client that submits the Flink cluster to YARN has knowledge of the host where the ApplicationMaster gets deployed to. But that doesn't seem to be the case. On Wed, Nov