Re: About Flink checkpoints

2020-04-17 Thread coke half
Hi, I now understand that some models of checkpoint overhead take recovery time and other costs into account, so the checkpoint interval is shortened under heavy load. My question boils down to this: does automatically shortening the checkpoint interval under heavy load actually make sense in real business scenarios? Thanks. From: Lee Sysuke Sent: Friday, April 17, 2020 10:41:42 AM To: user-zh Subject: Re: About Flink checkpoints A personal take:

Re: Does Flink streaming SQL support two-level GROUP BY aggregation?

2020-04-17 Thread Benchao Li
Under the current design this shouldn't count as a bug; it is by design. The root cause is that with two levels of agg, the retract from the first-level agg makes the second-level agg recompute and re-emit its result. dixingxing85 wrote on Sat, Apr 18, 2020 at 11:38 AM: > Thanks Benchao, > The expected result of this job is a single value per day that should keep growing, e.g.: > 20200417,86 > 20200417,90 > 20200417,130 > 20200417,131 > > It should not bounce up and down; results that shrink from a larger to a smaller value are unacceptable to the consumers: > 20200417,90 >

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang, Name filtering & schema special handling makes sense for me. We can enrich later if there is requirement without breaking interface. For #1, from my perspective your first proposal is having an option specifies remote flink/lib, then we turn off auto uploading local flink/lib and

Re: Does Flink streaming SQL support two-level GROUP BY aggregation?

2020-04-17 Thread dixingxing85
Thanks Benchao. The expected output of this job is a single value per day, and it should keep growing, e.g.: 20200417,86 20200417,90 20200417,130 20200417,131 It should not bounce up and down; results that shrink from larger to smaller are unacceptable to the consumers: 20200417,90 20200417,86 20200417,130 20200417,86 20200417,131 My question is: can the retract stream produced by the inner group by affect the sink? I am logging on the sink side. If Flink does support this kind of two-level group by, shouldn't these shrinking results count as a bug?

Re: Does Flink streaming SQL support two-level GROUP BY aggregation?

2020-04-17 Thread Benchao Li
Hi, this is supported. What you are seeing is because group by produces retract results, i.e. it first sends -[old], then +[new]. With two levels this becomes: level 1 -[old], level 2 -[cur], +[old]; level 1 +[new], level 2 -[old], +[new]. dixingxin...@163.com wrote on Sat, Apr 18, 2020 at 2:11 AM: > > Hi all: > > One of our streaming SQL jobs produces incorrect results: the values reaching the sink fluctuate up and down. *We'd like to confirm whether this is a bug, or whether Flink does not support this kind of SQL yet.* >
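The retract flow described in this thread can be simulated outside Flink. The following is an illustrative sketch in plain Python (not Flink code; the numbers are invented to mirror the thread's example) showing why a sink that logs every message from a two-level aggregation briefly sees smaller values:

```python
# Simulation of how retract messages from an inner GROUP BY make a
# downstream SUM emit temporarily smaller values before recovering.

def two_level_agg(uv_updates):
    """uv_updates: list of (key, new_uv) updates from the inner aggregation."""
    inner = {}          # state of the inner agg: key -> current uv
    total = 0           # state of the outer SUM
    sink_log = []       # every value the sink receives
    for key, new_uv in uv_updates:
        old_uv = inner.get(key)
        if old_uv is not None:
            total -= old_uv          # retract: -[old] reaches the outer agg first
            sink_log.append(total)   # ...so the sink briefly sees a smaller sum
        inner[key] = new_uv
        total += new_uv              # then +[new] restores and grows the sum
        sink_log.append(total)
    return sink_log

log = two_level_agg([("a", 86), ("b", 4), ("a", 126)])
print(log)  # [86, 90, 4, 130] -- the dip to 4 is the retract in flight
```

This is why the shrinking values are considered by design rather than a bug: every dip is immediately followed by the corrected, larger sum.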

Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 2 MB) and flink job fails during restoring from such checkpoint

2020-04-17 Thread Jacob Sevart
This sounds a lot like an issue I just went through ( http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Very-large-metadata-file-td33356.html). Are you using a union list state anywhere? You could also use the debugging steps mentioned in that thread to inspect the contents of

Flink 1.9 web UI: Job Manager page style issue

2020-04-17 Thread guanyq
OK, found the cause, sorry about that! On 2020-04-16 08:03:29, "guanyq" wrote: >The code does call env.execute, but submitting the job produces the following error; what could be the cause? >The program didn't contain a Flink job. Perhaps you forgot to call execute() >on the execution environment.

Re: Akka Error

2020-04-17 Thread tison
If you run a program using "flink run" in dist/bin, dependencies should be taken care of. Could you describe in detail how you "start a flink program"? Did you write an entrypoint, compile it, and run it with "java YourProgram"? If so, you should configure the classpath yourself. Best, tison.

Re: multi-sql checkpoint fail

2020-04-17 Thread tison
Hi, Could you share the stack traces? Best, tison. forideal wrote on Sat, Apr 18, 2020 at 12:33 AM: > Hello friend > I have two SQL, checkpoint fails all the time. One task is to open a > sliding window for an hour, and then another task consumes the output data > of the previous task. There will be no

Re: Flink 1.10 Out of memory

2020-04-17 Thread Zahid Rahman
https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/ On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard < lassenedergaardfl...@gmail.com> wrote: > Hi. > > We have migrated

Akka Error

2020-04-17 Thread Alexander Borgschulze
When I try to start a flink program, I get the following exception:   com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'         at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)         at

Re: Debug Slowness in Async Checkpointing

2020-04-17 Thread Robert Metzger
Hi, did you check the TaskManager logs if there are retries by the s3a file system during checkpointing? I'm not aware of any metrics in Flink that could be helpful in this situation. Best, Robert On Tue, Apr 14, 2020 at 12:02 AM Lu Niu wrote: > Hi, Flink users > > We notice sometimes async

Re: Checkpoint Space Usage Debugging

2020-04-17 Thread Yun Tang
Hi Kent, You can view checkpoint details via the web UI to see how much checkpointed data is uploaded for each operator, and you can compare the state size over time to see whether the uploaded checkpointed data stays within a stable range. Best, Yun Tang From: Kent Murra

Does Flink streaming SQL support two-level GROUP BY aggregation?

2020-04-17 Thread dixingxin...@163.com
Hi all: One of our streaming SQL jobs produces incorrect results: the values reaching the sink fluctuate up and down. We'd like to confirm whether this is a bug, or whether Flink does not support this kind of SQL yet. The concrete scenario: first group by the two dimensions A and B to compute UV, then group by A and sum up the UVs across dimension B. The corresponding SQL (A -> dt, B -> pvareaid): SELECT dt, SUM(a.uv) AS uv FROM ( SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv FROM streaming_log_event
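The intended semantics of the query can be sketched in plain Python. This is illustrative only; the column names dt/pvareaid/cuid follow the thread's SQL, but the event data is invented:

```python
# Plain-Python sketch of what the two-level query is meant to compute:
# per-day UV per pvareaid (COUNT DISTINCT cuid), then summed per day.
events = [
    ("20200417", "p1", "u1"),
    ("20200417", "p1", "u2"),
    ("20200417", "p2", "u1"),   # same user, different pvareaid: counted again
    ("20200417", "p1", "u1"),   # duplicate, ignored by COUNT DISTINCT
]

distinct = {}  # (dt, pvareaid) -> set of cuid
for dt, pvareaid, cuid in events:
    distinct.setdefault((dt, pvareaid), set()).add(cuid)

uv_per_day = {}  # dt -> SUM of per-pvareaid UVs
for (dt, _), cuids in distinct.items():
    uv_per_day[dt] = uv_per_day.get(dt, 0) + len(cuids)

print(uv_per_day)  # {'20200417': 3}
```

In batch semantics this yields exactly one, ever-growing value per day; the fluctuation seen at the sink comes from the streaming retract mechanism, not from the query's intended meaning.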

Checkpoint Space Usage Debugging

2020-04-17 Thread Kent Murra
I'm looking into a situation where our checkpoint sizes are automatically growing over time. I'm unable to pinpoint exactly why this is happening, and it would be great if there was a way to figure out how much checkpoint space is attributable to each operator so I can narrow it down. Are there

Re: Flink upgrade to 1.10: function

2020-04-17 Thread seeksst
Hi, thank you for the reply. I found it is caused by SqlStdOperatorTable and tried many ways to change it, but failed. Finally, I decided to copy it and rename it. Another thing that caught my attention: I also define a last_value function whose args are the same as SqlStdOperatorTable,

multi-sql checkpoint fail

2020-04-17 Thread forideal
Hello friend, I have two SQL jobs and checkpointing fails all the time. One task opens a one-hour sliding window, and another task consumes the output data of the previous task. There is no problem when the two tasks are submitted separately. -- first Calculation -- second Write the

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread Yang Wang
Hi tison, For #3, if you mean registering remote HDFS files as local resources, we should make "-yt/--yarnship" support remote directories. I think it is the right direction. For #1, if users could ship remote directories, then they could also specify it like this "-yt

Re: Flink upgrade to 1.10: function

2020-04-17 Thread Jark Wu
Hi, I guess you are already close to the truth. Since Flink 1.10 we upgraded Calcite to 1.21, which reserves JSON_VALUE as a keyword, so a user-defined function can't use this name anymore. That's why JSON_VALUE(...) will always be parsed as Calcite's builtin function definition. Currently,

Re: Can I use Joda-Time in Flink?

2020-04-17 Thread tison
Hi Alexander, What do you mean exactly? Could you describe it in pseudo code? I'm not quite sure where Java-Time is used in the env. Best, tison. Alexander Borgschulze wrote on Fri, Apr 17, 2020 at 9:21 PM: > Can I use Joda-Time instead of Java-Time and set it up in the > StreamExecutionEnvironment? >

Can I use Joda-Time in Flink?

2020-04-17 Thread Alexander Borgschulze
Can I use Joda-Time instead of Java-Time and set it up in the StreamExecutionEnvironment?

Flink 1.10 Out of memory

2020-04-17 Thread Lasse Nedergaard
Hi. We have migrated to Flink 1.10 and are facing an out-of-memory exception; hopefully someone can point us in the right direction. We have a job that uses broadcast state, and we sometimes run out of memory when it creates a savepoint. See the stacktrace below. We have assigned 2.2 GB/task manager and

Re: Flink upgrade to 1.10: function

2020-04-17 Thread Till Rohrmann
Hi, thanks for reporting these problems. I'm pulling in Timo and Jark who are working on the SQL component. They might be able to help you with your problem. Cheers, Till On Thu, Apr 16, 2020 at 11:10 AM seeksst wrote: > Hi, All > > > Recently, I try to upgrade flink from 1.8.2 to 1.10, but i

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang, I agree that these two pieces of work would benefit from a single assignee. My concern is as below: 1. Both shared libs & remote flink dist/libs are remote ship files. I don't think we have to implement multiple codepaths/configurations. 2. So, for concept clarification, there are (1) an option

Re: Flink job didn't restart when a task failed

2020-04-17 Thread Till Rohrmann
Keep us posted once you caught the problem in the act. This would help to debug/understand this problem tremendously. Cheers, Till On Wed, Apr 15, 2020 at 8:44 AM Zhu Zhu wrote: > Sorry I made a mistake. Even if it's the case I had guessed, you will not > get a log "Task {} is already in state

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread Till Rohrmann
Hi Yang, from what I understand it sounds reasonable to me. Could you sync with Tison on FLINK-14964 on how to proceed? I'm not super deep into these issues but they seem to be somewhat related, and Tison already did some implementation work. I'd say it'd be awesome if we could include this kind of

Re: AvroParquetWriter issues writing to S3

2020-04-17 Thread Arvid Heise
Hi Diogo, I saw similar issues already. The root cause is always users actually not using any Flink specific stuff, but going to the Parquet Writer of Hadoop directly. As you can see in your stacktrace, there is not one reference to any Flink class. The solution usually is to use the respective

Re: Schema with TypeInformation or DataType

2020-04-17 Thread tison
Thanks for your inputs, and sorry that I said Schema doesn't support DataType to register a field; I was looking at the Flink 1.9 code... Best, tison. Jark Wu wrote on Fri, Apr 17, 2020 at 2:42 PM: > Hi Tison, > > Migration from TypeInformation to DataType is a large work and will span > many

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
Actually, BoundedOutOfOrderWatermarkGenerator is only used in tests; the real WatermarkGenerator is code-generated in WatermarkGeneratorCodeGenerator. lec ssmi wrote on Fri, Apr 17, 2020 at 5:19 PM: > I think you are all right. I checked the source code of > WatermarkAssignerOperator, and I have found

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
I think you are all right. I checked the source code of WatermarkAssignerOperator, and found that the WatermarkGenerator passed into WatermarkAssignerOperator is the WatermarkGenerator interface, and BoundedOutOfOrderWatermarkGenerator is the only implementation class of

Re: SQL CLI: parameter settings for building streaming applications

2020-04-17 Thread godfrey he
First question: set execution.parallelism=10; sets the job's default parallelism. Some operators have their own parallelism logic and are not affected by the default parallelism (e.g. the Hive source, which derives it from the number of partitions). You can run the set command before submitting each job to change that job's default parallelism. Second question: once FLINK-16822 [1] is fixed in 1.11, you will be able to configure checkpointing via configuration options [2], e.g.: set execution.checkpointing.mode=EXACTLY_ONCE. [1]

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
WatermarkAssignerOperator is an inner mechanism for generating watermarks. The "Bounded Out of Orderness" is just one kind of the watermark expressions, which is most commonly used. The main logic of WatermarkAssignerOperator is: - keep currentWatermark and lastWatermark - when each element
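The loop outlined above can be sketched in plain Python. This is an approximation for illustration only, not the code-generated Flink operator; the field names and the bounded-out-of-orderness expression are assumptions:

```python
# Sketch of the per-element watermark logic described in this thread:
# track the current watermark, and emit only when it has advanced past
# the last emitted watermark by more than the configured interval.
class WatermarkAssigner:
    def __init__(self, max_out_of_orderness, emit_interval):
        self.max_ooo = max_out_of_orderness
        self.interval = emit_interval
        self.current = float("-inf")       # currentWatermark
        self.last_emitted = float("-inf")  # lastEmittedWatermark
        self.emitted = []

    def on_element(self, event_time):
        # bounded out-of-orderness: watermark = max event time seen - delay
        self.current = max(self.current, event_time - self.max_ooo)
        if self.current - self.last_emitted > self.interval:
            self.last_emitted = self.current
            self.emitted.append(self.current)

wa = WatermarkAssigner(max_out_of_orderness=5, emit_interval=10)
for t in [100, 120, 121, 122, 140]:
    wa.on_element(t)
print(wa.emitted)  # [95, 115, 135]
```

The elements at t=121 and t=122 advance the current watermark but do not cause an emit, because the advance since the last emitted watermark is below the interval.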

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
Maybe you are all right; I am even more confused now. As the cwiki says, Flink could use BoundedOutOfOrderTimestamps, but I have heard about WatermarkAssignerOperator from Blink developers. Benchao Li wrote on Fri, Apr 17, 2020 at 4:33 PM: > Hi lec ssmi, > > It's a good question. In blink

Re: Jars replication

2020-04-17 Thread Chesnay Schepler
My apologies, I remembered wrong. The jar endpoints all require working against the leading job master unfortunately. On 17/04/2020 10:18, Andrés Garagiola wrote: Thank Chesnay, I'm invoking the "/jars" endpoint in both JMs and only one of them answers with the uploaded jar. If I try to send

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
Hi lec ssmi, It's a good question. In blink planner, we use code gen to handle watermark expression. And in `WatermarkAssignerOperator` we calculate current watermark when each element comes in. If the watermark - lastEmitedWatermark > watermark interval, we will emit the new watermark. So it's

Re: Jars replication

2020-04-17 Thread Chesnay Schepler
Jars are not replicated to all JobManagers; this is currently expected, but generally undesirable for the use-case you outlined. IIRC the important part is that the upload goes directly against the leader; the run request can be sent anywhere and it will be redirected internally to the

Jars replication

2020-04-17 Thread Andrés Garagiola
Hi, I'm configuring a Flink cluster with high availability based on ZooKeeper and two Job Managers. When I upload a jar using the /jars/upload REST API, I don't get the jar replicated in both JMs. Is this the expected behavior? I want to configure the cluster in such a way that once the jar is

SQL CLI: parameter settings for building streaming applications

2020-04-17 Thread Even
Hi: Two questions about the SQL CLI. 1. Does `set execution.parallelism=10` in the CLI set the parallelism of a submitted job? 2. How can checkpointing be configured for jobs submitted from the SQL CLI?

Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
Great, thank you very much; I'll change the code as you described. Thanks again. On Apr 17, 2020 at 15:17, Benchao Li wrote: Yes, this requires modifying the Flink source code, not just application-level configuration; the change touches the underlying implementation of Flink SQL. 酷酷的浑蛋 wrote on Fri, Apr 17, 2020 at 3:09 PM: I'm using the SQL API here; can I make the change you described? As far as I can see the official docs only offer tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); for setting TTL in SQL. On Apr 17, 2020

Re: About state TTL

2020-04-17 Thread Benchao Li
Yes, this requires modifying the Flink source code, not just application-level configuration; the change touches the underlying implementation of Flink SQL. 酷酷的浑蛋 wrote on Fri, Apr 17, 2020 at 3:09 PM: > I'm using the SQL API here; can I make the change you described? As far as I can see the official docs only offer > tableConfig.setIdleStateRetentionTime(Time.minutes(1), > Time.minutes(6)); for setting TTL in SQL > > > > On Apr 17, 2020 at 14:54, Benchao Li wrote: > Yes, the blink planner and legacy

How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
Hi: In the SQL API, the watermark is declared in a DDL statement, but which way is it implemented: *PeriodicWatermark* or *PunctuatedWatermark*? There seems to be no explanation on the official website. Thanks.

Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
I'm using the SQL API here; can I make the change you described? As far as I can see the official docs only offer tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); for setting TTL in SQL. On Apr 17, 2020 at 14:54, Benchao Li wrote: Yes, the blink planner and legacy planner differ in some implementation details. If it's convenient for you to change the code, you should be able to align this behavior; it's really simple, just two lines of code, in JoinRecordStateViews: static StateTtlConfig

Re: About state TTL

2020-04-17 Thread Benchao Li
Also, https://issues.apache.org/jira/browse/FLINK-15938 and https://issues.apache.org/jira/browse/FLINK-16581 have both been merged now, so you can also cherry-pick them. Benchao Li wrote on Fri, Apr 17, 2020 at 2:54 PM: > Yes, the blink planner and legacy planner differ in some implementation details. > If it's convenient for you to change the code, you should be able to align this behavior; it's just two lines of code, in JoinRecordStateViews: > >

Re: About state TTL

2020-04-17 Thread Benchao Li
Yes, the blink planner and legacy planner differ in some implementation details. If it's convenient for you to change the code, you should be able to align this behavior; it's really simple, just two lines of code, in JoinRecordStateViews: static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) { if (stateCleaningEnabled) { checkArgument(retentionTime > 0); return StateTtlConfig

Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
I just tested it: without blink, state in the join scenario expires normally and becomes inaccessible, whereas with blink it is accessed once more before expiring. On Apr 17, 2020 at 14:16, Benchao Li wrote: These are two separate problems: - State that is accessed only once may never be cleaned up. Early 1.9 versions had this problem because there was no enableCleanupInBackgroud at the time; it will be fixed in 1.9.3, and since 1.10 the cleanup strategy is enabled by default, so the problem is gone. - Expired state can still be read. This still exists, but will be fixed along the way in this issue [1]. [1]
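The visibility difference discussed here can be modeled with a toy key-value store. This is plain Python, not Flink's implementation; only the two visibility names (`NeverReturnExpired`, `ReturnExpiredIfNotCleanedUp`) come from the thread, and the lazy clean-up-on-access is an assumption for illustration:

```python
# Toy model of the two TTL state-visibility policies: NeverReturnExpired
# hides expired entries on read, while ReturnExpiredIfNotCleanedUp can hand
# back an expired entry once more if cleanup has not removed it yet.
class TtlState:
    def __init__(self, ttl, visibility):
        assert visibility in ("NeverReturnExpired", "ReturnExpiredIfNotCleanedUp")
        self.ttl = ttl
        self.visibility = visibility
        self.store = {}  # key -> (value, write_time)

    def put(self, key, value, now):
        self.store[key] = (value, now)

    def get(self, key, now):
        if key not in self.store:
            return None
        value, write_time = self.store[key]
        if now - write_time >= self.ttl:
            del self.store[key]  # lazy cleanup on access
            if self.visibility == "NeverReturnExpired":
                return None
            return value         # expired entry visible one last time
        return value

strict = TtlState(ttl=10, visibility="NeverReturnExpired")
strict.put("k", 1, now=0)
lax = TtlState(ttl=10, visibility="ReturnExpiredIfNotCleanedUp")
lax.put("k", 1, now=0)
print(strict.get("k", now=15), lax.get("k", now=15))  # None 1
print(lax.get("k", now=16))  # None (removed by the previous access)
```

This mirrors the observed behavior: under `ReturnExpiredIfNotCleanedUp` an expired join entry is served once more and only then disappears.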

Re: Assigning timestamps to messages without reassigning watermarks

2020-04-17 Thread taowang
In my test, the original one is discarded and the latest logic is adopted entirely. Original message From: lec ssmi To: flink-user-cn Sent: Fri, Apr 17, 2020 14:43 Subject: Re: Assigning timestamps to messages without reassigning watermarks When the watermark is regenerated, is the new watermark added to the watermark queue with an extreme value selected, or is the original watermark discarded outright and replaced by the new one? taowang wrote on Fri, Apr 17, 2020 at 10:46 AM: >

Re: Assigning timestamps to messages without reassigning watermarks

2020-04-17 Thread lec ssmi
When the watermark is regenerated, is the new watermark added to the watermark queue with an extreme value selected, or is the original watermark discarded outright and replaced by the new one? taowang wrote on Fri, Apr 17, 2020 at 10:46 AM: > Yes, it took effect. I haven't seen anyone else use it this way; in my scenario this trick implements the logic I need, and so far I haven't found it causing other problems. > > > Original message > From: lec ssmi > To: flink-user-cn > Sent: Fri, Apr 17, 2020 09:25 > Subject: Re: Assigning timestamps to messages without reassigning watermarks > >

Re: Schema with TypeInformation or DataType

2020-04-17 Thread Jark Wu
Hi Tison, Migration from TypeInformation to DataType is a large effort and will span many releases. As far as I can tell, we will finalize the work in 1.11. As godfrey said above, Flink SQL & Table API should always use DataType, and DataStream uses TypeInformation. Schema already supports DataType

Re: Schema with TypeInformation or DataType

2020-04-17 Thread godfrey he
Hi tison, >1. Will TypeInformation be deprecated and we use DataType as type system everywhere? AFAIK, the runtime will still support TypeInformation, while the table module supports DataType > 2. Schema in Table API currently supports only TypeInformation to register a field, shall we support the

Re: About state TTL

2020-04-17 Thread Benchao Li
These are two separate problems: - State that is accessed only once may never be cleaned up. Early 1.9 versions had this problem because there was no enableCleanupInBackgroud at the time; it will be fixed in 1.9.3, and since 1.10 the cleanup strategy is enabled by default, so the problem is gone. - Expired state can still be read. This still exists, but will be fixed along the way in this issue [1]. [1] https://issues.apache.org/jira/browse/FLINK-16581 酷酷的浑蛋 wrote on Fri, Apr 17, 2020 at 2:06 PM: >

Schema with TypeInformation or DataType

2020-04-17 Thread tison
Hi, I notice that our type system has two branches. One is TypeInformation while the other is DataType. It is said that Table API will use DataType but there are several questions about this statement: 1. Will TypeInformation be deprecated and we use DataType as type system everywhere? 2.

Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
The problem I'm actually hitting is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually dies. On Apr 17, 2020 at 13:07, Benchao Li wrote: I found that the strategy our state uses is `ReturnExpiredIfNotCleanedUp` rather than `NeverReturnExpired`, so this problem cannot be completely avoided for now. I've created a JIRA [1] to track and improve this. [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 wrote on Fri, Apr 17