Re: Heartbeat of TaskManager timed out.

2020-07-06 Thread Ori Popowski
Hi,

I just wanted to update that the problem is now solved!

I suspect that Scala's flatten() method has a memory problem on very large
lists (> 2 billion elements). When using Scala Lists, the memory seems to
leak but the app keeps running, and when using Scala Vectors, a weird
IllegalArgumentException is thrown [1].

I implemented my own flatten() method using Arrays and quickly ran into
NegativeArraySizeException since the integer representing the array size
wrapped around at Integer.MaxValue and became negative. After I started
catching this exception, all my cluster problems resolved: the checkpoints,
the heartbeat timeout, and the memory and CPU utilization.
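For illustration only (this is not the actual job code, and flattenBytes is a made-up name): a size-checked flatten over byte arrays that fails fast instead of letting the total length wrap past Int.MaxValue:

def flattenBytes(chunks: Seq[Array[Byte]]): Array[Byte] = {
  // Sum as Long so the total cannot silently overflow Int.
  val total: Long = chunks.map(_.length.toLong).sum
  require(total <= Int.MaxValue, s"total size $total does not fit in a single array")
  val out = new Array[Byte](total.toInt)
  var pos = 0
  chunks.foreach { c =>
    System.arraycopy(c, 0, out, pos, c.length)
    pos += c.length
  }
  out
}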

I still need to confirm my suspicion towards Scala's flatten() though,
since I haven't "lab-tested" it.

[1] https://github.com/NetLogo/NetLogo/issues/1830

On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski  wrote:

> Hi,
>
> I initially thought this, so this is why my heap is almost 30GiB.
> However, I started to analyze the Java Flight Recorder files, and I
> suspect there's a memory leak in Scala's flatten() method.
> I changed the line that uses flatten(), and instead of flatten() I'm just
> creating a ByteArray the size flatten() would have returned, and I no
> longer have the heartbeat problem.
>
> So now my code is
> val recordingData =
> Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>
> instead of
> val recordingData = recordingBytes.flatten
>
> I attach a screenshot of Java Mission Control
>
>
>
> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song  wrote:
>
>> I agree with Roman's suggestion for increasing heap size.
>>
>> It seems that the heap grows faster than it is freed. Thus eventually the Full
>> GC is triggered, taking more than 50s and causing the timeout. However,
>> even the full GC frees only 2GB space out of the 28GB max size. That
>> probably suggests that the max heap size is not sufficient.
>>
>>> 2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
>>>  28944M->26018M(28960M), 51.5256128 secs]
>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>> 113556K->112729K(1150976K)]
>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>
>>
>> I would not be so sure about the memory leak. I think it could be a
>> normal pattern that memory keeps growing as more data is processed. E.g.,
>> from the provided log, I see window operation tasks executed in the task
>> manager. Such operation might accumulate data until the window is emitted.
>>
>> Maybe Ori you can also take a look at the task manager log when the job
>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>> I mentioned before, it is possible that, with the same configurations Flink
>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>> changes.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski  wrote:
>>
>>> Thank you very much for your analysis.
>>>
>>> When I said there was no memory leak - I meant that from the specific
>>> TaskManager I monitored in real-time using JProfiler.
>>> Unfortunately, this problem occurs in only one of the TaskManagers, and you
>>> cannot anticipate which one. So when you pick a TM to profile at random,
>>> everything looks fine.
>>>
>>> I'm running the job again with Java FlightRecorder now, and I hope I'll
>>> find the reason for the memory leak.
>>>
>>> Thanks!
>>>
>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Thanks, Ori

 From the log, it looks like there IS a memory leak.

 At 10:12:53 there was the last "successful" GC, when 13GB was freed in
 0.4653809 secs:
 [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
 Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]

 Then the heap grew from 10G to 28G with GC not being able to free up
 enough space:
 [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
 12591.0M(28960.0M)->11247.0M(28960.0M)]
 [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
 12103.0M(28960.0M)->11655.0M(28960.0M)]
 [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
 12929.0M(28960.0M)->12467.0M(28960.0M)]
 ... ...
 [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
 28042.6M(28960.0M)->27220.6M(28960.0M)]
 [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
 28494.5M(28960.0M)->28720.6M(28960.0M)]
 [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
 28944.6M(28960.0M)->28944.6M(28960.0M)]

 Until 10:15:12 when GC freed almost 4G - but it took 51 seconds and
 heartbeat timed out:
 2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
  28944M->26018M(28960M), 51.5256128 secs]
   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
 

Re: DataStream state question

2020-07-06 Thread Congxian Qiu
Hi

The state expires when the time since the last access exceeds the TTL interval you configured. For example, with `OnCreateAndWrite`
the state becomes expired one day after the create or write operation. See the documentation for details [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
Best,
Congxian
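For illustration, a minimal sketch (in Scala; the state name and type are made up) of how such a TTL config is attached to a state descriptor -- with OnCreateAndWrite, every write resets the one-day expiration clock:

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

// Attach the TTL config to the descriptor; state obtained from this
// descriptor inside a rich function is then TTL-aware.
val descriptor = new ValueStateDescriptor[String]("my-state", classOf[String])
descriptor.enableTimeToLive(ttlConfig)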


ゞ野蠻遊戲χ wrote on Tue, Jul 7, 2020 at 12:17 PM:

> Dear all:
>
>
> I'd like to ask: when setting a TTL on state, as in the following code:
> StateTtlConfig ttlConfig = StateTtlConfig
>     .newBuilder(Time.days(1))
>     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>     .build();
>
>
>
> With expiration set to one day, for a job started at 2020-07-07 08:30:00, is the expiry
> window 2020-07-07 00:00:00~2020-07-07 23:59:59, or is it
> 2020-07-07 08:30:00~2020-07-08 08:30:00 after the job goes live?
>
>
> Thanks
> Jiazhi


DataStream state question

2020-07-06 Thread ゞ野蠻遊戲χ
Dear all:


I'd like to ask: when setting a TTL on state, as in the following code:
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();



With expiration set to one day, for a job started at 2020-07-07 08:30:00, is the
expiry window 2020-07-07 00:00:00~2020-07-07 23:59:59, or is it
2020-07-07 08:30:00~2020-07-08 08:30:00 after the job goes live?


Thanks
Jiazhi

FlinkWebUI parameter question

2020-07-06 Thread liuhy_em...@163.com
Dear all,
What do the parameters on the TaskManager page of the Flink Web UI mean? Is there a related article that describes them?




Thanks,
Hongyang


Re: Help: Real-time cumulative UV with Flink SQL 1.10

2020-07-06 Thread seeksst
I see that your code calls sqlUpdate while the TableConfig is set separately; it needs to be
passed into sqlUpdate together with the statement, i.e. sqlUpdate(str, config).
Also, if you are using RocksDB, you need to enable the RocksDB TTL compaction filter by
setting state.backend.rocksdb.ttl.compaction.filter.enabled to true;
in older versions this parameter defaults to false.
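For illustration, a minimal sketch (in Scala, assuming the 1.10 blink-planner setup shown elsewhere in this thread) of setting the retention on the environment's own TableConfig before the queries are registered, with the RocksDB setting mentioned above noted as a flink-conf.yaml comment:

import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

object IdleStateRetentionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Set the retention on the TableConfig owned by the environment so that
    // the queries submitted afterwards pick it up.
    tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7))

    // tableEnv.sqlUpdate("INSERT INTO ...")  // register queries after configuring retention

    // With the RocksDB state backend, also enable the TTL compaction filter in flink-conf.yaml:
    //   state.backend.rocksdb.ttl.compaction.filter.enabled: true
  }
}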




------------------ Original ------------------
From: x <35907...@qq.com>
To: user-zh <user-zh@flink.apache.org>
Sent: Tuesday, Jul 7, 2020, 10:46
Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10


It's the blink planner: val setttings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

------------------ Original ------------------
From: "Benchao Li" <libenchao@apache.org>
Sent: Monday, Jul 6, 2020, 23:11
To: "user-zh" <user-zh@flink.apache.org>
Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10

This doesn't seem like it should happen -- are you using the blink planner?

x <35907418@qq.com> wrote on Mon, Jul 6, 2020 at 13:24:

> sorry, I misspoke -- there is indeed no window, it's all group aggregation.
>
> I configured tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)), but the state keeps growing and is not cleaned up automatically as configured.
>
>
> ------------------ Original ------------------
> From: "Benchao Li" <libenchao@apache.org>
> Sent: Monday, Jul 6, 2020, 12:52
> To: "user-zh" <user-zh@flink.apache.org>
> Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10
>
> I don't see any window in your SQL -- it is just a plain aggregation.
> Such an aggregation needs a reasonable state retention [1] time, otherwise the state is never cleaned up by default.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> x <35907418@qq.com> wrote on Mon, Jul 6, 2020 at 11:15:
>
> > The version is 1.10.1. The final sink step does do a count
> > distinct inside a window. Does having a count
> > distinct inside a window anywhere in the computation cause all expired state not to be cleaned up automatically? The state of my window step is actually small; the large state belongs to the SQL that groups by DATE_FORMAT(rowtm,
> > 'yyyy-MM-dd'). The code is as follows:
> > val rt_totaluv_view : Table = tabEnv.sqlQuery(
> >   """
> >     SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd
> >     HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
> >     FROM source
> >     GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> >   """)
> > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
> >
> > val totaluvTmp =
> >   tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
> >     .filter( line => line._1 == true ).map( line => line._2 )
> >
> > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
> >
> > tabEnv.sqlUpdate(
> >   s"""
> >     INSERT INTO mysql_totaluv
> >     SELECT _1,MAX(_2)
> >     FROM $totaluvTabTmp
> >     GROUP BY _1
> >   """)
> > ------------------ Original ------------------
> > From: "Benchao Li" <libenchao@apache.org>
> > Sent: Friday, Jul 3, 2020, 21:47
> > To: "user-zh" <user-zh@flink.apache.org>
> > Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10
> >
> > Which version are you using? There was a similar issue before [1]: count distinct inside a window triggers this problem,
> > and it has been fixed in 1.11.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17942
> >
> > x <35907418@qq.com> wrote on Fri, Jul 3, 2020 at 16:34:
> >
> > > Hello, after the program has been running for a while, I find the checkpoint files keep growing, so the state is apparently not expiring.
> > >
> > > I configured tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)); in principle, the state for keys whose date is the previous day should expire on the following day.
> > >
> > >
> > > ------------------ Original ------------------
> > > From: "Jark Wu" <imjark@gmail.com>
> > > Sent: Thursday, Jun 18, 2020, 12:16
> > > To: "user-zh" <user-zh@flink.apache.org>
> > > Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10
> > >
> > > Yes, I think that can work around it.
> > >
> > > On Thu, 18 Jun 2020 at 10:34, x <35907418@qq.com> wrote:
> > >
> > > > For 1.10, I implemented it by converting the table to a stream and then back to a table -- does that look reasonable?
> > > > val resTmpTab: Table = tabEnv.sqlQuery(
> > > >   """
> > > >     SELECT

Re: Question about extending the jdbc connector

2020-07-06 Thread Jark Wu
Hi,

Currently flink-connector-jdbc does not yet support registering a
dialect. The community plans to add this, but there are no resources for it at the moment; it is a fairly complex feature that needs careful interface design.
For now you can take the flink-connector-jdbc source code, add your own Dialect class, register your
dialect in JdbcDialects, and then build and package it.

Best,
Jark

On Mon, 6 Jul 2020 at 20:10, claylin <1012539...@qq.com> wrote:

> hi all, I need to write data from SQL into ClickHouse, but looking at the source code it is not easy to extend:
> https://github.com/apache/flink/blob/d04872d2c6b7570ea3ba02f8fc4fca02daa96118/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java#L30,
> where only DerbyDialect, MySQLDialect and PostgresDialect are hard-coded, and the class does not support registering a new JDBC driver. If I want to support other databases in SQL, how should I go about it? Any advice is appreciated.
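For illustration, a rough sketch (written in Scala for brevity; in the real flink-connector-jdbc module this would be a Java class) of the approach described above. The method names mirror the JdbcDialect interface at the linked commit but may differ in your Flink version, the real interface has further methods (statement generation, row conversion, validation) that must also be implemented, and the ClickHouse driver class name is an assumption:

import java.util.Optional

// Shape of a custom dialect to add to the flink-connector-jdbc sources.
class ClickHouseDialectSketch {
  // Match the JDBC URLs this dialect is responsible for.
  def canHandle(url: String): Boolean = url.startsWith("jdbc:clickhouse:")
  // Driver to load when none is specified (assumed driver class).
  def defaultDriverName(): Optional[String] = Optional.of("ru.yandex.clickhouse.ClickHouseDriver")
  // ClickHouse-style identifier quoting.
  def quoteIdentifier(identifier: String): String = "`" + identifier + "`"
}

// After implementing the full JdbcDialect interface, add an instance to the hard-coded
// dialect list in JdbcDialects.java and rebuild/package the module, as suggested above.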


Re: Help: Real-time cumulative UV with Flink SQL 1.10

2020-07-06 Thread x
It's the blink planner: val setttings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()




------------------ Original ------------------
From: "Benchao Li" <libenchao@apache.org>
[...]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <35907...@qq.com> wrote on Mon, Jul 6, 2020 at 11:15:

> The version is 1.10.1. The final sink step does do a count
> distinct inside a window. Does having a count
> distinct inside a window anywhere in the computation cause all expired state not to be
> cleaned up automatically? The state of my window step is actually small; the large state
> belongs to the SQL that groups by DATE_FORMAT(rowtm, 'yyyy-MM-dd'). The code is as follows:
> val rt_totaluv_view : Table = tabEnv.sqlQuery(
>   """
>     SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd
>     HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
>     FROM source
>     GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
>   """)
> tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
>
> val totaluvTmp =
>   tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
>     .filter( line => line._1 == true ).map( line => line._2 )
>
> val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
>
> tabEnv.sqlUpdate(
>   s"""
>     INSERT INTO mysql_totaluv
>     SELECT _1,MAX(_2)
>     FROM $totaluvTabTmp
>     GROUP BY _1
>   """)
> ------------------ Original ------------------
> From: "Benchao Li" <libenchao@apache.org>
> [...]
> https://issues.apache.org/jira/browse/FLINK-17942
>
> x <35907...@qq.com> wrote on Fri, Jul 3, 2020 at 16:34:
>
> > Hello, after the program has been running for a while, I find the checkpoint files
> > keep growing, so the state is apparently not expiring.
> >
> > I configured
> > tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)); in principle,
> > the state for keys whose date is the previous day should expire on the following day.
> >
> >
> > ------------------ Original ------------------
> > From: "Jark Wu" <imjark@gmail.com>
> > [...]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > [2]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> >
> > x <35907...@qq.com> wrote on Wed, Jun 17, 2020 at 11:14:
> >
> > > Starting from 00:00 each day, I need to compute the cumulative UV of the current
> > > day and output it in real time:
> > > CREATE VIEW uv_per_10min AS
> > > SELECT
> > >   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd
> > >   HH:mm:00')) OVER w
> > >   AS time_str,
> > >   COUNT(DISTINCT user_id) OVER w AS uv
> > > FROM user_behavior
> > > WINDOW w AS (ORDER BY proctime
> > > ROWS BETWEEN
> > > UNBOUNDED
> > > PRECEDING AND
> > > CURRENT ROW);
> > >
> > > Can a
> > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> > > be added to the window definition?
> > > PS: does the 1.10 DDL support CREATE VIEW?



 --

 Best,
 Benchao Li



-- 

Best,
Benchao Li

Re: Re: How to clear state when the window closes

2020-07-06 Thread flink小猪



[1]. Setting a TTL should achieve the same effect, but I would still like to run some custom logic when the window closes (for example clearing the state here; later there may be other operations for which a TTL would not work as well).
[2]. With KeyedProcessFunction I would have to register timers myself; in my code it is timeWindow().trigger().process(),
and in the ProcessWindowFunction I only need to write the processing logic, without managing the timed window myself.
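For illustration, a minimal sketch (in Scala; the types, class name, and state name are made up) of keeping the intermediate result in per-window state and releasing it in ProcessWindowFunction#clear(), which is called when the window is finally purged rather than on every early firing of the custom trigger:

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class DailyAggFunction extends ProcessWindowFunction[Long, Long, String, TimeWindow] {
  private val descriptor = new ValueStateDescriptor[java.lang.Long]("intermediate", classOf[java.lang.Long])

  override def process(key: String, context: Context,
                       elements: Iterable[Long], out: Collector[Long]): Unit = {
    val state = context.windowState.getState(descriptor)  // state scoped to this window
    val current = Option(state.value()).map(_.longValue()).getOrElse(0L) + elements.sum
    state.update(current)
    out.collect(current)                                   // emitted on every trigger firing
  }

  override def clear(context: Context): Unit = {
    // Called once when the window closes (end of window + allowed lateness):
    // the place to drop the per-window state.
    context.windowState.getState(descriptor).clear()
  }
}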














On 2020-07-05 11:56:03, "Congxian Qiu" wrote:
>It sounds like the requirement is a one-day window that is triggered every hour, with the state cleaned up after one day.
>You could try TTL [1] state.
>Also, since you are writing your own ProcessWindowFunction, why not consider KeyedProcessFunction [2]?
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction
>Best,
>Congxian
>
>
>JasonLee <17610775...@163.com> wrote on Sat, Jul 4, 2020 at 8:29 PM:
>
>> How about setting an expiration time on the state?
>>
>>
>> | |
>> JasonLee
>> |
>> |
>> Email: 17610775...@163.com
>> |
>>
>> Signature is customized by Netease Mail Master
>>
>> On Jul 3, 2020 at 14:02, 18579099...@163.com wrote:
>>
>> Hi all, I have a requirement: I define a ValueState in a ProcessWindowFunction operator, and I want to clean the state up when the window closes. Where should I do the cleanup?
>>
>> 1. At first I chose to clean it up in the process method of the ProcessWindowFunction, but there is a problem: my event-time window is one day long, and I wrote a trigger that outputs a result every hour.
>>
>> If I clean up in the process method, the state is cleared every hour, yet the ValueState holds my intermediate result and should only be cleared when the window closes (i.e. after one day). What should I do?
>>
>>
>>
>> 18579099...@163.com
>>


Re: Flink job started from a savepoint: modified code does not take effect

2020-07-06 Thread Paul Lam
I guess you are consuming both topics A and B with the same Kafka source? If so, this looks like an early issue in the Kafka connector.

When the job is stopped, the partition offsets of topic B are stored in the savepoint, and on restore, even though topic B
has been removed from the code, its partition offsets are still restored.

This was fixed in a later version, I believe 1.8 or 1.9.

Best,
Paul Lam

> On Jul 6, 2020, at 20:55, milan183sansiro wrote:
> 
> Hi:
> 1. I did not manually set ids on the operators.
> 2. The savepoint restore path is correct.
> 
> 
> On Jul 6, 2020 at 20:32, wujunxi <462329...@qq.com> wrote:
> Hi, please confirm the following two points:
> 1. Did you set an id for every operator?
> 2. Is the savepoint restore path correct?
> 
> 
> 
> -------- Original message --------
> From: "milan183sansiro"
> Sent: Monday, Jul 6, 2020, 7:55 PM
> To: "user-zh"
> Subject: Flink job started from a savepoint: modified code does not take effect
> 
> 
> 
> Hi all:
>
> Background: Flink 1.6.4; the sources are Kafka topics A and B, with the same consumer group.
> Steps: 1. Cancel the job with a savepoint.
> 2. Modify the code to remove B and consume only topic A.
>
> 3. Start the job from the savepoint; the consumer group's offsets on topic B also went back to the offsets at the time the job was stopped, and then the offsets immediately jumped to the latest and consumption continued.
> I would like to know why the code change does not take effect.



flink 1.10.1 writing to Hive in parquet format

2020-07-06 Thread lydata
 Hi,

Could you provide an example of writing to Hive in parquet format with Flink 1.10?

Best,
lydata

Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread David Magalhães
Thanks for the reply Chen.

My use case is a "simple" get from Kafka into S3. The job can read very
quickly from Kafka, and S3 has some issues keeping up. The
backpressure doesn't have enough time to take effect in this case, and when the
checkpoint time is reached, errors like heartbeat timeouts or the task
manager not replying back start to happen.

I will investigate further and try this example.

On Mon, Jul 6, 2020 at 5:45 PM Chen Qin  wrote:

> My two cents here,
>
> - flink job already has back pressure so rate limit can be done via
> setting parallelism to proper number in some use cases. There is an open
> issue of checkpointing reliability when back pressure, community seems
> working on it.
>
> - rate limit can be abused easily and cause lot of confusions. Think about
> a use case where you have two streams do a simple interval join. Unless you
> were able to rate limit both with proper value dynamiclly, you might see
> timestamp and watermark gaps keep increasing causing checkpointing failure.
>
> So the question might be, instead of looking at rate limit of one source,
> how to slow down all sources without ever increasing time, wm gaps. It
> sounds complicated already.
>
> with what being said, if you really want to have rate limit on your own,
> you can try following code :) It works well for us.
>
> public class SynchronousKafkaConsumer<T> extends FlinkKafkaConsumer<T> {
>
>   protected static final Logger LOG = 
> LoggerFactory.getLogger(SynchronousKafkaConsumer.class);
>
>   private final double topicRateLimit;
>   private transient RateLimiter subtaskRateLimiter;
>
>
> @Override
> public void open(Configuration configuration) throws Exception {
>   Preconditions.checkArgument(
>   topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks() > 
> 0.1,
>   "subtask ratelimit should be greater than 0.1 QPS");
>   subtaskRateLimiter = RateLimiter.create(
>   topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks());
>   super.open(configuration);
> }
>
>
> @Override
> protected AbstractFetcher<T, ?> createFetcher(
> SourceContext<T> sourceContext,
> Map<KafkaTopicPartition, Long> partitionsWithOffsets,
> SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
> SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
> StreamingRuntimeContext runtimeContext,
> OffsetCommitMode offsetCommitMode,
> MetricGroup consumerMetricGroup, boolean useMetrics)
> throws Exception {
>
>   return new KafkaFetcher<T>(
>   sourceContext,
>   partitionsWithOffsets,
>   watermarksPeriodic,
>   watermarksPunctuated,
>   runtimeContext.getProcessingTimeService(),
>   runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
>   runtimeContext.getUserCodeClassLoader(),
>   runtimeContext.getTaskNameWithSubtasks(),
>   deserializer,
>   properties,
>   pollTimeout,
>   runtimeContext.getMetricGroup(),
>   consumerMetricGroup,
>   useMetrics) {
> @Override
> protected void emitRecord(T record,
>   KafkaTopicPartitionState<TopicPartition> partitionState,
>   long offset) throws Exception {
>   subtaskRateLimiter.acquire();
>   if (record == null) {
> consumerMetricGroup.counter("invalidRecord").inc();
>   }
>   super.emitRecord(record, partitionState, offset);
> }
>
> @Override
> protected void emitRecordWithTimestamp(T record,
>KafkaTopicPartitionState<TopicPartition> partitionState,
>long offset, long timestamp) throws Exception {
>   subtaskRateLimiter.acquire();
>   if (record == null) {
> consumerMetricGroup.counter("invalidRecord").inc();
>   }
>   super.emitRecordWithTimestamp(record, partitionState, offset, 
> timestamp);
> }
>   };
>
> }
>
> Thanks,
>
> Chen
> Pinterest Data
>
>
> On Jul 6, 2020, at 7:43 AM, David Magalhães  wrote:
>
> I've noticed that FLINK-11501 was implemented in
> flink-connector-kafka-0.10 [1], but it isn't in the current version of
> flink-connector-kafka. Is there any reason for this, and what would be the
> best way to implement rate-limit functionality in the current Kafka
> consumer?
>
> Thanks,
> David
>
> [1]
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
>
> [2]
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
>
>
>


Decompressing Tar Files for Batch Processing

2020-07-06 Thread Austin Cawley-Edwards
Hey all,

I need to ingest a tar file containing ~1GB of data in around 10 CSVs. The
data is fairly connected and needs some cleaning, which I'd like to do with
the Batch Table API + SQL (but have never used before). I've got a small
prototype loading the uncompressed CSVs and applying the necessary SQL,
which works well.

I'm wondering about the task of downloading the tar file and unzipping it
into the CSVs. Does this sound like something I can/should do in Flink, or
should I set up another process to download, unzip, and store in a
filesystem to then read with the Flink Batch job? My research is leading me
towards doing it separately but I'd like to do it all in the same job if
there's a creative way.

Thanks!
Austin
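
For illustration, a minimal sketch (in Scala, assuming Apache Commons Compress on the classpath and a plain, uncompressed tar; the object name is made up) of the "separate step" option -- unpack the archive into a staging directory first, then let the Flink batch job read the CSVs:

import java.io.{BufferedInputStream, FileInputStream}
import java.nio.file.{Files, Paths}
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

object UntarBeforeJob {
  def main(args: Array[String]): Unit = {
    val Array(tarPath, outDir) = args
    // Wrap in a GzipCompressorInputStream first if the file is actually a .tar.gz.
    val tarIn = new TarArchiveInputStream(new BufferedInputStream(new FileInputStream(tarPath)))
    try {
      Iterator.continually(tarIn.getNextTarEntry).takeWhile(_ != null).foreach { entry =>
        val target = Paths.get(outDir, entry.getName)
        if (entry.isDirectory) Files.createDirectories(target)
        else {
          if (target.getParent != null) Files.createDirectories(target.getParent)
          // TarArchiveInputStream signals EOF at the end of each entry, so this copies one file.
          Files.copy(tarIn, target)
        }
      }
    } finally tarIn.close()
  }
}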


SSL for QueryableStateClient

2020-07-06 Thread mail2so...@yahoo.co.in
Hello,
I am running Flink on Kubernetes, and external access goes from outside through an Ingress to a
proxy on Kubernetes via SSL on port 443 only.
Could you please provide guidance on how to set up SSL for
QueryableStateClient, the client used to query state?

Please let me know if any other details are needed.
Thanks & Regards
Souma Suvra Ghosh

Re: Asynchronous I/O poor performance

2020-07-06 Thread Arvid Heise
Hi Mark,

Async wait operators cannot be chained to sources so the messages go
through the network stack. Thus, having some latency is normal and cannot
be avoided. It can be tuned though, but I don't think that this is the
issue at hand as it should mostly impact latency and affect throughput
less. Since external I/O calls are much more heavy weight than our internal
communication, both the drop of throughput and the increase in latency are
usually dwarfed by the external I/O call costs.

Please try to increase the thread pool for akka as written in my previous
email and report back.

On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik  wrote:

> Hi Benchao,
>
> i have run this in the code:
>
> println(env.getConfig.getAutoWatermarkInterval)
>
> and got 200. I do fully understand how watermarks and the async operator
> work, but
> I decided to make a simple test that evaluates the time it
> takes to enter the asyncInvoke method, and it looks like it takes about
> 80ms, which is longer than the time it takes to get a response from my
> micro-service.
>
> code below
>
> class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, 
> String)] {
>
>   implicit lazy val executor: ExecutionContext = 
> ExecutionContext.fromExecutor(Executors.directExecutor())
>
>   /*
>   implicit val actorSystem = ActorSystem.apply("test", None, None, 
> Some(executor))
>   implicit val materializer = ActorMaterializer()
>   implicit val executionContext = actorSystem.dispatcher
>
>
>   println(materializer.system.name)
>   println("start")
>   */
> // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
>
>   // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
>   var actorSystem: ActorSystem = null
>   var materializer: ActorMaterializer = null
>   var executionContext: ExecutionContextExecutor = null
>   //var akkaHttp: HttpExt = null
>
>   override def open(parameters: Configuration): Unit = {
> actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString, 
> Some(ConfigFactory.load("application.conf")), None, Some(executor))
> materializer = ActorMaterializer()(actorSystem)
> executionContext = actorSystem.dispatcher
> //akkaHttp = Http(actorSystem)
>   }
>
>   override def close(): Unit = {
> actorSystem.terminate()
>   }
>
>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, 
> String)]): Unit = {
> val start = str.toLong
> val delta = System.currentTimeMillis() - start
> resultFuture.complete(Iterable((str, s"${delta}")))
>   }
> }
>
>
> object Job {
>   def main(args: Array[String]): Unit = {
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> //env.enableCheckpointing(10)
> env.setParallelism(1)
>
> val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
> //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => 
> System.currentTimeMillis()-s}.print()
> val x : DataStream[String] = someIntegers.map( _ => 
> s"${System.currentTimeMillis()}")
> val resultStream: DataStream[(String, String)] = 
> AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, 
> TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
>   //AsyncDataStream.unorderedWait(data , new 
> AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
> resultStream.print()
> println(env.getConfig.getAutoWatermarkInterval)
> env.execute("Flink Scala API Skeleton")
>   }
> }
>
> is this normal behavior?
>
>
> On Mon, Jul 6, 2020 at 2:45 PM Benchao Li  wrote:
>
>> Hi Mark,
>>
>> According to your data, I think the config of AsyncOperator is OK.
>> There is one more config that might affect the throughput of
>> AsyncOperator, it's watermark.
>> Because unordered async operator still keeps the order between
>> watermarks, did you use
>> event time in your job, and if yes, what's the watermark interval in your
>> job?
>>
>> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>>
>>> Hi Benchao
>>>
>>> The capacity is 100
>>> Parallelism is 8
>>> Rpc req is 20ms
>>>
>>> Thanks
>>>
>>>
>>> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>>>
 Hi Mark,

 Could you give more details about your Flink job?
 - the capacity of AsyncDataStream
 - the parallelism of AsyncDataStream operator
 - the time of per blocked rpc request

 Mark Zitnik  于2020年7月5日周日 上午3:48写道:

> Hi
>
> In my Flink application I need to enrich data using
> AsyncDataStream.unorderedWait,
> but I am getting poor performance. At the beginning I was just working
> with an HTTP call, but I have switched to gRPC. I am running on an 8-core node and
> getting a total of 3200 events per second; the service that I am using is not
> fully utilized and can produce up to 1 req/sec.
>
> Flink job flow
> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~>
> write to Kafka

Re: Flink AskTimeoutException killing the jobs

2020-07-06 Thread M Singh
 Thanks Xintong.  I will check the logs.  
On Sunday, July 5, 2020, 09:29:31 PM EDT, Xintong Song 
 wrote:  
 
 As I already mentioned,

I would suggest looking into the jobmanager logs and gc logs to see if there's
any problem that prevents the process from handling the rpc messages in a timely manner.


The Akka ask timeout does not seem to be the root problem to me.

Thank you~

Xintong Song




On Sat, Jul 4, 2020 at 12:12 AM M Singh  wrote:

 Hi Xintong/LakeShen:
We have the following setting in flink-conf.yaml

akka.ask.timeout: 180 s

akka.tcp.timeout: 180 s


But still see this exception.  Are there multiple akka.ask.timeout or 
additional settings required ?

Thanks
Mans
On Friday, July 3, 2020, 01:08:05 AM EDT, Xintong Song 
 wrote:  
 
 The configuration option you're looking for is `akka.ask.timeout`.




However, I'm not sure increasing this configuration would help in your case. 
The error message shows that there is a timeout on a local message. It is weird that
a local message does not get a reply within 10 sec. I would suggest looking
into the jobmanager logs and gc logs to see if there's any problem that prevents
the process from handling the rpc messages in a timely manner.




Thank you~

Xintong Song




On Fri, Jul 3, 2020 at 3:51 AM M Singh  wrote:

Hi:
I am using Flink 1.10 on AWS EMR cluster.
We are getting AskTimeoutExceptions which is causing the flink jobs to die.   
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/resourcemanager#-1602864959]] after [1 ms]. 
Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A 
typical reason for `AskTimeoutException` is that the recipient actor didn't 
send a reply.at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
... 9 more

Can you please let me know where can I set the timeout for this timeout ? 
I could not find this specific timeout in the flink doc - Apache Flink 1.10 
Documentation: Configuration.

Thanks
Mans
  
  

Re: Asynchronous I/O poor performance

2020-07-06 Thread Mark Zitnik
Hi Benchao,

i have run this in the code:

println(env.getConfig.getAutoWatermarkInterval)

and got 200. I do fully understand how watermarks and the async operator
work, but
I decided to make a simple test that evaluates the time it takes
to enter the asyncInvoke method, and it looks like it takes about 80ms,
which is longer than the time it takes to get a response from my
micro-service.

code below

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {

  implicit lazy val executor: ExecutionContext =
ExecutionContext.fromExecutor(Executors.directExecutor())

  /*
  implicit val actorSystem = ActorSystem.apply("test", None, None,
Some(executor))
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher


  println(materializer.system.name)
  println("start")
  */
// redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com

  // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
  var actorSystem: ActorSystem = null
  var materializer: ActorMaterializer = null
  var executionContext: ExecutionContextExecutor = null
  //var akkaHttp: HttpExt = null

  override def open(parameters: Configuration): Unit = {
actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString,
Some(ConfigFactory.load("application.conf")), None, Some(executor))
materializer = ActorMaterializer()(actorSystem)
executionContext = actorSystem.dispatcher
//akkaHttp = Http(actorSystem)
  }

  override def close(): Unit = {
actorSystem.terminate()
  }

  override def asyncInvoke(str: String, resultFuture:
ResultFuture[(String, String)]): Unit = {
val start = str.toLong
val delta = System.currentTimeMillis() - start
resultFuture.complete(Iterable((str, s"${delta}")))
  }
}


object Job {
  def main(args: Array[String]): Unit = {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//env.enableCheckpointing(10)
env.setParallelism(1)

val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
//someIntegers.map { _ => System.currentTimeMillis()}.map{ s =>
System.currentTimeMillis()-s}.print()
val x : DataStream[String] = someIntegers.map( _ =>
s"${System.currentTimeMillis()}")
val resultStream: DataStream[(String, String)] =
AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L,
TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
  //AsyncDataStream.unorderedWait(data , new
AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
resultStream.print()
println(env.getConfig.getAutoWatermarkInterval)
env.execute("Flink Scala API Skeleton")
  }
}

is this normal behavior?


On Mon, Jul 6, 2020 at 2:45 PM Benchao Li  wrote:

> Hi Mark,
>
> According to your data, I think the config of AsyncOperator is OK.
> There is one more config that might affect the throughput of
> AsyncOperator, it's watermark.
> Because unordered async operator still keeps the order between watermarks,
> did you use
> event time in your job, and if yes, what's the watermark interval in your
> job?
>
> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>
>> Hi Benchao
>>
>> The capacity is 100
>> Parallelism is 8
>> Rpc req is 20ms
>>
>> Thanks
>>
>>
>> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>>
>>> Hi Mark,
>>>
>>> Could you give more details about your Flink job?
>>> - the capacity of AsyncDataStream
>>> - the parallelism of AsyncDataStream operator
>>> - the time of per blocked rpc request
>>>
>>> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>>>
 Hi

 In my Flink application I need to enrich data using
 AsyncDataStream.unorderedWait,
 but I am getting poor performance. At the beginning I was just working with
 an HTTP call, but I have switched to gRPC. I am running on an 8-core node and
 getting a total of 3200 events per second; the service that I am using is not
 fully utilized and can produce up to 1 req/sec.

 Flink job flow
 Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write
 to Kafka

 Using Akkad grpc code written in scala

 Thanks

>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink Parallelism for various type of transformation

2020-07-06 Thread Arvid Heise
Hi Prasanna,

overcommitting cores was actually a recommended technique a while ago to
counter-balance I/O. So it's not bad per se.

However, with slot sharing each core is already doing the work for source,
transform, and sink, so it's not necessary. So I'd go with slots = cores, and I
rather strongly suggest switching to async I/O to perform the external
transformation [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
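
For illustration, a minimal sketch (in Scala) of that shape for the HTTP transform; callService is a placeholder for the actual HTTP client call, and the executor size, timeout, and capacity values are made-up examples rather than recommendations:

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

class HttpEnrichFunction extends RichAsyncFunction[String, String] {
  @transient private var ec: ExecutionContext = _

  override def open(parameters: Configuration): Unit = {
    // Pool used only for the blocking HTTP call; size it to your client and target service.
    ec = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(16))
  }

  private def callService(in: String): String = in // placeholder for the real HTTP request

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    Future(callService(input))(ec).onComplete {
      case scala.util.Success(v) => resultFuture.complete(Iterable(v))
      case scala.util.Failure(t) => resultFuture.completeExceptionally(t)
    }(ec)
  }
}

// usage: enrich without blocking a task slot per request
// val enriched = AsyncDataStream.unorderedWait(events, new HttpEnrichFunction, 1, TimeUnit.SECONDS, 100)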

On Mon, Jul 6, 2020 at 7:01 PM Prasanna kumar 
wrote:

> Hi ,
>
> I used t2.medium machines for the task manager nodes. It has 2 CPU and 4GB
> memory.
>
> But the task manager screen shows that there are 4 slots.
>
> Generally we should match the number of slots to the number of cores.
>
> [image: image.png]
>
> Our pipeline is Source -> Simple Transform -> Sink.
>
> What happens when we have more slots than cores in the following scenarios?
> 1) The transform just changes the JSON format.
>
> 2) The transformation is done by hitting another server (HTTP
> request).
>
> Thanks,
> Prasanna.
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Asynchronous I/O poor performance

2020-07-06 Thread Arvid Heise
Hi Mark,

could you please check if you can tune akka? Usually in async I/O, the used
library uses a thread pool that becomes the actual bottleneck.

If you configure async I/O to use a capacity of 100 and parallelism of 8 on
one node, you also need to have ~800 threads in akka (500 might be enough
because of overhead) or else async I/O gets blocked while waiting for akka
threads to become available.

Best,

Arvid
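
For illustration, a sketch (in Scala) of the kind of dispatcher sizing described above, applied to the ActorSystem created inside the async function; the keys are standard akka dispatcher settings, but the pool size of 800 is only an example to tune against capacity * parallelism:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Enlarge the default dispatcher so async I/O is not starved waiting for akka threads.
val dispatcherConfig = ConfigFactory.parseString(
  """
    |akka.actor.default-dispatcher {
    |  executor = "thread-pool-executor"
    |  thread-pool-executor {
    |    fixed-pool-size = 800
    |  }
    |}
  """.stripMargin)

val system = ActorSystem("enrichment", dispatcherConfig.withFallback(ConfigFactory.load("application.conf")))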

On Mon, Jul 6, 2020 at 1:45 PM Benchao Li  wrote:

> Hi Mark,
>
> According to your data, I think the config of AsyncOperator is OK.
> There is one more config that might affect the throughput of
> AsyncOperator, it's watermark.
> Because unordered async operator still keeps the order between watermarks,
> did you use
> event time in your job, and if yes, what's the watermark interval in your
> job?
>
> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>
>> Hi Benchao
>>
>> The capacity is 100
>> Parallelism is 8
>> Rpc req is 20ms
>>
>> Thanks
>>
>>
>> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>>
>>> Hi Mark,
>>>
>>> Could you give more details about your Flink job?
>>> - the capacity of AsyncDataStream
>>> - the parallelism of AsyncDataStream operator
>>> - the time of per blocked rpc request
>>>
>>> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>>>
 Hi

 In my Flink application I need to enrich data using
 AsyncDataStream.unorderedWait,
 but I am getting poor performance. At the beginning I was just working with
 an HTTP call, but I have switched to gRPC. I am running on an 8-core node and
 getting a total of 3200 events per second; the service that I am using is not
 fully utilized and can produce up to 1 req/sec.

 Flink job flow
 Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write
 to Kafka

 Using Akkad grpc code written in scala

 Thanks

>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Logging Flink metrics

2020-07-06 Thread Manish G
Ok, got it.
I would try to do it manually.

Thanks a lot for your inputs and efforts.

With regards

On Mon, Jul 6, 2020 at 10:58 PM Chesnay Schepler  wrote:

> WSL is a bit buggy when it comes to allocating ports; it happily lets 2
> processes create sockets on the same port, except that the latter one
> doesn't do anything.
> Super annoying, and I haven't found a solution to that myself yet.
>
> You'll have to configure the ports explicitly for the JM/TM, which will
> likely entail manually starting the processes and updating the
> configuration in-between, e.g.:
>
> ./bin/jobmanager.sh start
> 
> ./bin/taskmanager.sh start
>
> On 06/07/2020 19:16, Manish G wrote:
>
> Yes.
>
> On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler 
> wrote:
>
>> Are you running Flink is WSL by chance?
>>
>> On 06/07/2020 19:06, Manish G wrote:
>>
>> In flink-conf.yaml:
>> metrics.reporter.prom.port: 9250-9260
>>
>> This is based on information provided here:
>> port - (optional) the port the Prometheus exporter listens on, defaults
>> to 9249.
>> In order to be able to run several instances of the reporter on one host
>> (e.g. when one TaskManager is colocated with the JobManager) it is
>> advisable to use a port range like 9250-9260.
>>
>> As I am running flink locally, both jobmanager and taskmanager are
>> colocated.
>>
>> In prometheus.yml:
>>
>> - job_name: 'flinkprometheus'
>>   scrape_interval: 5s
>>   static_configs:
>>     - targets: ['localhost:9250', 'localhost:9251']
>>   metrics_path: /
>>
>> This is the whole configuration I have done based on several tutorials
>> and blogs available online.
>>
>>
>>
>>
>> On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler 
>> wrote:
>>
>>> These are all JobManager metrics; have you configured prometheus to also
>>> scrape the task manager processes?
>>>
>>> On 06/07/2020 18:35, Manish G wrote:
>>>
>>> The metrics I see on prometheus is like:
>>>
>>> # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
>>> lastCheckpointRestoreTimestamp (scope: jobmanager_job)
>>> # TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
>>> flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>>  -1.0
>>> # HELP flink_jobmanager_job_numberOfFailedCheckpoints 
>>> numberOfFailedCheckpoints (scope: jobmanager_job)
>>> # TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
>>> flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>>  0.0
>>> # HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
>>> jobmanager_Status_JVM_Memory_Heap)
>>> # TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
>>> flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
>>> # HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
>>> Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
>>> # TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
>>> flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
>>>  2.0
>>> # HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
>>> jobmanager_Status_JVM_CPU)
>>> # TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
>>> flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
>>> # HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
>>> TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
>>> # TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
>>> flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
>>> 604064.0
>>> # HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
>>> jobmanager_job)
>>> # TYPE flink_jobmanager_job_fullRestarts gauge
>>> flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>>  0.0
>>>
>>>
>>>
>>>
>>> On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler 
>>> wrote:
>>>
 You've said elsewhere that you do see some metrics in prometheus, which
 are those?

 Why are you configuring the host for the prometheus reporter? This
 option is only for the PrometheusPushGatewayReporter.

 On 06/07/2020 18:01, Manish G wrote:

 Hi,

 So I have following in flink-conf.yml :
 //
 metrics.reporter.prom.class:
 org.apache.flink.metrics.prometheus.PrometheusReporter
 metrics.reporter.prom.host: 127.0.0.1
 metrics.reporter.prom.port: 
 metrics.reporter.slf4j.class:
 org.apache.flink.metrics.slf4j.Slf4jReporter
 metrics.reporter.slf4j.interval: 30 SECONDS
 

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
WSL is a bit buggy when it comes to allocating ports; it happily lets 2 
processes create sockets on the same port, except that the latter one 
doesn't do anything.

Super annoying, and I haven't found a solution to that myself yet.

You'll have to configure the ports explicitly for the JM/TM, which will 
likely entail manually starting the processes and updating the 
configuration in-between, e.g.:


./bin/jobmanager.sh start

./bin/taskmanager.sh start

On 06/07/2020 19:16, Manish G wrote:

Yes.

On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler wrote:


Are you running Flink is WSL by chance?

On 06/07/2020 19:06, Manish G wrote:

In flink-conf.yaml:
metrics.reporter.prom.port: 9250-9260

This is based on information provided here:

port - (optional) the port the Prometheus exporter listens on,
defaults to 9249.
In order to be able to run several instances of the reporter on
one host (e.g. when one TaskManager is colocated with the
JobManager) it is advisable to use a port range like 9250-9260.

As I am running flink locally, both jobmanager and taskmanager
are colocated.

In prometheus.yml:
- job_name: 'flinkprometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9250', 'localhost:9251']
    metrics_path: /

This is the whole configuration I have done based on several
tutorials and blogs available online.


On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

These are all JobManager metrics; have you configured
prometheus to also scrape the task manager processes?

On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge

flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints 
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge

flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 
1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
gauge

flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge

flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge

flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You've said elsewhere that you do see some metrics in
prometheus, which are those?

Why are you configuring the host for the prometheus
reporter? This option is only for the
PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
  

Re: Logging Flink metrics

2020-07-06 Thread Manish G
Yes.

On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler  wrote:

> Are you running Flink is WSL by chance?
>
> On 06/07/2020 19:06, Manish G wrote:
>
> In flink-conf.yaml:
> metrics.reporter.prom.port: 9250-9260
>
> This is based on information provided here:
> port - (optional) the port the Prometheus exporter listens on, defaults
> to 9249.
> In order to be able to run several instances of the reporter on one host
> (e.g. when one TaskManager is colocated with the JobManager) it is
> advisable to use a port range like 9250-9260.
>
> As I am running flink locally, both jobmanager and taskmanager are
> colocated.
>
> In prometheus.yml:
>
> - job_name: 'flinkprometheus'
>   scrape_interval: 5s
>   static_configs:
>     - targets: ['localhost:9250', 'localhost:9251']
>   metrics_path: /
>
> This is the whole configuration I have done based on several tutorials and
> blogs available online.
>
>
>
>
> On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler 
> wrote:
>
>> These are all JobManager metrics; have you configured prometheus to also
>> scrape the task manager processes?
>>
>> On 06/07/2020 18:35, Manish G wrote:
>>
>> The metrics I see on prometheus is like:
>>
>> # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
>> lastCheckpointRestoreTimestamp (scope: jobmanager_job)
>> # TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
>> flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>  -1.0
>> # HELP flink_jobmanager_job_numberOfFailedCheckpoints 
>> numberOfFailedCheckpoints (scope: jobmanager_job)
>> # TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
>> flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>  0.0
>> # HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
>> jobmanager_Status_JVM_Memory_Heap)
>> # TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
>> flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
>> # HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count Count 
>> (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
>> # TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
>> flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
>>  2.0
>> # HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
>> jobmanager_Status_JVM_CPU)
>> # TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
>> flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
>> # HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity TotalCapacity 
>> (scope: jobmanager_Status_JVM_Memory_Direct)
>> # TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
>> flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
>> 604064.0
>> # HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
>> # TYPE flink_jobmanager_job_fullRestarts gauge
>> flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>  0.0
>>
>>
>>
>>
>> On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler 
>> wrote:
>>
>>> You've said elsewhere that you do see some metrics in prometheus, which
>>> are those?
>>>
>>> Why are you configuring the host for the prometheus reporter? This
>>> option is only for the PrometheusPushGatewayReporter.
>>>
>>> On 06/07/2020 18:01, Manish G wrote:
>>>
>>> Hi,
>>>
>>> So I have following in flink-conf.yml :
>>> //
>>> metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>> metrics.reporter.prom.host: 127.0.0.1
>>> metrics.reporter.prom.port: 
>>> metrics.reporter.slf4j.class:
>>> org.apache.flink.metrics.slf4j.Slf4jReporter
>>> metrics.reporter.slf4j.interval: 30 SECONDS
>>> //
>>>
>>> And while I can see custom metrics in Taskmanager logs, but prometheus
>>> dashboard logs doesn't show custom metrics.
>>>
>>> With regards
>>>
>>> On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler 
>>> wrote:
>>>
 You have explicitly configured a reporter list, resulting in the slf4j
 reporter being ignored:

 2020-07-06 13:48:22,191 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
 configuration property: metrics.reporters, prom
 2020-07-06 13:48:23,203 INFO
 org.apache.flink.runtime.metrics.ReporterSetup- Excluding
 reporter slf4j, not configured in reporter list (prom).

 Note that nowadays metrics.reporters is no longer required; the set of
 reporters 

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

Are you running Flink is WSL by chance?

On 06/07/2020 19:06, Manish G wrote:

In flink-conf.yaml:
metrics.reporter.prom.port: 9250-9260

This is based on information provided here:

port - (optional) the port the Prometheus exporter listens on,
defaults to 9249.
In order to be able to run several instances of the reporter on one
host (e.g. when one TaskManager is colocated with the JobManager) it
is advisable to use a port range like 9250-9260.

As I am running flink locally, both jobmanager and taskmanager are
colocated.


In prometheus.yml:
- job_name: 'flinkprometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9250', 'localhost:9251']
    metrics_path: /

This is the whole configuration I have done based on several tutorials
and blogs available online.


On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler > wrote:


These are all JobManager metrics; have you configured prometheus
to also scrape the task manager processes?

On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge

flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints 
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge

flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge

flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge

flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You've said elsewhere that you do see some metrics in
prometheus, which are those?

Why are you configuring the host for the prometheus reporter?
This option is only for the PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see custom metrics in Taskmanager logs, but
prometheus dashboard logs doesn't show custom metrics.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You have explicitly configured a reporter list,
resulting in the slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration -
Loading configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup -
Excluding reporter slf4j, not configured in reporter
list (prom).

Note that 

Re: Logging Flink metrics

2020-07-06 Thread Manish G
In flink-conf.yaml:
metrics.reporter.prom.port: 9250-9260

This is based on information provided here:

"port - (optional) the port the Prometheus exporter listens on, defaults
to 9249. In order to be able to run several instances of the reporter on one host
(e.g. when one TaskManager is colocated with the JobManager) it is
advisable to use a port range like 9250-9260."

As I am running flink locally, so both jobmanager and taskmanager are
colocated.

In prometheus.yml:

- job_name: 'flinkprometheus'
  scrape_interval: 5s
  static_configs:
    - targets: ['localhost:9250', 'localhost:9251']
  metrics_path: /

This is the whole configuration I have done based on several tutorials and
blogs available online.
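
For reference, a sketch of a scrape job that covers the whole configured reporter port
range, so that a colocated TaskManager is scraped no matter which port in 9250-9260 it
ended up binding (the ports beyond 9251 listed below are an assumption about that range,
not taken from a running setup):

scrape_configs:
  - job_name: 'flinkprometheus'
    scrape_interval: 5s
    metrics_path: /
    static_configs:
      - targets: ['localhost:9250', 'localhost:9251', 'localhost:9252', 'localhost:9253']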




On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler  wrote:

> These are all JobManager metrics; have you configured prometheus to also
> scrape the task manager processes?
>
> On 06/07/2020 18:35, Manish G wrote:
>
> The metrics I see in Prometheus are like:
>
> # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
> lastCheckpointRestoreTimestamp (scope: jobmanager_job)
> # TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
> flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>  -1.0
> # HELP flink_jobmanager_job_numberOfFailedCheckpoints 
> numberOfFailedCheckpoints (scope: jobmanager_job)
> # TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
> flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>  0.0
> # HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
> jobmanager_Status_JVM_Memory_Heap)
> # TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
> flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
> # HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count Count 
> (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
> # TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
> flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
>  2.0
> # HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
> jobmanager_Status_JVM_CPU)
> # TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
> flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
> # HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity TotalCapacity 
> (scope: jobmanager_Status_JVM_Memory_Direct)
> # TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
> flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
> 604064.0
> # HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
> # TYPE flink_jobmanager_job_fullRestarts gauge
> flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>  0.0
>
>
>
>
> On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler 
> wrote:
>
>> You've said elsewhere that you do see some metrics in prometheus, which
>> are those?
>>
>> Why are you configuring the host for the prometheus reporter? This
>> option is only for the PrometheusPushGatewayReporter.
>>
>> On 06/07/2020 18:01, Manish G wrote:
>>
>> Hi,
>>
>> So I have following in flink-conf.yml :
>> //
>> metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>> metrics.reporter.prom.host: 127.0.0.1
>> metrics.reporter.prom.port: 
>> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
>> metrics.reporter.slf4j.interval: 30 SECONDS
>> //
>>
>> And while I can see the custom metrics in the TaskManager logs, the Prometheus
>> dashboard doesn't show the custom metrics.
>>
>> With regards
>>
>> On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler 
>> wrote:
>>
>>> You have explicitly configured a reporter list, resulting in the slf4j
>>> reporter being ignored:
>>>
>>> 2020-07-06 13:48:22,191 INFO
>>> org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: metrics.reporters, prom
>>> 2020-07-06 13:48:23,203 INFO
>>> org.apache.flink.runtime.metrics.ReporterSetup- Excluding
>>> reporter slf4j, not configured in reporter list (prom).
>>>
>>> Note that nowadays metrics.reporters is no longer required; the set of
>>> reporters is automatically determined based on configured properties; the
>>> only use-case is disabling a reporter without having to remove the entire
>>> configuration.
>>> I'd suggest to just remove the option, try again, and report back.
>>>
>>> On 06/07/2020 16:35, Chesnay Schepler wrote:
>>>
>>> 

Flink Parallelism for various types of transformation

2020-07-06 Thread Prasanna kumar
Hi,

I used t2.medium machines for the task manager nodes. Each has 2 vCPUs and 4 GB of
memory.

But the task manager screen shows that there are 4 slots.

Generally we should match the number of slots to the number of cores.
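
(For reference, the number of slots per TaskManager is set in flink-conf.yaml; a sketch
for a 2-vCPU machine like the t2.medium above, where the value 2 is an assumption rather
than something read from the running cluster:)

# flink-conf.yaml sketch: one slot per vCPU on a 2-vCPU TaskManager
taskmanager.numberOfTaskSlots: 2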

[image: image.png]

Our pipeline is Source -> Simple Transform -> Sink.

What happens when we have more slots than cores in the following scenarios?
1) The transform is just a change of JSON format.

2) The transformation is done by hitting another server (HTTP
request).

Thanks,
Prasanna.


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
These are all JobManager metrics; have you configured prometheus to also 
scrape the task manager processes?


On 06/07/2020 18:35, Manish G wrote:

The metrics I see in Prometheus are like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints numberOfFailedCheckpoints 
(scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count Count 
(scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity TotalCapacity 
(scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge
flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler > wrote:


You've said elsewhere that you do see some metrics in prometheus,
which are those?

Why are you configuring the host for the prometheus reporter? This
option is only for the PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see the custom metrics in the TaskManager logs, the
Prometheus dashboard doesn't show the custom metrics.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You have explicitly configured a reporter list, resulting in
the slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup - Excluding
reporter slf4j, not configured in reporter list (prom).

Note that nowadays metrics.reporters is no longer required;
the set of reporters is automatically determined based on
configured properties; the only use-case is disabling a
reporter without having to remove the entire configuration.
I'd suggest to just remove the option, try again, and report
back.

On 06/07/2020 16:35, Chesnay Schepler wrote:

Please enable debug logging and search for warnings from the
metric groups/registry/reporter.

If you cannot find anything suspicious, you can also send
the full log to me directly.

On 06/07/2020 16:29, Manish G wrote:

Job is an infinite streaming one, so it keeps going. Flink
configuration is as:

metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

How long did the job run for, and what is the
configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the
link(changes in flink-conf.yml, copying the jar in lib
directory), and registered the Meter with metrics
group and invoked 

Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread Chen Qin
My two cents here,

- A Flink job already has back pressure, so in some use cases rate limiting can be achieved
by setting the parallelism to a proper number. There is an open issue about checkpointing
reliability under back pressure; the community seems to be working on it.

- Rate limiting can easily be abused and cause a lot of confusion. Think about a use case
where two streams feed a simple interval join. Unless you are able to rate limit both
streams with proper values dynamically, you might see the timestamp and watermark gaps keep
increasing, causing checkpointing failures.

So the question might be: instead of looking at the rate limit of a single source, how do
you slow down all sources without ever-increasing event-time and watermark gaps? That
already sounds complicated.

With that being said, if you really want to implement a rate limit on your own, you can try
the following code :) It works well for us.
public class SynchronousKafkaConsumer<T> extends FlinkKafkaConsumer<T> {

  protected static final Logger LOG =
      LoggerFactory.getLogger(SynchronousKafkaConsumer.class);

  private final double topicRateLimit;
  private transient RateLimiter subtaskRateLimiter;

  // Constructor (setting topicRateLimit and calling super) omitted in the original mail.

  @Override
  public void open(Configuration configuration) throws Exception {
    Preconditions.checkArgument(
        topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks() > 0.1,
        "subtask ratelimit should be greater than 0.1 QPS");
    subtaskRateLimiter = RateLimiter.create(
        topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks());
    super.open(configuration);
  }

  @Override
  protected AbstractFetcher<T, ?> createFetcher(
      SourceContext<T> sourceContext,
      Map<KafkaTopicPartition, Long> partitionsWithOffsets,
      SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
      SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
      StreamingRuntimeContext runtimeContext,
      OffsetCommitMode offsetCommitMode,
      MetricGroup consumerMetricGroup, boolean useMetrics)
      throws Exception {

    return new KafkaFetcher<T>(
        sourceContext,
        partitionsWithOffsets,
        watermarksPeriodic,
        watermarksPunctuated,
        runtimeContext.getProcessingTimeService(),
        runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
        runtimeContext.getUserCodeClassLoader(),
        runtimeContext.getTaskNameWithSubtasks(),
        deserializer,
        properties,
        pollTimeout,
        runtimeContext.getMetricGroup(),
        consumerMetricGroup,
        useMetrics) {

      @Override
      protected void emitRecord(T record,
                                KafkaTopicPartitionState<TopicPartition> partitionState,
                                long offset) throws Exception {
        // Block until the per-subtask rate limiter allows the next record.
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecord(record, partitionState, offset);
      }

      @Override
      protected void emitRecordWithTimestamp(T record,
                                             KafkaTopicPartitionState<TopicPartition> partitionState,
                                             long offset, long timestamp) throws Exception {
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
      }
    };
  }
}
Thanks,

Chen
Pinterest Data
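
(For context, a rough sketch of how a consumer like the one above could be wired into a
job; the SynchronousKafkaConsumer constructor used here (topic, deserialization schema,
properties, per-topic rate limit) is only an assumption for illustration and is not shown
in the code above.)

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RateLimitedKafkaJob {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "rate-limited-job");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hypothetical constructor: (topic, deserializer, properties, topicRateLimit in records/sec)
    env.addSource(new SynchronousKafkaConsumer<>("events", new SimpleStringSchema(), props, 100.0))
       .print();
    env.execute("rate-limited-kafka-job");
  }
}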


> On Jul 6, 2020, at 7:43 AM, David Magalhães  wrote:
> 
> I've noticed that this FLINK-11501 was implemented in 
> flink-connector-kafka-0.10 [1], but it wasn't in the current version of the 
> flink-connector-kafka. Is there any reason for this, and what would be the
> best solution to implement rate limit functionality in the current Kafka
> consumer?
> 
> Thanks,
> David
> 
> [1] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
>  
> 
> 
> [2] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
>  
> 


Re: Logging Flink metrics

2020-07-06 Thread Manish G
The metrics I see in Prometheus are like:

# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
-1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope:
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope:
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",}
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge
flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler  wrote:

> You've said elsewhere that you do see some metrics in prometheus, which
> are those?
>
> Why are you configuring the host for the prometheus reporter? This option
> is only for the PrometheusPushGatewayReporter.
>
> On 06/07/2020 18:01, Manish G wrote:
>
> Hi,
>
> So I have following in flink-conf.yml :
> //
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.host: 127.0.0.1
> metrics.reporter.prom.port: 
> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
> metrics.reporter.slf4j.interval: 30 SECONDS
> //
>
> And while I can see the custom metrics in the TaskManager logs, the Prometheus
> dashboard doesn't show the custom metrics.
>
> With regards
>
> On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler 
> wrote:
>
>> You have explicitly configured a reporter list, resulting in the slf4j
>> reporter being ignored:
>>
>> 2020-07-06 13:48:22,191 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: metrics.reporters, prom
>> 2020-07-06 13:48:23,203 INFO
>> org.apache.flink.runtime.metrics.ReporterSetup- Excluding
>> reporter slf4j, not configured in reporter list (prom).
>>
>> Note that nowadays metrics.reporters is no longer required; the set of
>> reporters is automatically determined based on configured properties; the
>> only use-case is disabling a reporter without having to remove the entire
>> configuration.
>> I'd suggest to just remove the option, try again, and report back.
>>
>> On 06/07/2020 16:35, Chesnay Schepler wrote:
>>
>> Please enable debug logging and search for warnings from the metric
>> groups/registry/reporter.
>>
>> If you cannot find anything suspicious, you can also send the full log to
>> me directly.
>>
>> On 06/07/2020 16:29, Manish G wrote:
>>
>> Job is an infinite streaming one, so it keeps going. Flink configuration
>> is as:
>>
>> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
>> metrics.reporter.slf4j.interval: 30 SECONDS
>>
>>
>>
>> On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler 
>> wrote:
>>
>>> How long did the job run for, and what is the configured interval?
>>>
>>>
>>> On 06/07/2020 15:51, Manish G wrote:
>>>
>>> Hi,
>>>
>>> Thanks for this.
>>>
>>> I did the configuration as mentioned at the link(changes in
>>> flink-conf.yml, copying the jar in lib directory), and registered the Meter
>>> with metrics group and invoked markEvent() method in the target code. But I
>>> don't see any related logs.
>>> I am doing this all on my local computer.
>>>
>>> Anything else I need to do?
>>>
>>> With regards
>>> Manish
>>>
>>> On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler 
>>> wrote:
>>>
 Have you looked at the SLF4J reporter?


 

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You've said elsewhere that you do see some metrics in prometheus, which 
are those?


Why are you configuring the host for the prometheus reporter? This 
option is only for the PrometheusPushGatewayReporter.


On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see the custom metrics in the TaskManager logs, the Prometheus
dashboard doesn't show the custom metrics.


With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler > wrote:


You have explicitly configured a reporter list, resulting in the
slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup - Excluding
reporter slf4j, not configured in reporter list (prom).

Note that nowadays metrics.reporters is no longer required; the
set of reporters is automatically determined based on configured
properties; the only use-case is disabling a reporter without
having to remove the entire configuration.
I'd suggest to just remove the option, try again, and report back.

On 06/07/2020 16:35, Chesnay Schepler wrote:

Please enable debug logging and search for warnings from the
metric groups/registry/reporter.

If you cannot find anything suspicious, you can also send the
full log to me directly.

On 06/07/2020 16:29, Manish G wrote:

Job is an infinite streaming one, so it keeps going. Flink
configuration is as:

metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

How long did the job run for, and what is the configured
interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and
registered the Meter with metrics group and invoked
markEvent() method in the target code. But I don't see any
related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application
logs apart from
> publishing it to Prometheus?
>
> With regards












Re: Logging Flink metrics

2020-07-06 Thread Manish G
Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see the custom metrics in the TaskManager logs, the Prometheus
dashboard doesn't show the custom metrics.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler  wrote:

> You have explicitly configured a reporter list, resulting in the slf4j
> reporter being ignored:
>
> 2020-07-06 13:48:22,191 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporters, prom
> 2020-07-06 13:48:23,203 INFO
> org.apache.flink.runtime.metrics.ReporterSetup- Excluding
> reporter slf4j, not configured in reporter list (prom).
>
> Note that nowadays metrics.reporters is no longer required; the set of
> reporters is automatically determined based on configured properties; the
> only use-case is disabling a reporter without having to remove the entire
> configuration.
> I'd suggest to just remove the option, try again, and report back.
>
> On 06/07/2020 16:35, Chesnay Schepler wrote:
>
> Please enable debug logging and search for warnings from the metric
> groups/registry/reporter.
>
> If you cannot find anything suspicious, you can also send the full log to
> me directly.
>
> On 06/07/2020 16:29, Manish G wrote:
>
> Job is an infinite streaming one, so it keeps going. Flink configuration
> is as:
>
> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
> metrics.reporter.slf4j.interval: 30 SECONDS
>
>
>
> On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler 
> wrote:
>
>> How long did the job run for, and what is the configured interval?
>>
>>
>> On 06/07/2020 15:51, Manish G wrote:
>>
>> Hi,
>>
>> Thanks for this.
>>
>> I did the configuration as mentioned at the link(changes in
>> flink-conf.yml, copying the jar in lib directory), and registered the Meter
>> with metrics group and invoked markEvent() method in the target code. But I
>> don't see any related logs.
>> I am doing this all on my local computer.
>>
>> Anything else I need to do?
>>
>> With regards
>> Manish
>>
>> On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler 
>> wrote:
>>
>>> Have you looked at the SLF4J reporter?
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>>
>>> On 06/07/2020 13:49, Manish G wrote:
>>> > Hi,
>>> >
>>> > Is it possible to log Flink metrics in application logs apart from
>>> > publishing it to Prometheus?
>>> >
>>> > With regards
>>>
>>>
>>>
>>
>
>


Re: Parsing complex JSON with Flink SQL

2020-07-06 Thread Leonard Xu
Hi,

In the schema you can declare the field as an array. If you need to split it into multiple
rows, handling that with a UDTF is recommended; I don't see support in the current sources
for splitting a record into multiple rows.

Best,
Leonard Xu

> On Jul 6, 2020, at 21:28, 王 outlook  wrote:
> 
> The TableFunction UDTF



Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You have explicitly configured a reporter list, resulting in the slf4j 
reporter being ignored:


2020-07-06 13:48:22,191 INFO 
org.apache.flink.configuration.GlobalConfiguration    - Loading 
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO 
org.apache.flink.runtime.metrics.ReporterSetup    - 
Excluding reporter slf4j, not configured in reporter list (prom).


Note that nowadays metrics.reporters is no longer required; the set of 
reporters is automatically determined based on configured properties; 
the only use-case is disabling a reporter without having to remove the 
entire configuration.

I'd suggest to just remove the option, try again, and report back.
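
For reference, a minimal flink-conf.yaml sketch of that suggestion: with no
metrics.reporters line, both reporters below are picked up automatically (the values simply
mirror the configurations discussed in these threads):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS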

On 06/07/2020 16:35, Chesnay Schepler wrote:
Please enable debug logging and search for warnings from the metric 
groups/registry/reporter.


If you cannot find anything suspicious, you can also send the full log
to me directly.


On 06/07/2020 16:29, Manish G wrote:
Job is an infinite streaming one, so it keeps going. Flink 
configuration is as:


metrics.reporter.slf4j.class: 
org.apache.flink.metrics.slf4j.Slf4jReporter

metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler > wrote:


How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and
registered the Meter with metrics group and invoked markEvent()
method in the target code. But I don't see any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs
apart from
> publishing it to Prometheus?
>
> With regards










Re: Help: Real-time cumulative UV with Flink SQL 1.10

2020-07-06 Thread Benchao Li
This doesn't seem like it should happen. Are you using the blink planner?

x <35907...@qq.com> wrote on Mon, Jul 6, 2020 at 1:24 PM:

> Sorry, I misspoke. There is indeed no window; they are all group aggregations.
>
> I configured tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7)), but the state still keeps growing and is not cleaned up automatically according to that configuration.
>
>
> -- Original message --
> From: "Benchao Li"  Sent: Monday, Jul 6, 2020, 12:52 PM
> To: "user-zh"
> Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10
>
>
>
> I see that your SQL doesn't actually use a window; it is just a plain aggregation.
> This kind of aggregation needs a reasonable state retention [1] time configured; otherwise
> the state is never cleaned up by default.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> x <35907...@qq.com> wrote on Mon, Jul 6, 2020 at 11:15 AM:
>
>  The version is 1.10.1, and the final sink does indeed do a count distinct inside a
>  window. Is it the case that as long as the computation contains a count distinct inside
>  a window, none of the state expires or is cleaned up automatically? In practice the
>  state of the window step is very small, while the state of the
>  GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') query is very large. The code is as follows:
>
>  val rt_totaluv_view : Table = tabEnv.sqlQuery(
>    """
>      SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
>      FROM source
>      GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
>    """)
>  tabEnv.createTemporaryView("rt_totaluv_view", rt_totaluv_view)
>
>  val totaluvTmp = tabEnv.toRetractStream[(String, Long)](rt_totaluv_view)
>    .filter( line => line._1 == true ).map( line => line._2 )
>
>  val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
>
>  tabEnv.sqlUpdate(
>    s"""
>      INSERT INTO mysql_totaluv
>      SELECT _1, MAX(_2)
>      FROM $totaluvTabTmp
>      GROUP BY _1
>    """)
>
>  -- Original message --
>  From: "Benchao Li"  Sent: Friday, Jul 3, 2020, 9:47 PM
>  To: "user-zh"
>  Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10
>
>
>
>  Which version are you using? There used to be a similar issue [1]: doing count distinct
>  inside a window had this problem, and it has been fixed in 1.11.
>
>  [1] https://issues.apache.org/jira/browse/FLINK-17942
>
>  x <35907...@qq.com> wrote on Fri, Jul 3, 2020 at 4:34 PM:
>
>  > Hello, after my program had been running for a while, I noticed that the checkpoint
>  > files keep growing, so the state is apparently not expiring.
>  >
>  > I configured tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));
>  > in theory, the state for keys whose date is the previous day should expire on the
>  > following day.
>  >
>  > -- Original message --
>  > From: "Jark Wu"  Sent: Thursday, Jun 18, 2020, 12:16 PM
>  > To: "user-zh"
>  >
>  > Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10
>  >
>  >
>  >
>  > Yes, I think it can be worked around this way.
>  >
>  > On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com> wrote:
>  >
>  > > For 1.10, I implemented it by converting the table to a stream and then back to a
>  > > table. Does that look reasonable to you?
>  > > val resTmpTab: Table = tabEnv.sqlQuery(
>  > >   """
>  > >     SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
>  > >     FROM user_behavior
>  > >     GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')
>  > >   """)
>  > >
>  > > val resTmpStream = tabEnv.toRetractStream[(String, Long)](resTmpTab)
>  > >   .filter(line => line._1 == true).map(line => line._2)
>  > >
>  > > val res = tabEnv.fromDataStream(resTmpStream)
>  > > tabEnv.sqlUpdate(
>  > >   s"""
>  > >     INSERT INTO rt_totaluv
>  > >     SELECT _1, MAX(_2)
>  > >     FROM $res
>  > >     GROUP BY _1
>  > >   """)
>  > >
>  > > -- Original message --
>  > > From: "Jark Wu" <imj...@gmail.com>  Sent: Wednesday, Jun 17, 2020, 1:55 PM
>  > > To: "user-zh" <user-zh@flink.apache.org>
>  > >
>  > > Subject: Re: Help: Real-time cumulative UV with Flink SQL 1.10
>  > >
>  > >
>  > >
>  > > In Flink 1.11, you can try something like this:
>  > >
>  > > CREATE TABLE mysql (
>  > >   time_str STRING,
>  > >   uv BIGINT,
>  > >   PRIMARY KEY (ts) NOT ENFORCED
>  > > ) WITH (
>  > >   'connector' = 'jdbc',
>  > >   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>  > >   'table-name' = 'myuv'
>  > > );
>  > >
>  > > INSERT INTO mysql
>  > > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
>  > > FROM user_behavior;
>  > >
>  > > On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:
>  > >
>  > > > Thank you for your reply. I still don't quite understand the "first approach" you
>  > > > mentioned; I need to output a cumulative UV every minute,

Re: Flink SQL subquery state is not cleaned up

2020-07-06 Thread Benchao Li
That doesn't seem right. Which Flink version are you using, and which planner?

op <520075...@qq.com> wrote on Mon, Jul 6, 2020 at 11:31 AM:

> Hi everyone, my program currently contains a piece of SQL like this:
>
> select day,
>        count(id),
>        sum(v1)
> from (
>   select day,
>          id,
>          sum(v1) v1
>   from source
>   group by day, id
> ) t
> group by day
>
>
> I set
> tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
>
>
>
> The aggregation in the subquery groups by day and id, so in theory it should be cleaned
> up automatically after one day. However, over four days of running I can see on the
> checkpoint page that the state of this subquery keeps growing. What could be the reason?
> My version is 1.10.0


-- 

Best,
Benchao Li


Re: Window aggregation after a Flink interval join

2020-07-06 Thread Benchao Li
We were also a bit surprised when we first noticed this behavior, but after thinking about
it, it does seem reasonable.

The time range of a two-stream join can be fairly large, for example stream A joining
stream B within [-10min, +10min]. In that case, when a record arrives on stream A it may
join records from several minutes ago, and at that point the watermark is already larger
than the event time of those joined records.

My personal feeling is that this is a balance between producing join results more promptly
and emitting records whose event time is behind the watermark. The current default
implementation chooses to produce results more promptly. Another possible implementation
would be to guarantee that the watermark never passes the record timestamps; in that case
the join results would be delayed, or the watermark logic would have to be changed so that
the watermark always stays below the earliest timestamp of the records that can currently
be joined.
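
For reference, the shape of the event-time interval join being discussed (taken from the
original question quoted further down):

SELECT *
FROM a, b
WHERE a.id = b.id
  AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR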

元始(Bob Hu) <657390...@qq.com> wrote on Sun, Jul 5, 2020 at 8:48 PM:

> Thank you for the explanation. This mechanism in Flink does feel a bit strange.
>
>
> -- Original message --
> *From:* "Benchao Li";
> *Sent:* Sunday, Jul 5, 2020, 11:58 AM
> *To:* "元始(Bob Hu)"<657390...@qq.com>;
> *Cc:* "user-zh";
> *Subject:* Re: Window aggregation after a Flink interval join
>
> Back to your question: I think your observation is correct. The results produced by a
> time interval join can indeed behave this way. So if you use an event-time interval join
> followed by an event-time window (or other operators that use event time, such as CEP),
> there will be problems: a lot of records are simply dropped as late data.
>
> 元始(Bob Hu) <657390...@qq.com> wrote on Fri, Jul 3, 2020 at 3:29 PM:
>
>> Hello, I would like to ask a question:
>> Is there a problem with doing a window group after an interval join of two Flink stream
>> tables? Some records that the left join cannot match get dropped.
>> For example, the join condition is: select * from a,b where a.id=b.id and b.rowtime between a.rowtime
>> and a.rowtime + INTERVAL '1' HOUR
>> Looking at the source code, leftRelativeSize = 1 hour and rightRelativeSize = 0, so for the
>> left stream cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
>> allowedLateness + 1. Unmatched records from the left table are emitted after 1.5 hours (with the
>> right side null), while the watermark adjustment is Math.max(leftRelativeSize, rightRelativeSize) +
>> allowedLateness, i.e. 1 hour. So by the time those records are emitted, isn't the watermark already
>> 0.5 hours larger than the left table's rowtime? Then, when a later group by is applied to the
>> joined stream, these records with a null right side are dropped.
>> Flink version 1.10.0.
>>
>> Below is a piece of my test code:
>>
>> import org.apache.commons.net.ntp.TimeStamp;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import 
>> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.functions.ScalarFunction;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.util.IOUtils;
>>
>> import java.io.BufferedReader;
>> import java.io.InputStreamReader;
>> import java.io.Serializable;
>> import java.net.InetSocketAddress;
>> import java.net.Socket;
>> import java.sql.Timestamp;
>> import java.text.SimpleDateFormat;
>> import java.util.ArrayList;
>> import java.util.Date;
>> import java.util.List;
>>
>> public class TimeBoundedJoin {
>>
>> public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer 
>> maxIdleTime, long finalMaxOutOfOrderness) {
>> AssignerWithPeriodicWatermarks<Row> timestampExtractor = new 
>> AssignerWithPeriodicWatermarks<Row>() {
>> private long currentMaxTimestamp = 0;
>> private long lastMaxTimestamp = 0;
>> private long lastUpdateTime = 0;
>> boolean firstWatermark = true;
>> //Integer maxIdleTime = 30;
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>> if(firstWatermark) {
>> lastUpdateTime = System.currentTimeMillis();
>> firstWatermark = false;
>> }
>> if(currentMaxTimestamp != lastMaxTimestamp) {
>> lastMaxTimestamp = currentMaxTimestamp;
>> lastUpdateTime = System.currentTimeMillis();
>> }
>> if(maxIdleTime != null && System.currentTimeMillis() - 
>> lastUpdateTime > maxIdleTime * 1000) {
>> return new Watermark(new Date().getTime() - 
>> finalMaxOutOfOrderness * 1000);
>> }
>> return new Watermark(currentMaxTimestamp - 
>> finalMaxOutOfOrderness * 1000);
>>
>> }
>>
>> @Override
>> public long extractTimestamp(Row row, long 
>> previousElementTimestamp) {
>> Object value = row.getField(1);
>> long timestamp;
>> try {
>> timestamp = (long)value;
>> } catch (Exception e) {
>> timestamp = ((Timestamp)value).getTime();
>> }
>> if(timestamp > currentMaxTimestamp) {
>> currentMaxTimestamp = timestamp;
>> }
>> return timestamp;
>> }
>> };
>> return timestampExtractor;
>> }
>>
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment bsEnv = 
>> 

Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread David Magalhães
I've noticed that this FLINK-11501 was implemented in
flink-connector-kafka-0.10 [1], but it wasn't in the current version of the
flink-connector-kafka. Is there any reason for this, and what would be the
best solution to implement rate limit functionality in the current Kafka
consumer?

Thanks,
David

[1]
https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java

[2]
https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java


Re: Parsing complex JSON with Flink SQL

2020-07-06 Thread Benchao Li
My understanding is that the best practice is the first option: read the field out as an
array first, and then expand it into multiple rows with a table function.
Actually, turning an array into multiple rows is supported natively by Flink SQL; see the
"Expanding arrays into a relation" part of [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
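
For illustration, a sketch of that built-in expansion for the JSON below, assuming the
source DDL declares many_names as ARRAY<ROW<name STRING>> (the table name kafka_source is a
placeholder):

SELECT id, t.name
FROM kafka_source
CROSS JOIN UNNEST(many_names) AS t (name)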

王 outlook wrote on Mon, Jul 6, 2020 at 9:29 PM:

> For JSON input like the following,
>
> {
>   "id": 1,
>   "many_names": [
>     {"name": "foo"},
>     {"name": "bar"}
>   ]
> }
>
> the output table should have two rows:  id 1, name foo  |  id 1, name bar
>
> Is the best practice to read the record from Kafka and then call a TableFunction UDTF to
> expand it into multiple rows?
> Or does Flink SQL have a more convenient way to expand the array directly when reading
> from the source?
>
>
> Sent from Outlook
>


-- 

Best,
Benchao Li


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
Please enable debug logging and search for warnings from the metric 
groups/registry/reporter.


If you cannot find anything suspicious, you can also send the full log
to me directly.


On 06/07/2020 16:29, Manish G wrote:
Job is an infinite streaming one, so it keeps going. Flink 
configuration is as:


metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler > wrote:


How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and registered
the Meter with metrics group and invoked markEvent() method in
the target code. But I don't see any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs
apart from
> publishing it to Prometheus?
>
> With regards








Re: Logging Flink metrics

2020-07-06 Thread Manish G
Job is an infinite streaming one, so it keeps going. Flink configuration is
as:

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler  wrote:

> How long did the job run for, and what is the configured interval?
>
>
> On 06/07/2020 15:51, Manish G wrote:
>
> Hi,
>
> Thanks for this.
>
> I did the configuration as mentioned at the link(changes in
> flink-conf.yml, copying the jar in lib directory), and registered the Meter
> with metrics group and invoked markEvent() method in the target code. But I
> don't see any related logs.
> I am doing this all on my local computer.
>
> Anything else I need to do?
>
> With regards
> Manish
>
> On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler 
> wrote:
>
>> Have you looked at the SLF4J reporter?
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>
>> On 06/07/2020 13:49, Manish G wrote:
>> > Hi,
>> >
>> > Is it possible to log Flink metrics in application logs apart from
>> > publishing it to Prometheus?
>> >
>> > With regards
>>
>>
>>
>


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in 
flink-conf.yml, copying the jar in lib directory), and registered the 
Meter with metrics group and invoked markEvent() method in the target 
code. But I don't see any related logs.

I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler > wrote:


Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs apart from
> publishing it to Prometheus?
>
> With regards






Re: Logging Flink metrics

2020-07-06 Thread Manish G
Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in flink-conf.yml,
copying the jar in lib directory), and registered the Meter with metrics
group and invoked markEvent() method in the target code. But I don't see
any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler  wrote:

> Have you looked at the SLF4J reporter?
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>
> On 06/07/2020 13:49, Manish G wrote:
> > Hi,
> >
> > Is it possible to log Flink metrics in application logs apart from
> > publishing it to Prometheus?
> >
> > With regards
>
>
>


Parsing complex JSON with Flink SQL

2020-07-06 Thread 王 outlook
For JSON input like the following,

{
  "id": 1,
  "many_names": [
    {"name": "foo"},
    {"name": "bar"}
  ]
}

the output table should have two rows:  id 1, name foo  |  id 1, name bar

Is the best practice to read the record from Kafka and then call a TableFunction UDTF to
expand it into multiple rows?
Or does Flink SQL have a more convenient way to expand the array directly when reading from
the source?


Sent from Outlook


Re: Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Jan Brusch

Hi Igal,

thanks for the quick reply. That does make sense and I will give it a try.

It would probably make sense to add that to the documentation.


Best regards and thanks!

Jan

On 06.07.20 14:02, Igal Shilman wrote:

Hi Jan,

Stateful Functions will look at the Java class path for the module.yaml,
so one way would be including the module.yaml in your
src/main/resources/ directory.


Good luck,
Igal.


On Mon, Jul 6, 2020 at 12:39 PM Jan Brusch > wrote:


Hi,

quick question about Deploying a Flink Stateful Functions
Application to
an existing cluster: The Documentation says to integrate
"statefun-flink-distribution" as additional maven Dependency in
the fat
jar.

(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/packaging.html#flink-jar)

But how and where do I upload my module.yml for external function
definitions in that scenario...?


Best regards

Jan


--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501



Re: Stateful Functions: Routing to remote functions

2020-07-06 Thread Jan Brusch

Hi igal,

thanks for your comprehensive reply!

As for 1. I will try and create a minimal reproduction of the case and 
share the code with you. It might be a few days until I get around to it.


As for 2. I will definitely give this a try. From the looks of it this 
seems to be the solution and this was the error in my thinking: Sending 
unwrapped messages to external functions...



Best regards and many thanks!

Jan

On 06.07.20 14:11, Igal Shilman wrote:

Hi Jan,

Two followup questions:

1. Looking at the stack trace provided in your email, it does seem 
like the function type is unavailable, and I'd like to follow up on 
that: can you please share your Dockerfile, so
we have the complete picture. If you are not comfortable sharing that, 
then you can please try to execute into the container 
and manually validate that the module.yaml is present
both on the "worker" image and the "master" image, and it defines the 
remote function name correctly?


2. In your original email, the provided router does not route messages 
of type Any, but it actually
forwards them as-in, the remote functions API requires that the 
message being sent to the remote function

is of type Any.  Can you try something like this:

final class EventRouter implements Router<com.google.protobuf.Message> {


 static final FunctionType PYTHON_EVENT_COUNTER_TYPE = new
FunctionType("demo", "eventCounterPython");
 static final FunctionType JAVA_EVENT_COUNTER_TYPE = new
FunctionType("demo", "eventCounterJava");
 @Override
 public void route(com.google.protobuf.Message event,
Downstream<com.google.protobuf.Message> downstream) {

downstream.forward(
 JAVA_EVENT_COUNTER_TYPE,
 "count",
 event)
 ;
 downstream.forward(
 new Address(
 PYTHON_EVENT_COUNTER_TYPE,
 "count"
 ),
Any.pack(event)
 );
 }
}



In addition you would have to change the definition of your ingress 
identifier to have a produced type of com.google.protobuf.Message

instead of an Event.


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch > wrote:


Hi Igal,

thanks for your reply. Initially I thought the same thing, but it
turns out I am able to call the remote function from an embedded
"wrapper" function using the exact same setup (Relevant Code
below). So that's one kind of solution to that Problem. But to me
it seems like it's a bit of a hack and not the idiomatic way to
solve this...

From my understanding of the address based communication within
Flink Stateful Functions, I feel like it should be possible to
call that function from the router directly. But I am probably
either using the Router wrong or misunderstand some of the ideas
behind address based communication...


EventRouter.java




final class EventRouter implements Router<Event> {

  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(EventCounterWrapper.TYPE, "_", event);
  }
}


--


EventCounterWrapper.java


---

public class EventCounterWrapper implements StatefulFunction {

    static final FunctionType TYPE = new FunctionType("demo",
"eventCounterWrapper");
    public static final FunctionType REMOTE_FUNCTION_TYPE = new
FunctionType("demo/external", "eventCounterPython");

    @Override
    public void invoke(Context context, Object input) {
    if (input instanceof Event) {
    Event event = (Event) input;
    Any message = Any.pack(event);
    context.send(REMOTE_FUNCTION_TYPE, "_", message);
    }

    if (input instanceof Any) {
    final EventCount eventCount;
    try {
    eventCount = ((Any) input).unpack(EventCount.class);
    } catch (InvalidProtocolBufferException e) {
    throw new RuntimeException("Unexpected type", e);
    }
context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
    }
    }
}


---


worker.py

@functions.bind("demo/external/eventCounterPython")
def handle_event(context, _):
 state = context.state('count').unpack(EventCount)
 if not state:
 state = EventCount()
 state.count = 1
 else:
 state.count += 1
 context.state('count').pack(state)

    envelope 

Re: Sliding windows storing multiple copies of the data

2020-07-06 Thread Congxian Qiu
Hi

My understanding is that if only a single copy were stored, state management would become
more complicated (every window that needs the record would have to fetch it from somewhere,
and the logic for when the state can be cleaned up would also become more complicated).

Best,
Congxian
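
To illustrate the point, a minimal DataStream sketch (names and durations are only for
illustration): with a 10-minute window sliding every 1 minute, each record is assigned to
size / slide = 10 overlapping windows, and the current implementation keeps a copy of the
record in state for each of those windows.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingWindowCopies {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a")
           .keyBy(word -> word)
           // size = 10 minutes, slide = 1 minute: every element belongs to 10 windows
           .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1)))
           .reduce((left, right) -> left + right)
           .print();

        env.execute("sliding-window-copies");
    }
}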


张浩 wrote on Mon, Jul 6, 2020 at 1:57 PM:

> Hello, my thinking is that it makes it easier to clear or extract each window's data
> from the state information.
> Is my understanding correct?
> Also, why can't we store just one copy of the data?
> Thank you very much for the discussion!
>
>
>
> 张浩
> Email: zhanghao_w...@163.com
>
> 
>
> Signature customized by 网易邮箱大师
>
> On Jul 6, 2020, 13:46, Congxian Qiu wrote:
> Hi
> That is how the current implementation works: each record is stored once in every
> window it is assigned to.
>
> Best,
> Congxian
>
>
> 张浩 <13669299...@163.com> wrote on Mon, Jul 6, 2020 at 12:49 PM:
>
> > Hi, all!
> > Since this is my first time asking, I am not sure whether my previous email was
> > received.
> > I would like to ask: when using the DataStream API, why does a sliding window store
> > size / slide copies of every record in state?
> >
> >
> > 张浩
> > 13669299...@163.com
> > Signature customized by 网易邮箱大师
>
>


??????Flink??SavePoint??????????????????????????

2020-07-06 Thread milan183sansiro
??
1.??id
2.savepoint??


??2020??7??6?? 20:32??wujunxi<462329...@qq.com> ??

1.id
2.savepoint??



----
??:"milan183sansiro"

??????Flink??SavePoint??????????????????????????

2020-07-06 Thread wujunxi

1.id
2.savepoint??



----
??:"milan183sansiro"

Re: can't execute query when table type is datagen

2020-07-06 Thread Danny Chan
Dear xin Destiny ~

It seems that you are using the legacy planner, so the exception is thrown at [1] ~

I agree that there should be a clearer message here indicating that the legacy planner is
in use; I have filed an issue [2].
Actually, for the legacy planner this is a regression, because before the change computed
columns were supported well.

[1] 
https://github.com/apache/flink/blob/1b1c343e8964c6400c7c1de3c70212522ba59a64/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java#L86
[2] https://issues.apache.org/jira/browse/FLINK-18500
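
For reference, a sketch of selecting the blink planner when the table environment is
created, which avoids the legacy ParserImpl path referenced in [1] (package names follow
the 1.10-style imports used elsewhere in this digest and may differ slightly in 1.11):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkPlannerSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // The datagen DDL above can then be executed against tEnv, e.g. via tEnv.sqlUpdate(...).
    }
}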

Best,
Danny Chan
在 2020年7月5日 +0800 AM10:52,xin Destiny ,写道:
> Hi, all:
> I use Zeppelin to execute SQL. The Flink version is a Flink 1.11 snapshot, built from
> branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973.
> When the table type is datagen, Flink throws an exception, but the
> exception message is null;
>
> My DDL is :
> CREATE TABLE datagen_dijie2 (
>  f_sequence INT,
>  f_random INT,
>  f_random_str STRING,
>  ts AS localtimestamp,
>  WATERMARK FOR ts AS ts
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.f_sequence.kind'='sequence',
>  'fields.f_sequence.start'='1',
>  'fields.f_sequence.end'='1000',
>  'fields.f_random.min'='1',
>  'fields.f_random.max'='1000',
>  'fields.f_random_str.length'='10'
> );
>
> My query sql is :
> select * from datagen_dijie2;
> the exception is :
> Fail to run sql command: select * from datagen_dijie2 
> org.apache.flink.table.api.ValidationException: SQL validation failed. null 
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>  at 
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
>  at 
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
>  at 
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
>  at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
>  at 
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.UnsupportedOperationException at 
> org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>  at 
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>  at 
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>  at 
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>  at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>  at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) at 
> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) at 
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) 
> at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
>  at 
> 

Re: Stateful Functions: Routing to remote functions

2020-07-06 Thread Igal Shilman
Hi Jan,

Two followup questions:

1. Looking at the stack trace provided in your email, it does seem like the
function type is unavailable, and I'd like to follow up on that: can you
please share your Dockerfile, so
we have the complete picture. If you are not comfortable sharing that, then
you can please try to execute into the container and manually validate that
the module.yaml is present
both on the "worker" image and the "master" image, and it defines the
remote function name correctly?

2. In your original email, the provided router does not route messages of
type Any, but it actually
forwards them as-is. The remote functions API requires that the message
being sent to the remote function
is of type Any.  Can you try something like this:

final class EventRouter implements Router<com.google.protobuf.Message> {

>
>  static final FunctionType PYTHON_EVENT_COUNTER_TYPE = new
> FunctionType("demo", "eventCounterPython");
>  static final FunctionType JAVA_EVENT_COUNTER_TYPE = new
> FunctionType("demo", "eventCounterJava");
>  @Override
>  public void route(com.google.protobuf.Message event, Downstream<
> com.google.protobuf.Message> downstream) {
>
 downstream.forward(
>  JAVA_EVENT_COUNTER_TYPE,
>  "count",
>  event)
>  ;
>  downstream.forward(
>  new Address(
>  PYTHON_EVENT_COUNTER_TYPE,
>  "count"
>  ),
>  Any.pack(event)
>  );
>  }
> }



In addition you would have to change the definition of your ingress
identifier to have a produced type of com.google.protobuf.Message
instead of an Event.


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch 
wrote:

> Hi Igal,
>
> thanks for your reply. Initially I thought the same thing, but it turns
> out I am able to call the remote function from an embedded "wrapper"
> function using the exact same setup (Relevant Code below). So that's one
> kind of solution to that Problem. But to me it seems like it's a bit of a
> hack and not the idiomatic way to solve this...
>
> From my understanding of the address based communication within Flink
> Stateful Functions, I feel like it should be possible to call that function
> from the router directly. But I am probably either using the Router wrong
> or misunderstand some of the ideas behind address based communication...
>
>
> EventRouter.java
>
>
> 
>
> final class EventRouter implements Router<Event> {
>
>   @Override
>   public void route(Event event, Downstream<Event> downstream) {
> downstream.forward(EventCounterWrapper.TYPE, "_", event);
>   }
> }
>
>
> --
>
>
> EventCounterWrapper.java
>
>
> ---
>
> public class EventCounterWrapper implements StatefulFunction {
>
> static final FunctionType TYPE = new FunctionType("demo",
> "eventCounterWrapper");
> public static final FunctionType REMOTE_FUNCTION_TYPE = new
> FunctionType("demo/external", "eventCounterPython");
>
> @Override
> public void invoke(Context context, Object input) {
> if (input instanceof Event) {
> Event event = (Event) input;
> Any message = Any.pack(event);
> context.send(REMOTE_FUNCTION_TYPE, "_", message);
> }
>
> if (input instanceof Any) {
> final EventCount eventCount;
> try {
> eventCount = ((Any) input).unpack(EventCount.class);
> } catch (InvalidProtocolBufferException e) {
> throw new RuntimeException("Unexpected type", e);
> }
> context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
> }
> }
> }
>
>
> ---
>
>
> worker.py
> 
> @functions.bind("demo/external/eventCounterPython")
> def handle_event(context, _):
>     state = context.state('count').unpack(EventCount)
>     if not state:
>         state = EventCount()
>         state.count = 1
>     else:
>         state.count += 1
>     context.state('count').pack(state)
>
>     envelope = Any()
>     envelope.Pack(state)
>     context.reply(envelope)
> 
>
>
> module.yaml
>
> -
>
> spec:
>   functions:
>     - function:
>         meta:
>           kind: http
>           type: demo/external/eventCounterPython
>         spec:
>           endpoint: http://python-worker:8000/statefun
>           states:
>             - count
>
> -
>
>
> Best Regards
>
> Jan
>
>
> On 03.07.20 17:33, Igal 

Question about the jdbc connector dialects

2020-07-06 Thread claylin
hi all, regarding the Flink SQL JDBC connector and ClickHouse: looking at
https://github.com/apache/flink/blob/d04872d2c6b7570ea3ba02f8fc4fca02daa96118/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java#L30,
only DerbyDialect, MySQLDialect and PostgresDialect are provided as JDBC dialects.
Is there a way to use the JDBC SQL connector with a database such as ClickHouse?

Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
I also tried doing this by using a User Defined Function.

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ??  # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2' etc.

t_env.register_function("data_converter", udf(DataConverter(),
                        input_types=[DataTypes.STRING()],
                        result_type=DataTypes.ROW([
                            DataTypes.FIELD("feature1", DataTypes.STRING())
                        ])))


t_env.from_path(INPUT_TABLE) \
    .select("data_converter(data)") \  # <--- here "data" is the field "data" from the previous mail
    .insert_into(OUTPUT_TABLE)


I used a ROW to hold multiple values but I can't figure out how I can
return a populated ROW object from the eval() method. Where is the method
to construct a row/field object and return it?


Thanks!


On Fri, Jul 3, 2020 at 12:40 PM Manas Kale  wrote:

> Hi Xingbo,
> Thanks for the reply, I didn't know that a table schema also needs to be
> declared after the connector, but I understand that now.
> I have another question: how do I write the parsing schemas for a field
> that itself is a valid JSON string? For example:
> {
> "monitorId": 865,
> "deviceId": "94:54:93:49:96:13",
> "data":
> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
> "state": 2,
> "time": 1593687809180
> }
> The field "data" is a string of valid JSON with string:number objects. I'm
> currently trying to use a JSON schema object and DataTypes.ROW, but am getting
> deserialization errors.
>
> .with_format(
> Json()
> .json_schema(
> """
> {
> "type": "object",
> "properties": {
> "monitorId": {
> "type": "string"
> },
> "deviceId": {
> "type": "string"
> },
> "data": {
> "type": "object"
> },
> "state": {
> "type": "integer"
> },
> "time": {
> "type": "string"
> }
> }
> }
> """
> )
> ) \
> .with_schema(
> Schema()
> .field("monitorId", DataTypes.STRING())
> .field("deviceId", DataTypes.STRING())
> .field("data", DataTypes.ROW())
> )
>
> Regards,
>
> Manas
>
>
> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang  wrote:
>
>> Hi, Manas
>> You need to define the schema. You can refer to the following example:
>>  t_env.connect(
>> Kafka()
>> .version('0.11')
>> .topic(INPUT_TOPIC)
>> .property("bootstrap.servers", PROD_KAFKA)
>> .property("zookeeper.connect", "localhost:2181")
>> .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>> "{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )
>> ) \
>> .with_schema(  # declare the schema of the table
>> Schema()
>> .field("lon", DataTypes.DECIMAL(20, 10))
>> .field("rideTime", DataTypes.TIMESTAMP(6))
>> ).register_table_source(INPUT_TABLE)
>>
>> Best,
>> Xingbo
>>
>> Manas Kale  于2020年7月2日周四 下午7:59写道:
>>
>>> Hi,
>>> I'm trying to get a simple consumer/producer running using the following
>>> code referred from the provided links :
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>>> StreamTableEnvironment
>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>
>>> t_config = TableConfig()
>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>
>>> INPUT_TOPIC = 'xyz'
>>> INPUT_TABLE = 'raw_message'
>>> PROD_ZOOKEEPER = '...'
>>> PROD_KAFKA = '...'
>>>
>>> OUTPUT_TOPIC = 'summary_output'
>>> OUTPUT_TABLE = 'feature_summary'
>>> LOCAL_ZOOKEEPER = 'localhost:2181'
>>> LOCAL_KAFKA = 'localhost:9092'
>>>
>>>
>>> t_env.connect(
>>> Kafka()
>>> .version('universal')
>>> .topic(INPUT_TOPIC)
>>> .property("bootstrap.servers", PROD_KAFKA)
>>>
>>> .start_from_latest()
>>> ) \
>>> .with_format(
>>> Json()
>>> .json_schema(
>>> "{"
>>> "  type: 'object',"
>>> "  properties: {"
>>> "lon: {"
>>> "  type: 'number'"
>>> "},"
>>> "rideTime: {"
>>> "  type: 'string',"
>>> "  format: 'date-time'"
>>> "}"
>>> "  }"
>>> "}"
>>> )
>>> ).register_table_source(INPUT_TABLE)
>>>
>>> t_env.connect(Kafka()
>>> .version('universal')
>>> 

Re: Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Igal Shilman
Hi Jan,

Stateful Functions looks for the module.yaml on the Java classpath,
so one way would be to include the module.yaml in your src/main/resources/
directory.
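
A quick way to double-check that the packaged fat jar actually carries the file
is a tiny classpath lookup like the following (just a sketch; the class name is
made up):

public final class ModuleYamlCheck {
  public static void main(String[] args) {
    // resolves to a jar:// URL when module.yaml was bundled into the fat jar
    java.net.URL url = ModuleYamlCheck.class.getClassLoader().getResource("module.yaml");
    System.out.println(url != null ? "module.yaml found at " + url : "module.yaml NOT on the classpath");
  }
}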

Good luck,
Igal.


On Mon, Jul 6, 2020 at 12:39 PM Jan Brusch 
wrote:

> Hi,
>
> quick question about Deploying a Flink Stateful Functions Application to
> an existing cluster: The Documentation says to integrate
> "statefun-flink-distribution" as additional maven Dependency in the fat
> jar.
> (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/packaging.html#flink-jar
> )
>
> But how and where do I upload my module.yml for external function
> definitions in that scenario...?
>
>
> Best regards
>
> Jan
>
>


Starting a Flink job from a savepoint: the modified code does not take effect

2020-07-06 Thread milan183sansiro
Hi all,
Background: Flink 1.6.4; the sources are Kafka topics A and B, consumed with the same consumer group.
Steps: 1. Cancel the job with a savepoint.
2. Modify the code to remove topic B and consume only topic A.
3. Start the job from the savepoint. The consumer group's offsets on topic B also went back to the offsets from when the job was stopped, and then immediately jumped to the latest and kept being consumed.
I would like to know why the code change does not take effect.



Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

Have you looked at the SLF4J reporter?

https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:

Hi,

Is it possible to log Flink metrics in application logs apart from 
publishing it to Prometheus?


With regards





Logging Flink metrics

2020-07-06 Thread Manish G
Hi,

Is it possible to log Flink metrics in application logs apart from
publishing it to Prometheus?

With regards


Re: Asynchronous I/O poor performance

2020-07-06 Thread Benchao Li
Hi Mark,

According to your numbers, I think the config of the async operator is OK.
There is one more setting that might affect the throughput of the async operator:
the watermark.
Because the unordered async operator still preserves order between watermarks,
did you use
event time in your job, and if yes, what is the watermark interval in your
job?
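
Just for reference, a self-contained sketch of the wiring being discussed; the
function body, timeout and names are illustrative, only capacity = 100 and
parallelism = 8 are taken from your numbers:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichmentSketch {

  // Stand-in for the gRPC enrichment call; a real client would be created in open().
  static class EnrichFunction extends RichAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
      CompletableFuture
          .supplyAsync(() -> key + "-enriched")                        // simulated async RPC
          .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> events = env.fromElements("a", "b", "c");

    AsyncDataStream
        .unorderedWait(events, new EnrichFunction(), 30, TimeUnit.SECONDS, 100)  // capacity 100
        .setParallelism(8)
        .print();

    env.execute("async-io-sketch");
  }
}

Back-of-the-envelope: with capacity 100, a ~20 ms call and parallelism 8, the
operator could in principle keep far more than 3200 requests/s in flight, which
is why the watermark question above matters.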

Mark Zitnik  于2020年7月5日周日 下午7:44写道:

> Hi Benchao
>
> The capacity is 100
> Parallelism is 8
> Rpc req is 20ms
>
> Thanks
>
>
> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>
>> Hi Mark,
>>
>> Could you give more details about your Flink job?
>> - the capacity of AsyncDataStream
>> - the parallelism of AsyncDataStream operator
>> - the time of per blocked rpc request
>>
>> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>>
>>> Hi
>>>
>>> In my flink application I need to enrich data using
>>> AsyncDataStream.unorderedWait,
>>> but I am getting poor performance. At the beginning I was just working with
>>> an http call, but I have switched to grpc. I am running on an 8 core node and
>>> getting a total of 3200 events per second; the service that I am using is not
>>> fully utilized and can produce up to 1 req/sec
>>>
>>> Flink job flow:
>>> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write
>>> to Kafka
>>>
>>> Using Akka gRPC code written in Scala
>>>
>>> Thanks
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li


Re: Debugging Flink state

2020-07-06 Thread shizk233
Sorry, it seems I replied to the wrong mail; I should have used reply-all?

Congxian Qiu wrote on Mon, Jul 6, 2020 at 7:22 PM:

> Hi
> If you want to debug the checkpoint files, you can refer to this UT [1]
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
> Best,
> Congxian
>
>
> Z-Z wrote on Mon, Jul 6, 2020 at 4:59 PM:
>
> > Hi everyone, a quick question:
> > How do I debug Flink's checkpoints? I want to inspect the current state of the program. I got the checkpoint files, but when I open them some of the content is garbled and unstructured. Is there a way to do this?
>


Re: Debugging Flink state

2020-07-06 Thread shizk233
Hi Z-Z,

If what you want to inspect is the state content of the program, I would suggest
triggering a savepoint and querying it with the State Processor API.

See
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html
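
A minimal, self-contained sketch of that approach (the savepoint path, operator
uid, state name and types below are assumptions for illustration only; the
flink-state-processor-api dependency is required):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepointSketch {

  // Reads a hypothetical ValueState<Long> named "count" kept by the operator with uid "my-uid".
  static class CountReader extends KeyedStateReaderFunction<String, Long> {
    transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
      countState = getRuntimeContext().getState(
          new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
      out.collect(countState.value());
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Point this at the savepoint directory you want to inspect.
    ExistingSavepoint savepoint =
        Savepoint.load(env, "hdfs:///savepoints/savepoint-xxxx", new MemoryStateBackend());

    savepoint
        .readKeyedState("my-uid", new CountReader())
        .print();
  }
}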

Best,
shizk233

Congxian Qiu wrote on Mon, Jul 6, 2020 at 7:22 PM:

> Hi
> If you want to debug the checkpoint files, you can refer to this UT [1]
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
> Best,
> Congxian
>
>
> Z-Z wrote on Mon, Jul 6, 2020 at 4:59 PM:
>
> > Hi everyone, a quick question:
> > How do I debug Flink's checkpoints? I want to inspect the current state of the program. I got the checkpoint files, but when I open them some of the content is garbled and unstructured. Is there a way to do this?
>


Re: Debugging Flink state

2020-07-06 Thread Congxian Qiu
Hi
If you want to debug the checkpoint files, you can refer to this UT [1]

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
Best,
Congxian


Z-Z wrote on Mon, Jul 6, 2020 at 4:59 PM:

> Hi everyone, a quick question:
> How do I debug Flink's checkpoints? I want to inspect the current state of the program. I got the checkpoint files, but when I open them some of the content is garbled and unstructured. Is there a way to do this?


Re: Timeout when using RockDB to handle large state in a stream app

2020-07-06 Thread Felipe Gutierrez
Hi all,

I tested the two TPC-H query 03 [1] and 10 [2] using Datastream API on
the cluster with RocksDB state backend. One thing that I did that
improved a lot was to replace the List of POJOs with a
List of Tuples. Then I could load a table of 200MB in memory as my
state. However, the original table is 725MB, and it turned out that I
need another configuration. I am not sure what more I can do to reduce
the size of my state. If one of you has an idea I would be thankful to
hear it.

Now, speaking about the flink-conf.yaml file and the RocksDB
configuration. When I use these configurations on the flink-conf.yaml
the stream job still runs out of memory.
jobmanager.heap.size: 4g # default: 2048m
heartbeat.timeout: 10
taskmanager.memory.process.size: 2g # default: 1728m

Then I changed for this configuration which I can set
programmatically. The stream job seems to behave better. It starts to
process something, then the metrics disappear for some time and appear
again. The available and used memory on the TM
(flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
the available and used memory on the JM
(flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
overwriting the configuration on the flink-conf.yaml file.

RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir, true);
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(stateBackend);

How can I increase the memory of the JM and TM when I am still using
the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?

[1] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
[2] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
 wrote:
>
> yes. I agree. because RocsDB will spill data to disk if there is not
> enough space in memory.
> Thanks
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Jul 3, 2020 at 8:27 AM Yun Tang  wrote:
> >
> > Hi Felipe,
> >
> > I noticed my previous mail has a typo: RocksDB is executed in task main 
> > thread which does not take the role to respond to heart beat. Sorry for 
> > previous typo, and the key point I want to clarify is that RocksDB should 
> > not have business for heartbeat problem.
> >
> > Best
> > Yun Tang
> > 
> > From: Felipe Gutierrez 
> > Sent: Tuesday, June 30, 2020 17:46
> > To: Yun Tang 
> > Cc: Ori Popowski ; user 
> > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> >
> > Hi,
> >
> > I reduced the size of the tables that I am loading on a ListState and
> > the query worked. One of them was about 700MB [1] [2].
> >
> > Now I am gonna deploy it on the cluster and check if it works. I will
> > probably need to increase the heartbeat timeout.
> >
> > Thanks,
> > Felipe
> > [1] 
> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> > [2] 
> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Jun 30, 2020 at 10:51 AM Yun Tang  wrote:
> > >
> > > Hi Felipe
> > >
> > > RocksDB is executed in task main thread which does take the role to 
> > > respond to heart beat and RocksDB mainly use native memory which is 
> > > decoupled from JVM heap to not bring any GC pressure. Thus, timeout 
> > > should have no relationship with RocksDB in general if your task manager 
> > > is really heartbeat timeout instead of crash to exit.
> > >
> > > Try to increase the heartbeat timeout [1] and watch the GC detail logs to 
> > > see anything weird.
> > >
> > > [1] 
> > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
> > >
> > > Best
> > > Yun Tang
> > >
> > > 
> > > From: Ori Popowski 
> > > Sent: Monday, June 29, 2020 17:44
> > > Cc: user 
> > > Subject: Re: Timeout when using RockDB to handle large state in a stream 
> > > app
> > >
> > > Hi there,
> > >
> > > I'm currently experiencing the exact same issue.
> > >
> > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
> > >
> > > I've found out that GC is causing the problem, but I still haven't 
> > > managed to solve this.
> > >
> > >
> > >
> > > On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez 
> > >  wrote:
> > >
> > > Hi community,
> > >
> > > I am trying to 

[ANNOUNCE] Weekly Community Update 2020/26-27

2020-07-06 Thread Konstantin Knauf
Dear community,

happy to share this (and last) week's community update. Flink 1.11 is
finally ready to be released. Besides that a few design discussions in
different areas of Apache Flink, like enhanced fan out for Flink's Kinesis
Source or Temporal Table support in pure Flink SQL, and of course a bit
more.

Flink Development
==

* [releases] Zhijiang has published release candidate #4 for Flink 1.11.0
last Tuesday. The vote [1] passed this morning, so we will see the release
of Flink 1.11 very soon.

* [sql] A while ago I started a discussion on supporting Temporal Table
Joins via pure Flink SQL. As of now, a user either needs to register a
Temporal Table Function in the Table API or the environment's configuration
of the SQL CLI. This became a more involved discussion than anticipated,
which Leonard Xu is doing a great job of moving forward. It seems that we
are close to a FLIP document now. [2]

* [connectors] Danny Cranmer has started the discussion [3] and -
subsequently - the vote [4] on FLIP-128, which adds support for enhanced
fan out for Flink's Kinesis source. With enhanced fan out each consumer
receives dedicated data output per shard, as opposed to competing for the
per-shard data output with other consumers.

* [apis] Aljoscha has started a discussion about what kind of compatibility
guarantees the community would like to give for the APIs that are commonly
used by packaged, third-party or custom connectors. Not too much feedback
so far, but right now it seems that we would like it to be safe to use
connectors across patch releases (1.x.y -> 1.x.z), but not across minor
releases (1.u -> 1.v). Based on the recent discussions [5] on the
guarantees for @PublicEvolving this means that connectors could only use
APIs that are annotated @Public or @PublicEvolving. [6]

* [state] Etienne Chauchot has published a design document for FLINK-17073,
which introduces a backpressure mechanism for checkpoints: when checkpoints
can not be cleaned up as quickly as they are created triggering new
checkpoints will be delayed. This change was motivated by an OOME on the
Jobmanger resulting from too many queued checkpoint clean up tasks. [7]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-11-0-release-candidate-4-tp42829.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLINK-16824-Creating-Temporal-Table-Function-via-DDL-tp40333.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-128-Enhanced-Fan-Out-for-AWS-Kinesis-Consumers-tp42728.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-128-Enhanced-Fan-Out-for-AWS-Kinesis-Consumers-tp42846.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Cross-version-compatibility-guarantees-of-Flink-Modules-Jars-tp42746.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Stability-guarantees-for-PublicEvolving-classes-tp41459.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-17073-checkpoint-backpressure-design-doc-tp42788.html

Notable Bugs
==

* [FLINK-18452] [1.11] [1.10.1] SQL queries that use "Top-N" can not be
restored from a savepoint due to an incorrectly implemented Object#equals in
one of the state objects. [8]

[8] https://issues.apache.org/jira/browse/FLINK-18452

Events, Blog Posts, Misc
===

* On the Ververica blog Jaehyeuk Oh & Gihoon Yeom explain how HyperConnect
is using Apache Flink for match making in their real-time communication app
Azar. [9]

* On the Flink blog, Jeff Zhang has published the second part of his blog
post series on Flink on Zeppelin. [10]

[9]
https://www.ververica.com/blog/data-driven-matchmaking-at-azar-with-apache-flink
[10]
https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Is there a way to deliberately delay data for a while in Flink SQL

2020-07-06 Thread shizk233
Hi Sun.Zhu,

Regarding approach 4: I remember Kafka has a timing-wheel mechanism that can implement delayed messages; it may be worth looking into.

Best,
shizk233

Sun.Zhu <17626017...@163.com> wrote on Sat, Jul 4, 2020 at 12:23 AM:

> Thanks benchao and forideal for the proposals.
> Approach 1: use a udf; if the lookup misses, sleep a bit and look it up again
> -- we can try this
> Approach 2: let the data wait for a while at the join operator before looking it up
> -- we are using Flink SQL, not the DataStream API, so this approach may not work for us
> Approach 3: if the join does not match, send the data back to the source and join it again in a loop
> -- our dimension-table join scenario is more like a filter: if a record matches, the main-stream data is not processed further, so the join does not strictly have to succeed; we just want to delay a bit to improve accuracy
> Approach 4: if the source MQ supports delayed messages, Flink does not need to do anything; just use the MQ's delayed messages
> -- our source is Kafka, which does not seem to support this feature
> Approach 5: extend Flink's Source, e.g. add a time.wait property to the kafka connector;
> when a user sets this property, the source holds the data for a while before emitting it downstream, achieving the delay
> -- this approach requires modifying the source code; we can also give it a try
>
>
> Best
> Sun.Zhu
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> On Jul 3, 2020 at 23:26, forideal wrote:
> Hi
>
>
>
>
> Benchao just mentioned four approaches:
>
> Approach 1: use a udf; if the lookup misses, sleep a bit and look it up again
>
> Approach 2: let the data wait for a while at the join operator before looking it up
>
> Approach 3: if the join does not match, send the data back to the source and join it in a loop
>
> Approach 4: if your source MQ supports delayed messages, Flink does not need to do anything; just use the MQ's delayed messages
>
>
>
>
> All of the above should achieve the same effect.
>
>
>
>
> We also implemented another approach, which extends Flink's Source. For example, we added a time.wait
> property to the kafka connector; when a user sets this property, the source holds the data for a while before emitting it downstream, which achieves the delay.
>
>
>
>
> Best forideal
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-07-03 23:05:06, Benchao Li wrote:
> Oh right, one more idea: if your source MQ supports delayed messages, then Flink does not need to do anything; just use the MQ's delayed messages.
>
> admin <17626017...@163.com> wrote on Fri, Jul 3, 2020 at 5:54 PM:
>
> Hi all,
> We have a scenario with a two-stream join, one fast stream and one slow stream. We would like the fast stream to wait for a while, in order to improve the join hit rate.
> Is there any way to implement this in Flink SQL?
>
> Thanks for your reply
>
>
>
> --
>
> Best,
> Benchao Li
>


Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Jan Brusch

Hi,

quick question about Deploying a Flink Stateful Functions Application to 
an existing cluster: The Documentation says to integrate 
"statefun-flink-distribution" as additional maven Dependency in the fat 
jar. 
(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/packaging.html#flink-jar)


But how and where do I upload my module.yml for external function 
definitions in that scenario...?



Best regards

Jan



Dumping memory with jemalloc

2020-07-06 Thread SmileSmile
Hi community, has anyone configured jemalloc before?

I currently put a compiled jemalloc into the docker container and added the following to the Flink configuration:

containerized.taskmanager.env.LD_PRELOAD: /opt/jemalloc/lib/libjemalloc.so
   containerized.taskmanager.env.MALLOC_CONF: 
prof:true,lg_prof_interval:25,lg_prof_sample:17,prof_prefix:/opt/state/jeprof.out

As a result, no corresponding heap files are generated. /opt/state is mounted on the host's disk and the permissions have been granted.

How should I proceed to produce a memory dump?


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

Re: Is "user" a reserved keyword in Flink SQL?

2020-07-06 Thread Leonard Xu
Hi,
Yes, it is; escape it with backticks as `user`.
See [1] for the complete list of reserved keywords.

Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/ 


> On Jul 6, 2020, at 17:06, 王 outlook wrote:
> 
> "CREATE TABLE ods_foo (\n" +
> "id INT,\n" +
> "user ARRAY>\n" +
> ") WITH (



Re: Is "user" a reserved keyword in Flink SQL?

2020-07-06 Thread Benchao Li
Yes, user is a reserved keyword; the keyword list is in [1].

If you run into a keyword, you can escape it with backticks, for example:
CREATE TABLE `user` (...) WITH (...);

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/#reserved-keywords

王 outlook wrote on Mon, Jul 6, 2020 at 5:07 PM:

> I found that creating a table with a column named user raises an error. Is user a keyword, or is there some other reason?
>
>
> "CREATE TABLE ods_foo (\n" +
> "id INT,\n" +
> "user ARRAY>\n" +
> ") WITH (
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "user" at line 3, column 5.
> Was expecting one of:
>
>
>
>

-- 

Best,
Benchao Li


Is "user" a reserved keyword in Flink SQL?

2020-07-06 Thread 王 outlook
I found that creating a table with a column named user raises an error. Is user a keyword, or is there some other reason?


"CREATE TABLE ods_foo (\n" +
"id INT,\n" +
"user ARRAY>\n" +
") WITH (

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "user" at line 3, column 5.
Was expecting one of:





Debugging Flink state

2020-07-06 Thread Z-Z
Hi everyone, a quick question: how do I debug Flink's checkpoints? I want to
inspect the current state of the program. I got the checkpoint files, but when
I open them some of the content is garbled and unstructured. Is there a way to do this?

How to ensure that job is restored from savepoint when using Flink SQL

2020-07-06 Thread shadowell


Hello, everyone,
I have some unclear points when using Flink SQL. I hope to get an 
answer or tell me where I can find the answer.
When using the DataStream API, in order to ensure that the job can 
recover the state from savepoint after adjustment, it is necessary to specify 
the uid for the operator. However, when using Flink SQL, the uid of the 
operator is automatically generated. If the SQL logic changes (operator order 
changes), when the task is restored from savepoint, will it cause some of the 
operator states to be unable to be mapped back, resulting in state loss?
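
For comparison, this is what the DataStream-side guarantee mentioned above looks
like, as a minimal sketch (the operator names and uids are illustrative only):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "c")
        .map(new MapFunction<String, String>() {
          @Override
          public String map(String value) {
            return value.toUpperCase();
          }
        })
        .uid("uppercase-mapper")   // stable uid: savepoint state can be mapped back to this operator
        .name("uppercase-mapper")
        .print();

    env.execute("uid-sketch");
  }
}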


Thanks~
Jie Feng 
| |
shadowell
|
|
shadow...@126.com
|
签名由网易邮箱大师定制

Re: Re: flink 1.10 checkpoint NullPointerException after running for a while

2020-07-06 Thread Congxian Qiu
@chenkaibit Thanks for your reply~

Best,
Congxian


chenkaibit  于2020年7月6日周一 下午3:53写道:

> Hi Congxian. I also found this strange when I hit the issue, but after printing some logs it did confirm my guess. Since neither <older jdk + flink 1.9> nor
> <newer jdk + 1.10> throws the NPE (see FLINK-17479), I suspect it is related to how variables captured by the lambda expression are garbage collected and to the
> MailBox model introduced in 1.10: the outer checkpointMetaData instance gets reclaimed unexpectedly. So in the fix patch I instantiate a new
> checkpointMetaData inside the lambda expression; so far this approach has been effective and I have not
> seen the NPE again. This is a temporary fix, and the root cause probably still needs further analysis.
>
>
> --
> Best, yuchuan
>
>
>
> 在 2020-07-06 14:04:58,"Congxian Qiu"  写道:
> >@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
> >CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。
> >
> >Best,
> >Congxian
> >
> >
> >陈凯  于2020年7月6日周一 上午9:53写道:
> >
> >>
> >> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> >> 我之前提了个jira 描述了这个问题
> >> https://issues.apache.org/jira/browse/FLINK-18196
> >>
> >> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
> >>
> >> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
> >>
> >>
> >>
> >> -邮件原件-
> >> 发件人: zhisheng 
> >> 发送时间: 2020年7月5日 15:01
> >> 收件人: user-zh 
> >> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
> >>
> >> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
> >>
> >> Best!
> >> zhisheng
> >>
> >> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
> >>
> >> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
> >> >
> >> > Best,
> >> > Congxian
> >> >
> >> >
> >> > zhisheng  于2020年7月4日周六 下午12:27写道:
> >> >
> >> > > 我们也有遇到过这个异常,但是不是很常见
> >> > >
> >> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> >> > >
> >> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> >> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> >> > > > Best,
> >> > > > Congxian
> >> > > >
> >> > > >
> >> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> >> > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> >> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> >> > > > > >
> >> > > > > >
> >> > > > > >| |
> >> > > > > >JasonLee
> >> > > > > >|
> >> > > > > >|
> >> > > > > >邮箱:17610775...@163.com
> >> > > > > >|
> >> > > > > >
> >> > > > > >Signature is customized by Netease Mail Master
> >> > > > > >
> >> > > > > >在2020年07月01日 20:43,程龙 写道:
> >> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> >> > > > > >
> >> > > > > >
> >> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
> >> operator
> >> > > > > Filter -> Map (2/8).
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> > > > >
> >> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> >> > > > > >   at
> >> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> >> > > > > >   at
> >> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> >> > > > > >   at java.lang.Thread.run(Thread.java:745)
> >> > > > > >Caused by: java.lang.NullPointerException
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> >> > > > > >   at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> >> > > > > >   at
> >> > > > >
> >> > 

Re: flink interval join

2020-07-06 Thread Bob Hu

Re: Re: flink 1.10 checkpoint NullPointerException after running for a while

2020-07-06 Thread chenkaibit
Hi Congxian. I also found this strange when I hit the issue, but after printing some logs it did confirm my guess. Since neither <older jdk + flink 1.9> nor
<newer jdk + 1.10> throws the NPE (see FLINK-17479), I suspect it is related to how variables captured by the lambda expression are garbage collected and to the
MailBox model introduced in 1.10: the outer checkpointMetaData instance gets reclaimed unexpectedly. So in the fix patch I instantiate a new
checkpointMetaData inside the lambda expression; so far this approach has been effective and I have not
seen the NPE again. This is a temporary fix, and the root cause probably still needs further analysis.




--

Best, yuchuan





在 2020-07-06 14:04:58,"Congxian Qiu"  写道:
>@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
>CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。
>
>Best,
>Congxian
>
>
>陈凯  于2020年7月6日周一 上午9:53写道:
>
>>
>> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
>> 我之前提了个jira 描述了这个问题
>> https://issues.apache.org/jira/browse/FLINK-18196
>>
>> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
>>
>> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>>
>>
>>
>> -邮件原件-
>> 发件人: zhisheng 
>> 发送时间: 2020年7月5日 15:01
>> 收件人: user-zh 
>> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
>>
>> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
>>
>> Best!
>> zhisheng
>>
>> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
>>
>> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > zhisheng  于2020年7月4日周六 下午12:27写道:
>> >
>> > > 我们也有遇到过这个异常,但是不是很常见
>> > >
>> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
>> > >
>> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
>> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
>> > > > Best,
>> > > > Congxian
>> > > >
>> > > >
>> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
>> > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
>> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
>> > > > > >
>> > > > > >
>> > > > > >| |
>> > > > > >JasonLee
>> > > > > >|
>> > > > > >|
>> > > > > >邮箱:17610775...@163.com
>> > > > > >|
>> > > > > >
>> > > > > >Signature is customized by Netease Mail Master
>> > > > > >
>> > > > > >在2020年07月01日 20:43,程龙 写道:
>> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
>> > > > > >
>> > > > > >
>> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
>> operator
>> > > > > Filter -> Map (2/8).
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > >
>> > > >
>> > >
>> >
>> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > >
>> > > >
>> > >
>> >
>> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
>> > > > > >   at org.apache.flink.streaming.runtime.io
>> > > > >
>> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>> > > > > >   at
>> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>> > > > > >   at
>> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>> > > > > >   at java.lang.Thread.run(Thread.java:745)
>> > > > > >Caused by: java.lang.NullPointerException
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> > > > > >   at
>> > > > >
>> > > >
>> > >
>> >
>> 

Re: HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

2020-07-06 Thread Benchao Li
Hi Jim,

This is a known issue [1]; could you check whether it covers your case?

[1] https://issues.apache.org/jira/browse/FLINK-18002

Jim Chen  于2020年7月6日周一 下午1:28写道:

> Hi, everyone!
>
> When I use flink 1.10 to define a table, I want to declare the json array
> as the string type. But the query result is null when I execute the program.
> The detail code as follow:
>
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
>  * kafka topic: test_action
>  *
>  * kafka message:
>  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>  */
> public class Problem2 {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
> //bsEnv.registerFunction("explode3", new ExplodeFunction());
>
> String ddlSource = "CREATE TABLE actionTable3 (\n" +
> "action STRING\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',\n" +
> "'connector.version' = '0.11',\n" +
> "'connector.topic' = 'test_action',\n" +
> "'connector.startup-mode' = 'earliest-offset',\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> "'update-mode' = 'append',\n" +
> "'format.type' = 'json',\n" +
> //"'format.derive-schema' = 'true',\n" +
> "'format.json-schema' = '{\"type\": \"object\",
> \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
> ")";
> System.out.println(ddlSource);
> bsEnv.sqlUpdate(ddlSource);
>
> Table table = bsEnv.sqlQuery("select * from actionTable3");
> //Table table = bsEnv.sqlQuery("select * from actionTable2,
> LATERAL TABLE(explode3(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print();// the result is null
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>


-- 

Best,
Benchao Li


Re: In flink 1.10, a JSON array declared as STRING in the table definition comes back empty

2020-07-06 Thread Benchao Li
Hi Jim,

This is a known issue [1]; please take a look at it and see whether it solves your problem.

[1] https://issues.apache.org/jira/browse/FLINK-18002

Jim Chen wrote on Mon, Jul 6, 2020 at 11:28 AM:

> Hi,
> You can reproduce the crash scene with the following steps:
> kafka topic: test_action
> kafka message:
>   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>
> 代码Problem2.java:
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
>  *
>  * When defining a custom tableFunction, if the SQL passes in an ARRAY type,
>  * then what the eval method receives is a Row[],
>  * and the problem is that the data inside the Row[] cannot be read; all its elements are NULL.
>  *
>  * Current idea: treat the ARRAY as a STRING when defining the table.
>  * Current problem: everything the query returns is empty.
>  *
>  * kafka topic: test_action
>  *
>  * kafka message:
>  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>  */
> public class Problem2 {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
> bsEnv.registerFunction("explode3", new ExplodeFunction());
>
> String ddlSource = "CREATE TABLE actionTable3 (\n" +
> "action STRING\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',\n" +
> "'connector.version' = '0.11',\n" +
> "'connector.topic' = 'test_action',\n" +
> "'connector.startup-mode' = 'earliest-offset',\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> "'update-mode' = 'append',\n" +
> "'format.type' = 'json',\n" +
> "'format.derive-schema' = 'false',\n" +
> "'format.json-schema' = '{\"type\": \"object\",
> \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
> ")";
> System.out.println(ddlSource);
> bsEnv.sqlUpdate(ddlSource);
>
> Table table = bsEnv.sqlQuery("select * from actionTable3");
> //Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
> TABLE(explode3(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print(); // the output is all empty
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>


-- 

Best,
Benchao Li


Re: Does savepoint reset the base for incremental checkpoint

2020-07-06 Thread Congxian Qiu
Hi

The checkpoint base is only used for incremental checkpoints; the answer to
the first question is checkpoint x.

After restoring from a savepoint, there is no base for the first checkpoint.

You can refer to the code [1][2] for more information.

[1]
https://github.com/apache/flink/blob/c14f9d2f9f6d6f2da3dc41fcef010e12405e25eb/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L416
[2]
https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java

Best,
Congxian


Steven Wu  于2020年7月6日周一 上午11:46写道:

> In a slightly different variation of sequence (checkpoint x, savepoint y,
> redeploy/restart job from savepoint y, checkpoint x+1), checkpoint x+1
> builds the incremental diff on savepoint y, right?
>
> On Sun, Jul 5, 2020 at 8:08 PM Steven Wu  wrote:
>
>>
>> In this sequence of (checkpoint x, savepoint y, checkpoint x+1), does
>> checkpoint x+1 build the incremental diff based on checkpoint x or
>> savepoint y?
>>
>> Thanks,
>> Steven
>>
>


Re: How should the rocksdb block cache usage metric be used

2020-07-06 Thread SmileSmile
Hi Yun Tang!

I added libjemalloc.so.2 into the container and added the following to the configuration:
containerized.master.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc.so.2"
containerized.master.env.MALLOC_CONF: 
"prof:true,lg_prof_interval:25,lg_prof_sample:17"

How can I obtain the memory profile files? I tried killing a TM but could not find the corresponding heap file. Any help is appreciated.



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 19:13,Yun Tang 写道:
hi

有采集过内存使用情况么,推荐使用jemalloc的预先加载方式[1][2]来sample 
JVM的内存使用,观察是否有malloc的内存存在超用的场景。需要配置相关参数 
containerized.taskmanager.env.MALLOC_CONF 和 
containerized.taskmanager.env.LD_PRELOAD


[1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
[2] https://www.evanjones.ca/java-native-leak-bug.html

祝好
唐云



From: SmileSmile 
Sent: Friday, July 3, 2020 15:22
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


?????? ????????????????????????????

2020-07-06 Thread kcz
windowflink??




----
??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction
Best,
Congxian


JasonLee <17610775...@163.com ??2020??7??4?? 8:29??

 


 | |
 JasonLee
 |
 |
 ??17610775...@163.com
 |

 Signature is customized by Netease Mail Master

 ??2020??07??03?? 14:02??18579099...@163.com ??

 
??ProcessWindowFunctionvalueState

 
1.??ProcessWindowFunction??process??1??trigger

 
process??valueState



 18579099...@163.com


Re: [Table API] how to configure a nested timestamp field

2020-07-06 Thread Dongwon Kim
Hi Leonard,

Wow, that's great! It works like a charm.
I've never considered this approach at all.
Thanks a lot.

Best,
Dongwon

On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu  wrote:

> Hi, Kim
>
> The reason your attempts (2) and (3) failed is that the json format does
> not support converting a BIGINT to TIMESTAMP. You can first define the BIGINT
> field and then use a computed column to derive the TIMESTAMP field; you can
> also define the time attribute on the TIMESTAMP field so that time-based
> operations can be used in Flink 1.10.1. However, computed columns are only supported in pure
> DDL; the Table API lacks this support and should be aligned in 1.12 as far as I
> know.
> The DDL syntax is as follows:
>
> create table test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>   timestampCol as
> TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')),  -- computed column
>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> ) with (
>   'connector' = '...',
>   'format' = 'json',
>   ...
> );
>
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>
>
> 在 2020年7月4日,21:21,Dongwon Kim  写道:
>
> Hi,
> I use Flink 1.10.1 and I want to use Table API to read JSON messages. The
> message looks like below.
>
>> {
>>"type":"Update",
>>"location":{
>>   "id":"123e4567-e89b-12d3-a456-42665234",
>>   "lastUpdateTime":1593866161436
>>}
>> }
>
>
> I wrote the following program just to see whether json messages are
> correctly parsed by Table API:
>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> EnvironmentSettings envSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
>> envSettings);
>> tEnv
>>   .connect(
>> new Kafka()
>>   .version("universal")
>>   .topic(consumerTopic)
>>   .startFromLatest()
>>   .properties(consumerProperties)
>>   )
>>   .withFormat(new Json())
>>   .withSchema(new Schema().schema(
>> TableSchema.builder()
>>   .field("type", STRING())
>>   .field("location",
>> ROW(
>>   FIELD("id", STRING()),
>>   // (1)
>>   FIELD("lastUpdateTime", BIGINT())
>>   // (2)
>>   FIELD("lastUpdateTime", TIMESTAMP())
>>   // (3)
>>   FIELD("lastUpdateTime",
>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>> ))
>>   .build()
>>   ))
>>   .createTemporaryTable("message");
>> tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>   .print();
>
>
> Note that I tried BIGINT(), TIMESTAMP(), and
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
> (1) it works fine but later I can't use time-based operations like
> windowing.
>
> (2) it causes the following exception
>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING,
>> `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match
>> with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of
>> the 'location' field of the TableSource return type.
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>> at
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>> at
>> 

Re: flink 1.10 checkpoint NullPointerException after running for a while

2020-07-06 Thread Congxian Qiu
@陈凯 Thanks for sharing this approach. I am curious what the actual difference between the two is: the modified patch copies the
CheckpointMeta at the beginning of the closure, which means that between lines 845 and 867 the previous checkpointMeta becomes null, and that is quite strange.

Best,
Congxian


陈凯  于2020年7月6日周一 上午9:53写道:

>
> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> 我之前提了个jira 描述了这个问题
> https://issues.apache.org/jira/browse/FLINK-18196
>
> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
>
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>
>
>
> -邮件原件-
> 发件人: zhisheng 
> 发送时间: 2020年7月5日 15:01
> 收件人: user-zh 
> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
>
> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
>
> Best!
> zhisheng
>
> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
>
> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
> >
> > Best,
> > Congxian
> >
> >
> > zhisheng  于2020年7月4日周六 下午12:27写道:
> >
> > > 我们也有遇到过这个异常,但是不是很常见
> > >
> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> > >
> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> > > > > >
> > > > > >
> > > > > >| |
> > > > > >JasonLee
> > > > > >|
> > > > > >|
> > > > > >邮箱:17610775...@163.com
> > > > > >|
> > > > > >
> > > > > >Signature is customized by Netease Mail Master
> > > > > >
> > > > > >在2020年07月01日 20:43,程龙 写道:
> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> > > > > >
> > > > > >
> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
> operator
> > > > > Filter -> Map (2/8).
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > >
> > > >
> > >
> >
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > >
> > > >
> > >
> >
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> > > > > >   at org.apache.flink.streaming.runtime.io
> > > > >
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> > > > > >   at
> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> > > > > >   at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> > > > > >   at java.lang.Thread.run(Thread.java:745)
> > > > > >Caused by: java.lang.NullPointerException
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> > > > > >   at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> > > > >
> > > >
> > >
> >
>