Re: DataStream的state问题

2020-07-06 文章 Congxian Qiu
Hi

是最后一次 access 的时间到当前的时间超过了你设置的 ttl 间隔,比如你配置的是 `OnCreateAndWrite`
那么就是创建和写操作之后的 1 天,这个 state 会变成 expired,具体的可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
Best,
Congxian


ゞ野蠻遊戲χ  于2020年7月7日周二 下午12:17写道:

> Deal all:
>
>
> 想问下,在给state设置ttl的时候,如下面的代码: 
> StateTtlConfig ttlConfig = StateTtlConfig
>  
> .newBuilder(Time.days(1))
>  
>
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>  
>
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>  
> .build();
>
>
>
> 设置了1天时间之后失效,例如2020-07-07 08:30:00点开始的job,那失效时间是这个时间段2020-07-07
> 00:00:00~2020-07-07 23:59:59,还是job上线之后,2020-07-07 08:30:00~2020-07-08
> 08:30:00这个时间段?
>
>
> Thanks
> Jiazhi


DataStream??state????

2020-07-06 文章 ?g???U?[????
Deal all:


??,statettl?? 
 StateTtlConfig ttlConfig = StateTtlConfig
   
   .newBuilder(Time.days(1))
   
   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
   
   
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
   
   .build();



??12020-07-07 
08:30:00job2020-07-07 00:00:00~2020-07-07 
23:59:59??job??2020-07-07 08:30:00~2020-07-08 08:30:00???


Thanks
Jiazhi

FlinkWebUI 参数疑问

2020-07-06 文章 liuhy_em...@163.com
Dear,
请问FlinkUI中TaskManager页面的参数含义是什么,有相关的文章描述吗?




Thanks,
Hongyang


回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-06 文章 seeksst
我看你代码上是sqlUpdate,tableConfig是另外设置的,需要作为入参一同放入sqlUpdate中,
使用方法sqlUpdate(str, config)
另外如果你使用的是rocksdb,需要开启rocksdb的ttl
state.backend.rocksdb.ttl.compaction.filter.enabled设置成true
低版本这个参数默认是false




原始邮件
发件人:x35907...@qq.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年7月7日(周二) 10:46
主题:回复: 求助:FLINKSQL1.10实时统计累计UV


是blinkval setttings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() 
--nbsp;原始邮件nbsp;-- 发件人:nbsp;"Benchao 
Li"libenchao@apache.orggt;; 发送时间:nbsp;2020年7月6日(星期一) 晚上11:11 
收件人:nbsp;"user-zh"user-zh@flink.apache.orggt;; 主题:nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV 感觉不太应该有这种情况,你用的是blink planner么? x 35907418@qq.comgt; 
于2020年7月6日周一 下午1:24写道: gt; sorry,我说错了,确实没有,都是group agg. gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
 gt; gt; gt; --amp;nbsp;原始邮件amp;nbsp;-- gt; 
发件人:amp;nbsp;"Benchao Li"libenchao@apache.orgamp;gt;; gt; 
发送时间:amp;nbsp;2020年7月6日(星期一) 中午12:52 gt; 
收件人:amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;gt;; gt; gt; 主题:amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; gt; gt; gt; 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。 gt; 
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。 gt; gt; [1] gt; gt; 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
 gt; gt; x 35907418@qq.comamp;gt; 于2020年7月6日周一 上午11:15写道: gt; gt; amp;gt; 
版本是1.10.1,最后sink的时候确实是一个window里面做count gt; amp;gt; 
distinct操作。请问是只要计算过程中含有一个window里面做count gt; amp;gt; gt; 
distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupamp;amp;nbsp;DATE_FORMAT(rowtm,
 gt; amp;gt; '-MM-dd') 这个sql对应的状态很大。代码如下: gt; amp;gt; val rt_totaluv_view : 
Table = tabEnv.sqlQuery( gt; amp;gt;amp;nbsp;amp;nbsp; """ gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT MAX(DATE_FORMAT(rowtm, 
'-MM-dd gt; HH:mm:00')) gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM source gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY DATE_FORMAT(rowtm, 
'-MM-dd') gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) gt; amp;gt; gt; 
amp;gt; val totaluvTmp = gt; 
tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) gt; 
amp;gt;amp;nbsp;amp;nbsp; .filter( line =amp;amp;gt; line._1 == true ).map( 
line gt; =amp;amp;gt; line._2 ) gt; amp;gt; gt; amp;gt; val totaluvTabTmp = 
tabEnv.fromDataStream( totaluvTmp ) gt; amp;gt; gt; amp;gt; tabEnv.sqlUpdate( 
gt; amp;gt;amp;nbsp;amp;nbsp; s""" gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT INTO mysql_totaluv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT _1,MAX(_2) gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM $totaluvTabTmp gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY _1 gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
--amp;amp;nbsp;原始邮件amp;amp;nbsp;-- gt; amp;gt; 
发件人:amp;amp;nbsp;"Benchao Li"libenchao@apache.orgamp;amp;gt;; gt; amp;gt; 
发送时间:amp;amp;nbsp;2020年7月3日(星期五) 晚上9:47 gt; amp;gt; 
收件人:amp;amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;amp;gt;; gt; amp;gt; gt; 
amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; gt; amp;gt; gt; 
amp;gt; gt; amp;gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 
gt; amp;gt; 这个已经在1.11中修复了。 gt; amp;gt; gt; amp;gt; [1] 
https://issues.apache.org/jira/browse/FLINK-17942 gt; amp;gt; gt; amp;gt; x 
35907418@qq.comamp;amp;gt; 于2020年7月3日周五 下午4:34写道: gt; amp;gt; gt; amp;gt; 
amp;amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, gt; amp;gt; amp;amp;gt; 
gt; amp;gt; amp;amp;gt; gt; amp;gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
 gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
--amp;amp;amp;nbsp;原始邮件amp;amp;amp;nbsp;-- gt; 
amp;gt; amp;amp;gt; 发件人:amp;amp;amp;nbsp;"Jark 
Wu"imjark@gmail.comamp;amp;amp;gt;; gt; amp;gt; amp;amp;gt; 
发送时间:amp;amp;amp;nbsp;2020年6月18日(星期四) 中午12:16 gt; amp;gt; amp;amp;gt; 
收件人:amp;amp;amp;nbsp;"user-zh"user-zh@flink.apache.org gt; amp;amp;amp;gt;; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 主题:amp;amp;amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 是的,我觉得这样子是能绕过的。 gt; amp;gt; 
amp;amp;gt; gt; amp;gt; amp;amp;gt; On Thu, 18 Jun 2020 at 10:34, x 
35907418@qq.comamp;amp;amp;gt; gt; wrote: gt; amp;gt; amp;amp;gt; gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery( gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp; """ gt; amp;gt; 
amp;amp;gt; gt; 
amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;
 SELECT 

Re: 关于jdbc connector扩展问题

2020-07-06 文章 Jark Wu
Hi,

目前 flink-connector-jdbc 还不支持注册
dialect,社区有这方面的计划,但是目前还没有资源做这一块,这是个比较复杂的功能,需要对接口做细致的设计。
目前你可以拿 flink-connector-jdbc 源码,加一个自己的 Dialect 类,在 JdbcDialects 中注册进你的
dialect,然后编译打包就可以了。

Best,
Jark

On Mon, 6 Jul 2020 at 20:10, claylin <1012539...@qq.com> wrote:

> hi all我这里有个需求需要从sql里面写数据到clickhouse里面,但是看源码,发现并不好扩展,
> https://github.com/apache/flink/blob/d04872d2c6b7570ea3ba02f8fc4fca02daa96118/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java#L30,
> 这里我看直接写死仅支持DerbyDialect、MySQLDialect、PostgresDialect,而且这个类不支持注册jdbc新驱动,如果想在SQL里面支持其他类型的数据库的话,该怎么弄,求支招


?????? ??????FLINKSQL1.10????????????UV

2020-07-06 文章 x
??blinkval setttings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()





----
??:"Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

 x <35907...@qq.comgt; ??2020??7??6?? 11:15??

 gt; ??1.10.1??sinkwindow??count
 gt; distinct??window??count
 gt;
 
distinct??windowgroupamp;nbsp;DATE_FORMAT(rowtm,
 gt; '-MM-dd') sql??
 gt; val rt_totaluv_view : Table = tabEnv.sqlQuery(
 gt;nbsp;nbsp; """
 gt;nbsp;nbsp;nbsp;nbsp; SELECT 
MAX(DATE_FORMAT(rowtm, '-MM-dd
 HH:mm:00'))
 gt; time_str,COUNT(DISTINCT userkey) uv
 gt;nbsp;nbsp;nbsp;nbsp; FROM source
 gt;nbsp;nbsp;nbsp;nbsp; GROUP BY 
DATE_FORMAT(rowtm, '-MM-dd')
 gt;nbsp;nbsp;nbsp;nbsp; """)
 gt; tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
 gt;
 gt; val totaluvTmp =
 tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
 gt;nbsp;nbsp; .filter( line =amp;gt; line._1 == true 
).map( line
 =amp;gt; line._2 )
 gt;
 gt; val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
 gt;
 gt; tabEnv.sqlUpdate(
 gt;nbsp;nbsp; s"""
 gt;nbsp;nbsp;nbsp;nbsp; INSERT INTO mysql_totaluv
 gt;nbsp;nbsp;nbsp;nbsp; SELECT _1,MAX(_2)
 gt;nbsp;nbsp;nbsp;nbsp; FROM $totaluvTabTmp
 gt;nbsp;nbsp;nbsp;nbsp; GROUP BY _1
 gt;nbsp;nbsp;nbsp;nbsp; """)
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:amp;nbsp;"Benchao 
Li"https://issues.apache.org/jira/browse/FLINK-17942
 gt;
 gt; x <35907...@qq.comamp;gt; ??2020??7??3?? 4:34??
 gt;
 gt; amp;gt; 
checkpoint??
 gt; amp;gt;
 gt; amp;gt;
 gt;
 
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key
 gt; amp;gt;
 gt; amp;gt;
 gt; amp;gt;
 gt; amp;gt;
 gt; amp;gt;
 
--amp;amp;nbsp;amp;amp;nbsp;--
 gt; amp;gt; ??:amp;amp;nbsp;"Jark 
Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; [2]
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; x 
<35907...@qq.com
 amp;amp;amp;amp;gt;
 gt; ??2020??6??17?? 11:14??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 gt; amp;gt; 
??0??UV??UV??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; CREATE
 VIEW uv_per_10min AS
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 SELECTamp;amp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 amp;amp;amp;amp;amp;nbsp;
 gt; amp;gt; 
MAX(DATE_FORMAT(proctimeamp;amp;amp;amp;amp;nbsp;,
 gt; amp;gt; amp;amp;gt; '-MM-dd
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 HH:mm:00'))amp;amp;amp;amp;amp;nbsp;OVER w
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; AS
 gt; time_str,amp;amp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 amp;amp;amp;amp;amp;nbsp;
 gt; COUNT(DISTINCT user_id) OVER
 gt; amp;gt; w AS uv
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; FROM
 user_behavior
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; WINDOW w
 AS (ORDER BY proctime
 gt; ROWS BETWEEN
 gt; amp;gt; UNBOUNDED
 gt; amp;gt; amp;amp;gt; PRECEDING AND
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; CURRENT
 ROW);
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 ??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; PARTITION
 BY
 gt; DATE_FORMAT(rowtm, '-MM-dd')
 gt; amp;gt; amp;amp;gt; ??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 PS??1.10??DDL??CREATE
 gt; VIEW??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; 
 gt;
 gt;
 gt;
 gt; --
 gt;
 gt; Best,
 gt; Benchao Li



 --

 Best,
 Benchao Li



-- 

Best,
Benchao Li

Re:Re: 如何在窗口关闭的时候清除状态

2020-07-06 文章 flink小猪



[1].设置TTL应该也能达到相同的效果,我还是希望在窗口关闭的时候能够做一些自定义的操作(比如这里的清除状态,也许之后会有其他的操作TTL就不一样好用了)
[2].KeyedProcessFunction,应该自己注册定时器把,在我的代码里面是timeWIndow().trigger().process(), 
ProcessWindowFunction方法我只需要处理逻辑即可,不需要管定时的窗口。














在 2020-07-05 11:56:03,"Congxian Qiu"  写道:
>看上去这个需求是 一天的窗口,每个小时都 trigger 一次,希望 state 在 1 天之后进行清理。
>你可以尝试一下 TTL[1] State
>另外想问一下,你自己写 ProcessWindowFunction 的话,为什么不考虑 KeyedProcessFunction[2] 呢
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction
>Best,
>Congxian
>
>
>JasonLee <17610775...@163.com> 于2020年7月4日周六 下午8:29写道:
>
>> 设置一下状态过期的时间呢
>>
>>
>> | |
>> JasonLee
>> |
>> |
>> 邮箱:17610775...@163.com
>> |
>>
>> Signature is customized by Netease Mail Master
>>
>> 在2020年07月03日 14:02,18579099...@163.com 写道:
>>
>> 大家好,我有一个需求,我在ProcessWindowFunction算子中定义了一个valueState,我希望在窗口关闭的时候能够将状态清理。我应该在哪里清理呢?
>>
>> 1.刚开始我选择在ProcessWindowFunction算子的process方法中清理,但是这里会有一个问题,我事件时间窗口开1天,我写了一个trigger,每隔一个小时输出一次结果。
>>
>> 如果我在process方法中清理,每隔一个小时就会被清理,而valueState中存的是我的中间结果,应该在窗口关闭的时候被清理(即一天之后)。这应该怎么办呢?
>>
>>
>>
>> 18579099...@163.com
>>


Re: Flink从SavePoint启动任务,修改的代码不生效

2020-07-06 文章 Paul Lam
估计你是用同一个 Kafka Source 消费 A B 两个 Topic? 如果是,看起来像是 Kafka Connector 早期的一个问题。

作业停止的时候,Topic B 的 partition offset 被存储到 Savepoint 中,然后在恢复的时候,尽管代码中 Topic B 
已经被移除,但它的 partition offset 还是被恢复了。

这个问题在后来的版本,估计是 1.8 或 1.9,被修复了。

Best,
Paul Lam

> 2020年7月6日 20:55,milan183sansiro  写道:
> 
> 你好:
>1.没有给算子手动设置id
>2.设置savepoint恢复的路径是正确的
> 
> 
> 在2020年7月6日 20:32,wujunxi<462329...@qq.com> 写道:
> 你好,确认以下两个点
> 1.是否给每个算子设置了id
> 2.设置savepoint恢复的路径是否正确
> 
> 
> 
> --原始邮件--
> 发件人:"milan183sansiro" 发送时间:2020年7月6日(星期一) 晚上7:55
> 收件人:"user-zh" 
> 主题:Flink从SavePoint启动任务,修改的代码不生效
> 
> 
> 
> 各位好:
>  
> 背景:Flink版本1.6.4,数据源为Kafka的主题A,B,消费者组相同
>  操作步骤:1.使用SavePoint取消任务。
>  2.修改代码将B去掉,只消费A主题。
>  
> 3.从SavePoint启动任务,发现消费者组在B主题下的偏移量也回到了任务停止时的偏移量,之后偏移量马上变成了最新并继续消费。
>  想知道为什么修改代码不生效。



flink 1.10.1 入 hive 格式为parquet

2020-07-06 文章 lydata
 Hi,

可以提供一份flink1.10 入hive格式为parquet的例子吗?

Best,
lydata

Re: Flink SQL复杂JSON解析

2020-07-06 文章 Leonard Xu
Hi,

Schema 里可以声明成array, 推荐有拆成多行数据的需求用UDTF处理,现在source里是没看到有拆分多行的实现。

Best,
Leonard Xu

> 在 2020年7月6日,21:28,王 outlook  写道:
> 
> TableFunction这个UDTF



Re: 求助:FLINKSQL1.10实时统计累计UV

2020-07-06 文章 Benchao Li
感觉不太应该有这种情况,你用的是blink planner么?

x <35907...@qq.com> 于2020年7月6日周一 下午1:24写道:

> sorry,我说错了,确实没有,都是group agg.
>
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年7月6日(星期一) 中午12:52
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
> 这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> x <35907...@qq.com 于2020年7月6日周一 上午11:15写道:
>
>  版本是1.10.1,最后sink的时候确实是一个window里面做count
>  distinct操作。请问是只要计算过程中含有一个window里面做count
> 
> distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupnbsp;DATE_FORMAT(rowtm,
>  '-MM-dd') 这个sql对应的状态很大。代码如下:
>  val rt_totaluv_view : Table = tabEnv.sqlQuery(
>  """
>  SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd
> HH:mm:00'))
>  time_str,COUNT(DISTINCT userkey) uv
>  FROM source
>  GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
>  """)
>  tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
> 
>  val totaluvTmp =
> tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
>  .filter( line =gt; line._1 == true ).map( line
> =gt; line._2 )
> 
>  val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
> 
>  tabEnv.sqlUpdate(
>  s"""
>  INSERT INTO mysql_totaluv
>  SELECT _1,MAX(_2)
>  FROM $totaluvTabTmp
>  GROUP BY _1
>  """)
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Benchao Li"  发送时间:nbsp;2020年7月3日(星期五) 晚上9:47
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> 
> 
> 
>  你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
>  这个已经在1.11中修复了。
> 
>  [1] https://issues.apache.org/jira/browse/FLINK-17942
> 
>  x <35907...@qq.comgt; 于2020年7月3日周五 下午4:34写道:
> 
>  gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
>  gt;
>  gt;
> 
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
>  gt;
>  gt;
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:amp;nbsp;"Jark Wu"  gt; 发送时间:amp;nbsp;2020年6月18日(星期四) 中午12:16
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt;
>  gt;
>  gt;
>  gt; 是的,我觉得这样子是能绕过的。
>  gt;
>  gt; On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.comamp;gt;
> wrote:
>  gt;
>  gt; amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
>  gt; amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
>  gt; amp;gt;amp;nbsp;amp;nbsp; """
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT
>  MAX(DATE_FORMAT(ts, '-MM-dd
>  gt; HH:mm:00'))
>  gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM
>  user_behavioramp;nbsp;amp;nbsp;amp;nbsp; GROUP BY
>  gt; DATE_FORMAT(ts,
> '-MM-dd')amp;nbsp;amp;nbsp;amp;nbsp; """)
>  gt; amp;gt;
>  gt; amp;gt; val
>  resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
>  gt; amp;gt;amp;nbsp;amp;nbsp;
>  gt;
> .filter(line=amp;amp;gt;line._1==true).map(line=amp;amp;gt;line._2)
>  gt; amp;gt;
>  gt; amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
>  gt; amp;gt; tabEnv.sqlUpdate(
>  gt; amp;gt;amp;nbsp;amp;nbsp; s"""
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT
> INTO
>  rt_totaluv
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT
> _1,MAX(_2)
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM
> $res
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP
> BY _1
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """)
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
> 
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
>  gt; amp;gt; 发件人:amp;amp;nbsp;"Jark Wu"<
> imj...@gmail.comamp;amp;gt;;
>  gt; amp;gt; 发送时间:amp;amp;nbsp;2020年6月17日(星期三) 中午1:55
>  gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"<
> user-zh@flink.apache.org
>  amp;amp;gt;;
>  gt; amp;gt;
>  gt; amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; 在 Flink 1.11 中,你可以尝试这样:
>  gt; amp;gt;
>  gt; amp;gt; CREATE TABLE mysql (
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; time_str
> STRING,
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; uv BIGINT,
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; PRIMARY
> KEY (ts) NOT ENFORCED
>  gt; amp;gt; ) WITH (
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp;
> 'connector' = 'jdbc',
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; 'url' =
>  'jdbc:mysql://localhost:3306/mydatabase',
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp;
> 'table-name' = 'myuv'
>  gt; amp;gt; );
>  gt; amp;gt;
>  gt; amp;gt; INSERT INTO mysql
>  gt; amp;gt; SELECT MAX(DATE_FORMAT(ts, '-MM-dd
> HH:mm:00')),
>  gt; COUNT(DISTINCTamp;amp;nbsp;
>  gt; amp;gt; user_id)
>  gt; amp;gt; FROM user_behavior;
>  gt; amp;gt;
>  gt; amp;gt; On Wed, 17 Jun 2020 at 13:49, x <
> 35907...@qq.comamp;amp;gt;
>  wrote:
>  gt; amp;gt;
>  gt; amp;gt; amp;amp;gt;
> 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
>  gt; amp;gt; 

Re: flink sql子查询状态清理不掉

2020-07-06 文章 Benchao Li
感觉不太应该。你用的是哪个Flink版本,以及哪个planner呢?

op <520075...@qq.com> 于2020年7月6日周一 上午11:31写道:

> 大家好,我现在程序里面有像这样一段sql: select day,
>  count(id),
>  sum(v1) from
> (
> select
>  day ,
>  id ,
>  sum(v1) v1 from source
>group by day,
>  id
> )t
>
>
> group by day
>
>
> 我设置了
> tConfig.setIdleStateRetentionTime(Time.minutes(1440),Time.minutes(1450))
>
>
>
> 子查询里面的聚合是按照天和id聚合的,按道理1天之后就会自动清理,但是运行4天过程中,我在checkpoint的页面里面看到这个子查询的状态一直在增大,这是什么原因呢
> 我的版本是1.10.0



-- 

Best,
Benchao Li


Re: flink interval join后按窗口聚组问题

2020-07-06 文章 Benchao Li
我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。

因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。

我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
的时间最早的那个。

元始(Bob Hu) <657390...@qq.com> 于2020年7月5日周日 下午8:48写道:

> 谢谢您的解答。感觉flink这个机制有点奇怪呢
>
>
> -- 原始邮件 --
> *发件人:* "Benchao Li";
> *发送时间:* 2020年7月5日(星期天) 中午11:58
> *收件人:* "元始(Bob Hu)"<657390...@qq.com>;
> *抄送:* "user-zh";
> *主题:* Re: flink interval join后按窗口聚组问题
>
> 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
> 所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
> 就会有些问题,很多数据被作为late数据直接丢掉了。
>
> 元始(Bob Hu) <657390...@qq.com> 于2020年7月3日周五 下午3:29写道:
>
>> 您好,我想请教一个问题:
>> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
>> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between a.rowtime
>> and a.rowtime + INTERVAL '1' HOUR
>> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime +
>> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
>> allowedLateness +
>> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
>> rightRelativeSize) +
>> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
>> by的时候这种右表数据为空的数据就丢掉了啊。
>> flink版本 1.10.0。
>>
>> 下面是我的一段测试代码:
>>
>> import org.apache.commons.net.ntp.TimeStamp;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import 
>> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.functions.ScalarFunction;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.util.IOUtils;
>>
>> import java.io.BufferedReader;
>> import java.io.InputStreamReader;
>> import java.io.Serializable;
>> import java.net.InetSocketAddress;
>> import java.net.Socket;
>> import java.sql.Timestamp;
>> import java.text.SimpleDateFormat;
>> import java.util.ArrayList;
>> import java.util.Date;
>> import java.util.List;
>>
>> public class TimeBoundedJoin {
>>
>> public static AssignerWithPeriodicWatermarks getWatermark(Integer 
>> maxIdleTime, long finalMaxOutOfOrderness) {
>> AssignerWithPeriodicWatermarks timestampExtractor = new 
>> AssignerWithPeriodicWatermarks() {
>> private long currentMaxTimestamp = 0;
>> private long lastMaxTimestamp = 0;
>> private long lastUpdateTime = 0;
>> boolean firstWatermark = true;
>> //Integer maxIdleTime = 30;
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>> if(firstWatermark) {
>> lastUpdateTime = System.currentTimeMillis();
>> firstWatermark = false;
>> }
>> if(currentMaxTimestamp != lastMaxTimestamp) {
>> lastMaxTimestamp = currentMaxTimestamp;
>> lastUpdateTime = System.currentTimeMillis();
>> }
>> if(maxIdleTime != null && System.currentTimeMillis() - 
>> lastUpdateTime > maxIdleTime * 1000) {
>> return new Watermark(new Date().getTime() - 
>> finalMaxOutOfOrderness * 1000);
>> }
>> return new Watermark(currentMaxTimestamp - 
>> finalMaxOutOfOrderness * 1000);
>>
>> }
>>
>> @Override
>> public long extractTimestamp(Row row, long 
>> previousElementTimestamp) {
>> Object value = row.getField(1);
>> long timestamp;
>> try {
>> timestamp = (long)value;
>> } catch (Exception e) {
>> timestamp = ((Timestamp)value).getTime();
>> }
>> if(timestamp > currentMaxTimestamp) {
>> currentMaxTimestamp = timestamp;
>> }
>> return timestamp;
>> }
>> };
>> return timestampExtractor;
>> }
>>
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment bsEnv = 
>> 

Re: Flink SQL复杂JSON解析

2020-07-06 文章 Benchao Li
我理解最佳实践是第一种,先读出来array,再用table function展开成多行。
实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

王 outlook  于2020年7月6日周一 下午9:29写道:

> 像如下这种JSON输入,
>
> {
>   "id": 1,
>   "many_names": [
> {"name": "foo"},
> {"name": "bar"}
>   ]
> }
>
> 输出表两行  id 1, name foo  |  id 1, name bar
>
> 最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?
> 还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。
>
>
> 来自 Outlook
>


-- 

Best,
Benchao Li


Flink SQL复杂JSON解析

2020-07-06 文章 王 outlook
像如下这种JSON输入,

{
  "id": 1,
  "many_names": [
{"name": "foo"},
{"name": "bar"}
  ]
}

输出表两行  id 1, name foo  |  id 1, name bar

最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?
还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。


来自 Outlook


Re: 滑动窗口数据存储多份问题

2020-07-06 文章 Congxian Qiu
Hi

我理解,如果只存取一份的话,state 的管理会变得麻烦一些(所有需要这份数据的窗口都需要去某个地方取, state 什么时候清理逻辑也会变得麻烦一些)

Best,
Congxian


张浩  于2020年7月6日周一 下午1:57写道:

> 你好,我的思考是便于在状态信息中清除或者提取每一个窗口的数据信息。
> 不知道,我这样理解的对吗?
> 另外,为什么我们不能只存储一份数据呢?
> 非常感谢与您交流!
>
>
>
> 张浩
> 邮箱:zhanghao_w...@163.com
>
> 
>
> 签名由 网易邮箱大师  定制
>
> 在2020年07月06日 13:46,Congxian Qiu  写道:
> Hi
> 现在的实现是这样的,每条数据会在每个窗口中存一份
>
> Best,
> Congxian
>
>
> 张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:
>
> > Hi,all!
> > 由于第一次咨询,我不确定上一份邮件大家是否收到。
> > 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> > 份?
> >
> >
> > | |
> > 张浩
> > |
> > |
> > 13669299...@163.com
> > |
> > 签名由网易邮箱大师定制
>
>


??????Flink??SavePoint??????????????????????????

2020-07-06 文章 milan183sansiro
??
1.??id
2.savepoint??


??2020??7??6?? 20:32??wujunxi<462329...@qq.com> ??

1.id
2.savepoint??



----
??:"milan183sansiro"

??????Flink??SavePoint??????????????????????????

2020-07-06 文章 wujunxi

1.id
2.savepoint??



----
??:"milan183sansiro"

????jdbc connector????????

2020-07-06 文章 claylin
hi 
allsqlclickhouse??https://github.com/apache/flink/blob/d04872d2c6b7570ea3ba02f8fc4fca02daa96118/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java#L30,
 
??DerbyDialect??MySQLDialect??PostgresDialect??jdbcSQL??

Flink从SavePoint启动任务,修改的代码不生效

2020-07-06 文章 milan183sansiro
各位好:
背景:Flink版本1.6.4,数据源为Kafka的主题A,B,消费者组相同
操作步骤:1.使用SavePoint取消任务。
2.修改代码将B去掉,只消费A主题。
3.从SavePoint启动任务,发现消费者组在B主题下的偏移量也回到了任务停止时的偏移量,之后偏移量马上变成了最新并继续消费。
想知道为什么修改代码不生效。



Re: Flink状态调试

2020-07-06 文章 shizk233
抱歉,我似乎回错邮件了,应该使用回复全部?

Congxian Qiu  于2020年7月6日周一 下午7:22写道:

> Hi
> 想 debug checkpoint 文件的话,可以参考下这个 UT[1]
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
> Best,
> Congxian
>
>
> Z-Z  于2020年7月6日周一 下午4:59写道:
>
> > Hi, 各位大佬们,请教一下:
> > Flink的checkpoint怎么调试啊,我想看程序目前的状态,拿到了checkpoint的文件,打开后有一些东西是乱码,没有结构性,有方法吗?
>


Re: Flink状态调试

2020-07-06 文章 shizk233
Hi Z-Z,

如果你想查看的是程序中的state内容,建议触发一次savepoint并搭配state processor api来查询。

参考
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html

Best,
shizk233

Congxian Qiu  于2020年7月6日周一 下午7:22写道:

> Hi
> 想 debug checkpoint 文件的话,可以参考下这个 UT[1]
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
> Best,
> Congxian
>
>
> Z-Z  于2020年7月6日周一 下午4:59写道:
>
> > Hi, 各位大佬们,请教一下:
> > Flink的checkpoint怎么调试啊,我想看程序目前的状态,拿到了checkpoint的文件,打开后有一些东西是乱码,没有结构性,有方法吗?
>


Re: Flink状态调试

2020-07-06 文章 Congxian Qiu
Hi
想 debug checkpoint 文件的话,可以参考下这个 UT[1]

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
Best,
Congxian


Z-Z  于2020年7月6日周一 下午4:59写道:

> Hi, 各位大佬们,请教一下:
> Flink的checkpoint怎么调试啊,我想看程序目前的状态,拿到了checkpoint的文件,打开后有一些东西是乱码,没有结构性,有方法吗?


Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-06 文章 shizk233
Hi Sun ZHu,

关于方法4,我记得kafka有时间轮功能,可以做到延迟消息的,可以了解一下。

Best,
shizk233

Sun.Zhu <17626017...@163.com> 于2020年7月4日周六 上午12:23写道:

> 感谢benchao和forideal的方案,
> 方法1.使用udf,查不到 sleep 等一下在查
> --这个可以尝试
> 方法2.在 join operator处数据等一会再去查
> —我们使用的是flink sql,不是streaming,所以该方案可能行不通
> 方法3.如果没有 join 上,就把数据发到source,循环join。
> --我们这个维表join的场景类似filter的功能,如果关联上则主流数据就不处理了,所以不一定非要join上,只是想延迟一会提升准确率
> 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了
> —我们的source是kafka,好像不支持kafka的功能
> 方法5.扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait
> 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。
> --这个方案需要修改源码,也可以试一下
>
>
> Best
> Sun.Zhu
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年07月3日 23:26,forideal 写道:
> Hi
>
>
>
>
> 刚刚本超说了四种方法,
>
> 方法1.使用udf,查不到 sleep 等一下在查
>
> 方法2.在 join operator处数据等一会再去查
>
> 方法3.如果没有 join 上,就把数据发到source,循环join。
>
> 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了
>
>
>
>
> 上述方法应该都能实现相同的效果。
>
>
>
>
> 我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait
> 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。
>
>
>
>
> Best forideal
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-03 23:05:06,"Benchao Li"  写道:
> 奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。
>
> admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:
>
> Hi,all
> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
> FLink sql有什么方案实现吗?
>
> 感谢您的回复
>
>
>
> --
>
> Best,
> Benchao Li
>


jemalloc dump 内存

2020-07-06 文章 SmileSmile
hi,社区的各位,是否有配置过jemalloc?

目前在docker容器中放入编译过后的jemalloc,在flink的配置文件加入如下配置

containerized.taskmanager.env.LD_PRELOAD: /opt/jemalloc/lib/libjemalloc.so
   containerized.taskmanager.env.MALLOC_CONF: 
prof:true,lg_prof_interval:25,lg_prof_sample:17,prof_prefix:/opt/state/jeprof.out

结果生成不到对应的heap文件, /opt/state挂载在宿主机的磁盘上,权限给了。

请问该如何操作才可以生产内存dump呢?


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

Re: Flink SQL 关键字 user?

2020-07-06 文章 Leonard Xu
Hi,
是的,用`user`转义处理下,
完整的保留关键字参考[1]

Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/ 


> 在 2020年7月6日,17:06,王 outlook  写道:
> 
> "CREATE TABLE ods_foo (\n" +
> "id INT,\n" +
> "user ARRAY>\n" +
> ") WITH (



Re: Flink SQL 关键字 user?

2020-07-06 文章 Benchao Li
是的,user是关键字,关键字列表可以参考[1].

如果遇到关键字,可以使用 ` 来处理,比如:
CREATE TABLE `user` (...) WITH (...);

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/#reserved-keywords

王 outlook  于2020年7月6日周一 下午5:07写道:

> 我发现我创建表 字段名为 user会报错。 user是关键字吗,还是其他原因
>
>
> "CREATE TABLE ods_foo (\n" +
> "id INT,\n" +
> "user ARRAY>\n" +
> ") WITH (
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "user" at line 3, column 5.
> Was expecting one of:
>
>
>
>

-- 

Best,
Benchao Li


Flink SQL 关键字 user?

2020-07-06 文章 王 outlook
我发现我创建表 字段名为 user会报错。 user是关键字吗,还是其他原因


"CREATE TABLE ods_foo (\n" +
"id INT,\n" +
"user ARRAY>\n" +
") WITH (

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "user" at line 3, column 5.
Was expecting one of:





Flink????????

2020-07-06 文章 Z-Z
Hi?? ?? 
Flink??checkpointcheckpoint??

Re: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-06 文章 Congxian Qiu
@chenkaibit 多谢你的回复~

Best,
Congxian


chenkaibit  于2020年7月6日周一 下午3:53写道:

> hi,Congxian。我在发现这个问题时也很奇怪,但是在打印了一些日志后,确实验证了我的想法。因为 <低版本jdk+flink1.9> 和
> <高版本jdk+1.10> 都不会抛 NPE(见 FLINK-17479),我猜测和 lambda 表达式中外部变量的垃圾回收机制以及 1.10
> 引入的 MailBox 模型有关,外部 checkpointMetaData 实例被意外回收了。所以在修复的 patch 中我在 lambda
> 表达式内部实例化了一个新的 checkpointMetaData,目前看这个方法是有效的,没有再发现过
> NPE。这是个临时的修复方法,根本原因可能还需要进一步分析。
>
>
> --
> Best, yuchuan
>
>
>
> 在 2020-07-06 14:04:58,"Congxian Qiu"  写道:
> >@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
> >CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。
> >
> >Best,
> >Congxian
> >
> >
> >陈凯  于2020年7月6日周一 上午9:53写道:
> >
> >>
> >> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> >> 我之前提了个jira 描述了这个问题
> >> https://issues.apache.org/jira/browse/FLINK-18196
> >>
> >> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
> >>
> >> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
> >>
> >>
> >>
> >> -邮件原件-
> >> 发件人: zhisheng 
> >> 发送时间: 2020年7月5日 15:01
> >> 收件人: user-zh 
> >> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
> >>
> >> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
> >>
> >> Best!
> >> zhisheng
> >>
> >> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
> >>
> >> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
> >> >
> >> > Best,
> >> > Congxian
> >> >
> >> >
> >> > zhisheng  于2020年7月4日周六 下午12:27写道:
> >> >
> >> > > 我们也有遇到过这个异常,但是不是很常见
> >> > >
> >> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> >> > >
> >> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> >> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> >> > > > Best,
> >> > > > Congxian
> >> > > >
> >> > > >
> >> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> >> > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> >> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> >> > > > > >
> >> > > > > >
> >> > > > > >| |
> >> > > > > >JasonLee
> >> > > > > >|
> >> > > > > >|
> >> > > > > >邮箱:17610775...@163.com
> >> > > > > >|
> >> > > > > >
> >> > > > > >Signature is customized by Netease Mail Master
> >> > > > > >
> >> > > > > >在2020年07月01日 20:43,程龙 写道:
> >> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> >> > > > > >
> >> > > > > >
> >> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
> >> operator
> >> > > > > Filter -> Map (2/8).
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > >
> >> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> >> > > > > >   at
> >> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> >> > > > > >   at
> >> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> >> > > > > >   at java.lang.Thread.run(Thread.java:745)
> >> > > > > >Caused by: java.lang.NullPointerException
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> >> > > > > >   at
> >> > > > >
> >> > 

?????? flink interval join????????????????

2020-07-06 文章 ????(Bob Hu)
??flink??




----
??:"Benchao Li"

Re:Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-06 文章 chenkaibit
hi,Congxian。我在发现这个问题时也很奇怪,但是在打印了一些日志后,确实验证了我的想法。因为 <低版本jdk+flink1.9> 和 
<高版本jdk+1.10> 都不会抛 NPE(见 FLINK-17479),我猜测和 lambda 表达式中外部变量的垃圾回收机制以及 1.10 引入的 
MailBox 模型有关,外部 checkpointMetaData 实例被意外回收了。所以在修复的 patch 中我在 lambda 
表达式内部实例化了一个新的 checkpointMetaData,目前看这个方法是有效的,没有再发现过 
NPE。这是个临时的修复方法,根本原因可能还需要进一步分析。




--

Best, yuchuan





在 2020-07-06 14:04:58,"Congxian Qiu"  写道:
>@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
>CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。
>
>Best,
>Congxian
>
>
>陈凯  于2020年7月6日周一 上午9:53写道:
>
>>
>> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
>> 我之前提了个jira 描述了这个问题
>> https://issues.apache.org/jira/browse/FLINK-18196
>>
>> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
>>
>> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>>
>>
>>
>> -邮件原件-
>> 发件人: zhisheng 
>> 发送时间: 2020年7月5日 15:01
>> 收件人: user-zh 
>> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
>>
>> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
>>
>> Best!
>> zhisheng
>>
>> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
>>
>> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > zhisheng  于2020年7月4日周六 下午12:27写道:
>> >
>> > > 我们也有遇到过这个异常,但是不是很常见
>> > >
>> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
>> > >
>> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
>> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
>> > > > Best,
>> > > > Congxian
>> > > >
>> > > >
>> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
>> > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
>> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
>> > > > > >
>> > > > > >
>> > > > > >| |
>> > > > > >JasonLee
>> > > > > >|
>> > > > > >|
>> > > > > >邮箱:17610775...@163.com
>> > > > > >|
>> > > > > >
>> > > > > >Signature is customized by Netease Mail Master
>> > > > > >
>> > > > > >在2020年07月01日 20:43,程龙 写道:
>> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
>> > > > > >
>> > > > > >
>> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
>> operator
>> > > > > Filter -> Map (2/8).
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > >
>> > > >
>> > >
>> >
>> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > >
>> > > >
>> > >
>> >
>> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > >
>> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>> > > > > >   at
>> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>> > > > > >   at
>> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>> > > > > >   at java.lang.Thread.run(Thread.java:745)
>> > > > > >Caused by: java.lang.NullPointerException
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> 

Re: flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空

2020-07-06 文章 Benchao Li
Hi Jim,

这是一个已知问题[1],你可以看下这个issue,是否可以解决你的问题?

[1] https://issues.apache.org/jira/browse/FLINK-18002

Jim Chen  于2020年7月6日周一 上午11:28写道:

> Hi,
> 可以通过以下步骤还原车祸现场:
> kafka topic: test_action
> kafka message:
>   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>
> 代码Problem2.java:
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
>  *
>  * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
>  * 那么在eval方法接收到的就是Row[],
>  * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
>  *
>  * 现在思路:就是在定义表的时候,把ARRYA看成STRING,
>  * 现在的问题,就是查询出来,都是空
>  *
>  * kafka topic: test_action
>  *
>  * kafka message:
>  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>  */
> public class Problem2 {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
> bsEnv.registerFunction("explode3", new ExplodeFunction());
>
> String ddlSource = "CREATE TABLE actionTable3 (\n" +
> "action STRING\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',\n" +
> "'connector.version' = '0.11',\n" +
> "'connector.topic' = 'test_action',\n" +
> "'connector.startup-mode' = 'earliest-offset',\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> "'update-mode' = 'append',\n" +
> "'format.type' = 'json',\n" +
> "'format.derive-schema' = 'false',\n" +
> "'format.json-schema' = '{\"type\": \"object\",
> \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
> ")";
> System.out.println(ddlSource);
> bsEnv.sqlUpdate(ddlSource);
>
> Table table = bsEnv.sqlQuery("select * from actionTable3");
> //Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
> TABLE(explode3(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print();// 输出都是空
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>


-- 

Best,
Benchao Li


回复:rocksdb的block cache usage应该如何使用

2020-07-06 文章 SmileSmile
hi yun tang!

我在容器内加入了libjemalloc.so.2并且在配置中加上了
containerized.master.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc.so.2"
containerized.master.env.MALLOC_CONF: 
"prof:true,lg_prof_interval:25,lg_prof_sample:17"

请问要如何可以得到内存文件?试着kill一个tm,找不到对应的heap文件。求助



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 19:13,Yun Tang 写道:
hi

有采集过内存使用情况么,推荐使用jemalloc的预先加载方式[1][2]来sample 
JVM的内存使用,观察是否有malloc的内存存在超用的场景。需要配置相关参数 
containerized.taskmanager.env.MALLOC_CONF 和 
containerized.taskmanager.env.LD_PRELOAD


[1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
[2] https://www.evanjones.ca/java-native-leak-bug.html

祝好
唐云



From: SmileSmile 
Sent: Friday, July 3, 2020 15:22
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


?????? ????????????????????????????

2020-07-06 文章 kcz
windowflink??




----
??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction
Best,
Congxian


JasonLee <17610775...@163.com ??2020??7??4?? 8:29??

 


 | |
 JasonLee
 |
 |
 ??17610775...@163.com
 |

 Signature is customized by Netease Mail Master

 ??2020??07??03?? 14:02??18579099...@163.com ??

 
??ProcessWindowFunctionvalueState

 
1.??ProcessWindowFunction??process??1??trigger

 
process??valueState



 18579099...@163.com


Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-06 文章 Congxian Qiu
@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。

Best,
Congxian


陈凯  于2020年7月6日周一 上午9:53写道:

>
> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> 我之前提了个jira 描述了这个问题
> https://issues.apache.org/jira/browse/FLINK-18196
>
> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
>
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>
>
>
> -邮件原件-
> 发件人: zhisheng 
> 发送时间: 2020年7月5日 15:01
> 收件人: user-zh 
> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
>
> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
>
> Best!
> zhisheng
>
> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
>
> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
> >
> > Best,
> > Congxian
> >
> >
> > zhisheng  于2020年7月4日周六 下午12:27写道:
> >
> > > 我们也有遇到过这个异常,但是不是很常见
> > >
> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> > >
> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> > > > > >
> > > > > >
> > > > > >| |
> > > > > >JasonLee
> > > > > >|
> > > > > >|
> > > > > >邮箱:17610775...@163.com
> > > > > >|
> > > > > >
> > > > > >Signature is customized by Netease Mail Master
> > > > > >
> > > > > >在2020年07月01日 20:43,程龙 写道:
> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> > > > > >
> > > > > >
> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
> operator
> > > > > Filter -> Map (2/8).
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > >
> > > >
> > >
> >
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > >
> > > >
> > >
> >
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > >
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> > > > > >   at
> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> > > > > >   at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> > > > > >   at java.lang.Thread.run(Thread.java:745)
> > > > > >Caused by: java.lang.NullPointerException
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> > > > >
> > > >
> > >
> >
>