回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread 郑斌斌
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, 
checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

java.lang.Exception: [2020-09-14 09:27:20.431]Container 
[pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 
36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB 
physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 
-Djobmanager.rpc.address=njdev-nn03.nj 
-Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c 
-Djobmanager.rpc.port=30246 -Drest.address=flink-node03
 |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' 
-Djobmanager.rpc.address='flink-node03' 
-Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' 
-Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
 2> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 
[2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
[2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. 

 at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 13:12
收件人:user-zh ; 郑斌斌 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
郑斌斌  于2020年9月23日周三 下午12:29写道:
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
 单流跑的话,比较正常。
 JOB的内存是4G。版本1.11.1
 --
 发件人:Benchao Li 
 发送时间:2020年9月23日(星期三) 10:50
 收件人:user-zh 
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 Hi Tianwang,

 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
 `Math.max(leftRelativeSize, rightRelativeSize) +
 allowedLateness`,根据你的SQL,这个值应该是6h
 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
 2;`,在你的SQL来讲,就是3h,也就是说
 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

 希望这个可以解答你的疑惑~

 [1] https://issues.apache.org/jira/browse/FLINK-18996

 Tianwang Li  于2020年9月22日周二 下午8:26写道:

 > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
 >
 >
 > 【join】
 >
 > > SELECT `b`.`rowtime`,
 > > `a`.`c_id`,
 > > `b`.`openid`
 > > FROM `test_table_a` AS `a`
 > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
 > > AND `a`.`openid` = `b`.`openid`
 > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
 > > AND `a`.`rowtime` + INTERVAL '6' HOUR
 > >
 > >
 > 【window】
 

Re: [SQL] parse table name from sql statement

2020-09-22 Thread Harold.Miao
thx

silence  于2020年9月22日周二 上午11:54写道:

> 写过一个类似的可以参考一下
>
> private static List lookupSelectTable(SqlNode sqlNode) {
> List list = new ArrayList<>();
> if (sqlNode instanceof SqlSelect) {
> SqlNode from = ((SqlSelect) sqlNode).getFrom();
> list.addAll(lookupSelectTable(from));
> } else if (sqlNode instanceof SqlJoin) {
> SqlJoin sqlJoin = (SqlJoin) sqlNode;
> list.addAll(lookupSelectTable(sqlJoin.getLeft()));
> list.addAll(lookupSelectTable(sqlJoin.getRight()));
> } else if (sqlNode instanceof SqlBasicCall) {
> SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
> SqlOperator operator = sqlBasicCall.getOperator();
> if (SqlKind.AS.equals(operator.getKind())) {
>
> list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0]));
> } else if (SqlKind.UNION.equals(operator.getKind())) {
> for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) {
> list.addAll(lookupSelectTable(operandSqlNode));
> }
> } else {
> throw new RuntimeException("operator " + operator.getKind()
> + " not support");
> }
> } else if (sqlNode instanceof SqlIdentifier) {
> list.add(((SqlIdentifier) sqlNode).getSimple());
> } else {
> throw new RuntimeException("operator " + sqlNode.getClass() + "
> not support");
> }
> return list;
> }
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best Regards,
Harold Miao


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread Benchao Li
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。

郑斌斌  于2020年9月23日周三 下午12:29写道:

>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
> 单流跑的话,比较正常。
> JOB的内存是4G。版本1.11.1
> --
> 发件人:Benchao Li 
> 发送时间:2020年9月23日(星期三) 10:50
> 收件人:user-zh 
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
> 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li  于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 1
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>
>
> --
>
> Best,
> Benchao Li
>
>

-- 

Best,
Benchao Li


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread Tianwang Li
Hi Benchao, 感谢你的回复

我使用的RocksDB,内存 overhead 太多了, 什么地方超用了那么多内存,好像不受控制了。
另外,我也测试过hop窗口,也是超用内存比较。没有使用增量checkpoint。


最后,我这边的interval join 是 inner join ,使用的 b 表的rowtime作为时间,没有观察到延迟数据的情况。

[image: image.png]




Benchao Li  于2020年9月23日周三 上午10:50写道:

> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
> 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li  于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 1
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 
**
 tivanli
**


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread Danny Chan
应该是碰到节点 cycle 引用了,导致优化 rule 一直重复重复触发,可以将 debug 日志打开,看下是哪个 rule 
被频繁触发了,之前修过一个类似的问题[1],可以参考下

[1] https://issues.apache.org/jira/browse/CALCITE-3121

Best,
Danny Chan
在 2020年9月23日 +0800 AM10:23,jun su ,写道:
> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he  于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > blink planner之前解过一些类似的案例。
> >
> > jun su  于2020年9月23日周三 上午9:53写道:
> >
> > > hi godfrey,
> > >
> > > 刚看了下, blink应该也会用hep , 上文说错了
> > >
> > > jun su  于2020年9月23日周三 上午9:19写道:
> > >
> > > > hi godfrey,
> > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > >
> > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > >
> > > > > blink planner 有这个问题吗?
> > > > >
> > > > > jun su  于2020年9月22日周二 下午3:27写道:
> > > > >
> > > > > > hi all,
> > > > > >
> > > > > > 环境: flink-1.9.2 flink table planner
> > > > > > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > > > >
> > > > > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > 导致进程OOM
> > > > > > ---
> > > > > > 代码:
> > > > > >
> > > > > > fbTableEnv.registerTableSource("source",orcTableSource)
> > > > > >
> > > > > > val select = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> > ")
> > > > > >
> > > > > > fbTableEnv.registerTable("selectTable",select)
> > > > > >
> > > > > > val t1 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > selectTable
> > > > > > where Auth_Roles like 'a%'")
> > > > > > fbTableEnv.registerTable("t1",t1)
> > > > > >
> > > > > > val t2 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> > > > > > Target_UserSid= 'b'")
> > > > > > fbTableEnv.registerTable("t2",t2)
> > > > > >
> > > > > > val t3 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> > > > > > Thread_ID= 'c'")
> > > > > > fbTableEnv.registerTable("t3",t3)
> > > > > >
> > > > > > val t4 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> > > > > > access_path= 'd'")
> > > > > > fbTableEnv.registerTable("t4",t4)
> > > > > >
> > > > > > val t5 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> > > > > > action= 'e'")
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Jun Su
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su


回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread 郑斌斌
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
单流跑的话,比较正常。
JOB的内存是4G。版本1.11.1
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 10:50
收件人:user-zh 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li



Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread godfrey he
例如 calc merge rule,还有calc,agg等其他相关rule,点比较散。得具体看

jun su  于2020年9月23日周三 上午10:22写道:

> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he  于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > blink planner之前解过一些类似的案例。
> >
> > jun su  于2020年9月23日周三 上午9:53写道:
> >
> > > hi godfrey,
> > >
> > > 刚看了下, blink应该也会用hep , 上文说错了
> > >
> > > jun su  于2020年9月23日周三 上午9:19写道:
> > >
> > > > hi godfrey,
> > > > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > >
> > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > >
> > > >> blink planner 有这个问题吗?
> > > >>
> > > >> jun su  于2020年9月22日周二 下午3:27写道:
> > > >>
> > > >> > hi all,
> > > >> >
> > > >> > 环境: flink-1.9.2 flink table planner
> > > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > >> >
> > > >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > 导致进程OOM
> > > >> > ---
> > > >> > 代码:
> > > >> >
> > > >> > fbTableEnv.registerTableSource("source",orcTableSource)
> > > >> >
> > > >> > val select = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> > ")
> > > >> >
> > > >> > fbTableEnv.registerTable("selectTable",select)
> > > >> >
> > > >> > val t1 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > selectTable
> > > >> > where Auth_Roles like 'a%'")
> > > >> > fbTableEnv.registerTable("t1",t1)
> > > >> >
> > > >> > val t2 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1
> where
> > > >> > Target_UserSid= 'b'")
> > > >> > fbTableEnv.registerTable("t2",t2)
> > > >> >
> > > >> > val t3 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2
> where
> > > >> > Thread_ID= 'c'")
> > > >> > fbTableEnv.registerTable("t3",t3)
> > > >> >
> > > >> > val t4 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3
> where
> > > >> > access_path= 'd'")
> > > >> > fbTableEnv.registerTable("t4",t4)
> > > >> >
> > > >> > val t5 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4
> where
> > > >> > action= 'e'")
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Best,
> > > >> > Jun Su
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su
>


Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

2020-09-22 Thread Xintong Song
Hi Shubham,

Concerning FLINK-18712, thanks for the pointer. I was not aware of this
issue before. Running on Kubernetes or Yarn should not affect this issue. I
cannot tell whether this issue is the cause of your problem. The simplest
way to confirm this is probably just try the solution to see if that fixes
your problem.

Given that it could take weeks to reproduce your problem, I would suggest
to keep track of the native memory usage with jemalloc and jeprof. This
should provide direct information about which component is using extra
memory.

Thank you~

Xintong Song



On Tue, Sep 22, 2020 at 10:42 PM Shubham Kumar 
wrote:

> Hi Xintong,
>
> Thanks for your insights, they are really helpful.
>
> I understand now that it most certainly is a native memory issue rather
> than a heap memory issue and about not trusting Flink's Non-Heap metrics.
>
> I do believe that our structure of job is so simple that I couldn't find
> any use of mmap memory or any other straight forward native memory leak
> issue. That leads me to believing that it can be a rocksDB issue, although
> you do make a valid point about that there is extra 2GB in the yarn
> container which should account for RocksDB extra usage. I also saw this
> JIRA ticket for RocksDB memory leak issue on K8 kubernetes and was
> wondering if the same could happen on yarn containers and is related to my
> issue [1]. Let me know what you guys think about this.
>
> Also, I tried running the same job using FileSystemBackend (as a separate
> job) and it went fine with no container kills and native memory not rising
> over time, which hints further towards RocksDB being the culprit. My state
> size in the checkpoint is around 1GB (can probably even think of switching
> to FileSystemBackend for this job but still want to figure out the case for
> RocksDB). I am using incremental checkpoints in my main job which has
> RocksDB state backend, if that's relevant.
>
> I read about native memory tracking and probably go ahead and use Native
> Memory Tracking (NMT) or jemalloc to confirm about the RocksDB issue and
> update here.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18712
>
> Thanks
> Shubham
>
> On Mon, Sep 21, 2020 at 8:23 AM Xintong Song 
> wrote:
>
>> Hi Shubham,
>>
>> Java heap memory cannot cause a container memory exceeding. Heap memory
>> is strictly limited by the JVM `-Xmx` parameter. If the program does need
>> more memory than the limit, it will run into a heap space OOM, rather than
>> implicitly using more memory than the limit.
>>
>> Several reasons that might lead to container memory exceeding.
>> - RocksDB, whose memory controlling is based on estimation rather than
>> hard limit. This is one of the most common reasons for such memory
>> exceedings. However, usually the extra memory usage introduced by RocksDB,
>> if there's any, should not be too large. Given that your container size is
>> 12GB and Flink only plans to use 10GB, I'm not sure whether RocksDB is the
>> cause in your case. I've CC'ed @Yun Tang, who is the expert of Flink's
>> RocksDB state backend.
>> - Does your job use mmap memory? MMap memory, if used, is controlled by
>> the operating system, not Flink. Depending on your Yarn cgroup
>> configurations, some clusters would also count that as part of the
>> container memory consumption.
>> - Native memory leaks in user code dependencies and libraries could also
>> lead to container memory exceeding.
>>
>> Another suggestion is, do not trust Flink's "Non-Heap" metrics. It is
>> practically helpless and misleading. The "Non-Heap" accounts for SOME of
>> the non-heap memory usage, but NOT ALL of them. The community is working on
>> a new set of metrics and Web UI for the task manager memory tuning.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sun, Sep 20, 2020 at 12:10 AM Shubham Kumar <
>> shubhamkumar1...@gmail.com> wrote:
>>
>>> Hey everyone,
>>>
>>> We had deployed a streaming job using Flink 1.10.1 one month back and
>>> now we are encountering a Yarn container killed due to memory issues very
>>> frequently. I am trying to figure out the root cause of this issue in order
>>> to fix it.
>>>
>>> We have a streaming job whose basic structure looks like this:
>>> - Read 6 kafka streams and combine stats from them (union) to form a
>>> single stream
>>> - stream.keyBy(MyKey)
>>>  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>>  .reduce(MyReduceFunction)
>>>  .addSink(new FlinkKafkaProducer011<>...);
>>>
>>> We are using RocksDB as state backend. In flink-conf.yaml, we used
>>> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
>>> one slot per task manager.
>>>
>>> So, a taskmanager process gets started with the following memory
>>> components as indicated in logs:
>>>
>>> TaskExecutor container... will be started on ... with
 TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
 134217728 bytes), frameworkOffHeapSize=128.000mb 

Flink Statefun Byte Ingress

2020-09-22 Thread Timothy Bess
Hi,

So most of the examples of "module.yaml" files I've seen focus on protobuf
ingress, but is there a way to just get bytes from Kafka? I want to
integrate this with the rest of my codebase which uses JSON, but don't want
to migrate to protobuf just yet. I'm not totally sure how it would work
since function arguments seem to be encoded as an Any type which is a
protobuf type string + some bytes, I guess the string would need to be some
made up constant value and I'd just grab the bytes? Honestly just using
bytes like is done with the state value might be a bit more flexible to
work with.

Thanks,

Tim


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread Benchao Li
Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread jun su
hi godfrey,
方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的

godfrey he  于2020年9月23日周三 上午10:09写道:

> Hi Jun,
>
> 可能是old planner缺少一些rule导致遇到了corner case,
> blink planner之前解过一些类似的案例。
>
> jun su  于2020年9月23日周三 上午9:53写道:
>
> > hi godfrey,
> >
> > 刚看了下, blink应该也会用hep , 上文说错了
> >
> > jun su  于2020年9月23日周三 上午9:19写道:
> >
> > > hi godfrey,
> > > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > >
> > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > >
> > >> blink planner 有这个问题吗?
> > >>
> > >> jun su  于2020年9月22日周二 下午3:27写道:
> > >>
> > >> > hi all,
> > >> >
> > >> > 环境: flink-1.9.2 flink table planner
> > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > >> >
> > >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > 导致进程OOM
> > >> > ---
> > >> > 代码:
> > >> >
> > >> > fbTableEnv.registerTableSource("source",orcTableSource)
> > >> >
> > >> > val select = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> ")
> > >> >
> > >> > fbTableEnv.registerTable("selectTable",select)
> > >> >
> > >> > val t1 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > selectTable
> > >> > where Auth_Roles like 'a%'")
> > >> > fbTableEnv.registerTable("t1",t1)
> > >> >
> > >> > val t2 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> > >> > Target_UserSid= 'b'")
> > >> > fbTableEnv.registerTable("t2",t2)
> > >> >
> > >> > val t3 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> > >> > Thread_ID= 'c'")
> > >> > fbTableEnv.registerTable("t3",t3)
> > >> >
> > >> > val t4 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> > >> > access_path= 'd'")
> > >> > fbTableEnv.registerTable("t4",t4)
> > >> >
> > >> > val t5 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> > >> > action= 'e'")
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > Best,
> > >> > Jun Su
> > >> >
> > >>
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


flink从mongdb中读取数据

2020-09-22 Thread GeGe
您好!


我刚开始接触flink,现在准备用flink读取mongdb中的数据。mongodb中每秒都会有一个新增数据,重写RichSourceFunction来连接读取mongodb中数据。但是flink不能像kafka一样,持续不断显示数据,是否mongodb不能持续显示数据呢?还是有一些额外设置?


十分感谢!


Best wishes,


Gege

Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread godfrey he
Hi Jun,

可能是old planner缺少一些rule导致遇到了corner case,
blink planner之前解过一些类似的案例。

jun su  于2020年9月23日周三 上午9:53写道:

> hi godfrey,
>
> 刚看了下, blink应该也会用hep , 上文说错了
>
> jun su  于2020年9月23日周三 上午9:19写道:
>
> > hi godfrey,
> > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> >
> > godfrey he  于2020年9月22日周二 下午8:58写道:
> >
> >> blink planner 有这个问题吗?
> >>
> >> jun su  于2020年9月22日周二 下午3:27写道:
> >>
> >> > hi all,
> >> >
> >> > 环境: flink-1.9.2 flink table planner
> >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> >> >
> >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> 导致进程OOM
> >> > ---
> >> > 代码:
> >> >
> >> > fbTableEnv.registerTableSource("source",orcTableSource)
> >> >
> >> > val select = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
> >> >
> >> > fbTableEnv.registerTable("selectTable",select)
> >> >
> >> > val t1 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> selectTable
> >> > where Auth_Roles like 'a%'")
> >> > fbTableEnv.registerTable("t1",t1)
> >> >
> >> > val t2 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> >> > Target_UserSid= 'b'")
> >> > fbTableEnv.registerTable("t2",t2)
> >> >
> >> > val t3 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> >> > Thread_ID= 'c'")
> >> > fbTableEnv.registerTable("t3",t3)
> >> >
> >> > val t4 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> >> > access_path= 'd'")
> >> > fbTableEnv.registerTable("t4",t4)
> >> >
> >> > val t5 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> >> > action= 'e'")
> >> >
> >> >
> >> >
> >> > --
> >> > Best,
> >> > Jun Su
> >> >
> >>
> >
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best,
> Jun Su
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread jun su
hi godfrey,

刚看了下, blink应该也会用hep , 上文说错了

jun su  于2020年9月23日周三 上午9:19写道:

> hi godfrey,
> 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
>
> godfrey he  于2020年9月22日周二 下午8:58写道:
>
>> blink planner 有这个问题吗?
>>
>> jun su  于2020年9月22日周二 下午3:27写道:
>>
>> > hi all,
>> >
>> > 环境: flink-1.9.2 flink table planner
>> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
>> >
>> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
>> > ---
>> > 代码:
>> >
>> > fbTableEnv.registerTableSource("source",orcTableSource)
>> >
>> > val select = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
>> >
>> > fbTableEnv.registerTable("selectTable",select)
>> >
>> > val t1 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
>> > where Auth_Roles like 'a%'")
>> > fbTableEnv.registerTable("t1",t1)
>> >
>> > val t2 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
>> > Target_UserSid= 'b'")
>> > fbTableEnv.registerTable("t2",t2)
>> >
>> > val t3 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
>> > Thread_ID= 'c'")
>> > fbTableEnv.registerTable("t3",t3)
>> >
>> > val t4 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
>> > access_path= 'd'")
>> > fbTableEnv.registerTable("t4",t4)
>> >
>> > val t5 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
>> > action= 'e'")
>> >
>> >
>> >
>> > --
>> > Best,
>> > Jun Su
>> >
>>
>
>
> --
> Best,
> Jun Su
>


-- 
Best,
Jun Su


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread jun su
hi godfrey,
我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因

godfrey he  于2020年9月22日周二 下午8:58写道:

> blink planner 有这个问题吗?
>
> jun su  于2020年9月22日周二 下午3:27写道:
>
> > hi all,
> >
> > 环境: flink-1.9.2 flink table planner
> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> >
> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
> > ---
> > 代码:
> >
> > fbTableEnv.registerTableSource("source",orcTableSource)
> >
> > val select = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
> >
> > fbTableEnv.registerTable("selectTable",select)
> >
> > val t1 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
> > where Auth_Roles like 'a%'")
> > fbTableEnv.registerTable("t1",t1)
> >
> > val t2 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> > Target_UserSid= 'b'")
> > fbTableEnv.registerTable("t2",t2)
> >
> > val t3 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> > Thread_ID= 'c'")
> > fbTableEnv.registerTable("t3",t3)
> >
> > val t4 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> > access_path= 'd'")
> > fbTableEnv.registerTable("t4",t4)
> >
> > val t5 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> > action= 'e'")
> >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


Re: Flink Table SQL and writing nested Avro files

2020-09-22 Thread Dan Hill
Nice!  I'll try that.  Thanks, Dawid!

On Mon, Sep 21, 2020 at 2:37 AM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> I think the best what I can suggest is this:
>
> SELECT
>
> ROW(left.field0, left.field1, left.field2, ...),
>
> ROW(right.field0, right.field1, right.field2, ...)
>
> FROM ...
>
> You will need to list all the fields manually, as SQL does not allow for
> asterisks in regular function calls.
>
> If you are willing to give the Table API a try you might workaround some
> of the manual work with the Column Function[1]
>
> Table join = t1.join(t2).where($("id1").isEqual($("id2")));
> join
> .select(
> row(withColumns(range(1, t1.getSchema().getFieldCount(,
> row(withColumns(range(
> t1.getSchema().getFieldCount() + 1,
> t1.getSchema().getFieldCount() +
> t2.getSchema().getFieldCount(
> )
> .executeInsert("flat_avro")
> .await();
>
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#column-functions
> On 18/09/2020 09:47, Dan Hill wrote:
>
> Hi!
>
> I want to join two tables and write the results to Avro where the left and
> right rows are nested in the avro output.  Is it possible to do this with
> the SQL interface?
>
> Thanks!
> - Dan
>
>  CREATE TABLE `flat_avro` (
>`left` ROW,
>`right` ROW) WITH (
>'connector' = 'filesystem',
>'path' = 's3p://blah/blah',
>'format' = 'avro');INSERT INTO `flat_avro` SELECT left.*, right.* FROM 
> `left` LEFT JOIN `right`ON `left`.`id` = `right`.`id`);
>
>


Re: How to stop multiple Flink jobs of the same name from being created?

2020-09-22 Thread Dan Hill
Hi Yang!

The multiple "INSERT INTO" jobs all go to the same Flink cluster.  I'm
using this Helm chart
 (which
looks like the standalone option).  I deploy the job using a simple k8
Job.  Sounds like I should do this myself.  Thanks!

Thanks!
- Dan



On Tue, Sep 22, 2020 at 5:37 AM Yang Wang  wrote:

> Hi Dan,
>
> First, I want to get more information about your submission so that we
> could make the question clear.
>
> Are you using TableEnvironment to execute multiple "INSERT INTO" sentences
> and find that each one will
> be executed in a separated Flink cluster? It is really strange, and I want
> to know how your are deploying your
> Flink cluster on Kubernetes, via standalone[1] or native integration[2].
> If it is the former, I am afraid you need
> `kubectl` to start/stop your Flink application manually. If it is the
> latter, I think the Flink cluster will be destroyed
> automatically when the Flink job failed. Also all the SQL jobs will be
> executed in a shared Flink application.
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>
>
> Best,
> Yang
>
> Dan Hill  于2020年9月21日周一 上午8:15写道:
>
>> I've read the following upgrade application page
>> .
>> This seems to focus on doing this in a wrapper layer (e.g. Kubernetes).
>> Just checking to see if this is the common practice or do people do this
>> from their client jars.
>>
>>
>>
>> On Sun, Sep 20, 2020 at 5:13 PM Dan Hill  wrote:
>>
>>> I'm prototyping with Flink SQL.  I'm iterating on a client job with
>>> multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
>>> retries.  This creates multiple stream jobs with the same names.
>>>
>>> Is it up to clients to delete the existing jobs?  I see Flink CLI
>>> functions for this.  Do most people usually do this from inside their
>>> client jar or their wrapper code (e.g. Kubernetes job).
>>>
>>> - Dan
>>>
>>


Stateful Functions + ML model prediction

2020-09-22 Thread John Morrow
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for 
inspiration. I ended up implementing a business rule as a Java class with a 
match-predicate & an action. The records enter the pipeline on a data stream 
which is joined with the rules in a broadcast stream and a ProcessFunction 
checks each record to see if it matches any rule predicates. If the record 
doesn't match any business rule predicates it continues on in the pipeline. If 
the record does match one or more business rule predicates it is sent to a side 
output with the list of business rules that it matched. The side output data 
stream goes through a RichAsyncFunction which loops through the matched rules 
and applies each one's action to the record. At the end, that enriched 
side-output record stream is unioned back with the non-enriched record stream. 
This all worked fine.

I have some new business rules which are more complicated and require sending 
the record's text field to different pre-trained NLP models for prediction, 
e.g. if a model predicts the text language is X then update field Y to that 
value, if another model predicts the sentiment is positive then set some other 
field to another value. I'm planning on using seldon-core to serve these 
pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in 
Flink. I could add in a new ProcessFunction in my pipeline before my existing 
enrichment-rule-predicate ProcessFunction and have it send the text to each of 
the prediction models and add the results for each one to the record so it's 
available for the enrichment step. The downside of this is that in the future 
I'm anticipating having more and more models, and not necessarily wanting to 
send each record to every model for prediction. e.g. I might have a business 
rule which says if the author of the text is X then get the sentiment (via the 
sentiment model) and update field Z, so it would be a waste of time doing that 
for all records.

I had a look at stateful functions. There's an example in the statefun.io 
overview which shows having a stateful function for doing a fraud model 
prediction based on if an account has had X number of frauds detected in the 
last 30 days, so the key for the state is an account number. In my case, these 
model predictions don't really have any state - they just take input and return 
a prediction, they're more like a stateless lambda function. Also, I was 
wondering if I implemented these as stateful functions would I be able to make 
them available to other Flink jobs within the cluster, as opposed to having 
them as individual RichAsyncFunctions defined within a single Flink job and 
only available to that. The last thing which made stateful functions sound good 
was that at the moment all my business rules happen to be orthogonal, but I can 
imagine in the future where I might want one rule to be based on another one, 
and whereas regular dataflows have to be an acyclic graph stateful functions 
could support that.

So, in summary:

  * Does this sound like a good use case for stateful functions?
  * Are stateful functions available to all Flink jobs within a cluster?


Thanks,
John.




Re: Better way to share large data across task managers

2020-09-22 Thread Kostas Kloudas
Hi Dongwon,

If you know the data in advance, you can always use the Yarn options
in [1] (e.g. the "yarn.ship-directories") to ship the directories with
the data you want only once to each Yarn container (i.e. TM) and then
write a udf which reads them in the open() method. This will allow the
data to be shipped only once per TM but then each of the tasks will
have its own copy in memory of course. By default the visibility of
the files that you ship is set to APPLICATION [2], if I am not
mistaken so if more than one TMs go to the same node, then you will
have even less copies shipped.

Does this help with your usecase?

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
[2] 
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html

On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
>
> Hi,
>
> I'm using Flink broadcast state similar to what Fabian explained in [1]. One 
> difference might be the size of the broadcasted data; the size is around 
> 150MB.
>
> I've launched 32 TMs by setting
> - taskmanager.numberOfTaskSlots : 6
> - parallelism of the non-broadcast side : 192
>
> Here's some questions:
> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it right?
> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
> each TM can read the broadcasted data? I'm considering implementing a static 
> class for the non-broadcast side to directly load data only once on each 
> TaskManager instead of the broadcast state (FYI, I'm using per-job clusters 
> on YARN, so each TM is only for a single job). However, I'd like to use Flink 
> native facilities if possible.
>
> The type of broadcasted data is Map with around 600K entries, so 
> every time the data is broadcasted a lot of GC is inevitable on each TM due 
> to the (de)serialization cost.
>
> Any advice would be much appreciated.
>
> Best,
>
> Dongwon
>
> [1] https://flink.apache.org/2019/06/26/broadcast-state.html


Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-22 Thread Claude M
Thanks for your responses.
1.  There were no job re-starts prior to the metaspace OEM.
2.  I tried increasing the CPU request and still encountered the problem.
Any configuration change I make to the job manager, whether it's in the
flink-conf.yaml or increasing the pod's CPU/memory request, results
with this problem.


On Tue, Sep 22, 2020 at 12:04 AM Xintong Song  wrote:

> Thanks for the input, Brain.
>
> This looks like what we are looking for. The issue is fixed in 1.10.3,
> which also matches this problem occurred in 1.10.2.
>
> Maybe Claude can further confirm it.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian  wrote:
>
>> Hi Xintong and Claude,
>>
>>
>>
>> In our internal tests, we also encounter these two issues and we spent
>> much time debugging them. There are two points I need to confirm if we
>> share the same problem.
>>
>>1. Your job is using default restart strategy, which is per-second
>>restart.
>>2. Your CPU resource on jobmanager might be small
>>
>>
>>
>> Here is some findings I want to share.
>>
>> ## Metaspace OOM
>>
>> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we have
>> some job restarts, there will be some threads from the sourceFunction
>> hanging, cause the class loader cannot close. New restarts would load new
>> classes, then expand the metaspace, and finally OOM happens.
>>
>>
>>
>> ## Leader retrieving
>>
>> Constant restarts may be heavy for jobmanager, if JM CPU resources are
>> not enough, the thread for leader retrieving may be stuck.
>>
>>
>>
>> Best Regards,
>>
>> Brian
>>
>>
>>
>> *From:* Xintong Song 
>> *Sent:* Tuesday, September 22, 2020 10:16
>> *To:* Claude M; user
>> *Subject:* Re: metaspace out-of-memory & error while retrieving the
>> leader gateway
>>
>>
>>
>> ## Metaspace OOM
>>
>> As the error message already suggested, the metaspace OOM you encountered
>> is likely caused by a class loading leak. I think you are on the right
>> direction trying to look into the heap dump and find out where the leak
>> comes from. IIUC, after removing the ZK folder, you are now able to run
>> Flink with the heap dump options.
>>
>>
>>
>> The problem does not occur in previous versions because Flink starts to
>> set the metaspace limit since the 1.10 release. The class loading leak
>> might have already been there, but is never discovered. This could lead to
>> unpredictable stability and performance issues. That's why Flink updated
>> its memory model and explicitly set the metaspace limit in the 1.10 release.
>>
>>
>>
>> ## Leader retrieving
>>
>> The command looks good to me. If this problem happens only once, it could
>> be irrelevant to adding the options. If that does not block you from
>> getting the heap dump, we can look into it later.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>>
>>
>> On Mon, Sep 21, 2020 at 9:37 PM Claude M  wrote:
>>
>> Hi Xintong,
>>
>>
>>
>> Thanks for your reply.  Here is the command output w/ the java.opts:
>>
>>
>>
>> /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC
>> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>> -classpath
>> /opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf:
>> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
>> --configDir /opt/flink/conf --executionMode cluster
>>
>>
>>
>> To answer your questions:
>>
>>- Correct, in order for the pod to start up, I have to remove the
>>flink app folder from zookeeper.  I only have to delete once after 
>> applying
>>the java.opts arguments.  It doesn't make sense though that I should have
>>to do this just from adding a parameter.
>>- I'm using the standalone deployment.
>>- I'm using job cluster mode.
>>
>> A higher priority issue I'm trying to solve is this metaspace out of
>> memory that is occuring in task managers.  This was not happening before I
>> upgraded to Flink 1.10.2.  Even after increasing the memory, I'm still
>> encountering the problem.  That is when I added the java.opts argument to
>> see if I can get more information about the problem.  That is when I ran
>> across the second issue w/ the job manager pod not starting up.
>>
>>
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>> On Sun, Sep 20, 2020 at 10:23 PM Xintong Song 
>> wrote:
>>
>> Hi Claude,
>>
>>
>>
>> IIUC, in your case the leader retrieving problem is triggered by adding
>> the `java.opts`? Then could you try to find and post the complete command
>> for launching the JVM process? You can try log into the pod and execute `ps
>> -ef | grep `.

RichFunctions in Flink's Table / SQL API

2020-09-22 Thread Piyush Narang
Hi folks,

We were looking to cache some data using Flink’s MapState in one of our UDFs 
that are called by Flink SQL queries. I was trying to see if there’s a way to 
set up these state objects via the basic FunctionContext [1] we’re provided in 
the Table / SQL UserDefinedFunction class [2] but from what I can see it’s not 
possible. We just seem to have access to retrieve the metric group and access 
to the distributed cache / job params. Is there a way for us in Table / SQL 
UDFs to access Flink’s state and store data? Or is this something that isn’t 
supported / recommended? (If it helps we’re on Flink 1.9 and using the old SQL 
planner).

Our broader use-case is to enrich some data coming in via a Kafka stream by 
reading additional data in DynamoDB. We’d like to cache this across restarts to 
cut down on some of the DynamoDb traffic. (Ideally we’d like to move to 
temporal tables, but I think that requires a migration to Blink first?)

Thanks,

[1] - 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/FunctionContext.html
[2] - 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html

-- Piyush



Adaptive load balancing

2020-09-22 Thread Navneeth Krishnan
Hi All,

We are currently using flink in production and use keyBy for performing a
CPU intensive computation. There is a cache lookup for a set of keys and
since keyBy cannot guarantee the data is sent to a single node we are
basically replicating the cache on all nodes. This is causing more memory
problems for us and we would like to explore some options to mitigate the
current limitations.

Is there a way to group a set of keys and send to a set of nodes so that we
don't have to replicate the cache data on all nodes?

Has someone tried implementing hashing with adaptive load balancing so that
if a node is busy processing then the data can be routed effectively to
other nodes which are free.

Any suggestions are greatly appreciated.

Thanks


Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

2020-09-22 Thread Shubham Kumar
Hi Xintong,

Thanks for your insights, they are really helpful.

I understand now that it most certainly is a native memory issue rather
than a heap memory issue and about not trusting Flink's Non-Heap metrics.

I do believe that our structure of job is so simple that I couldn't find
any use of mmap memory or any other straight forward native memory leak
issue. That leads me to believing that it can be a rocksDB issue, although
you do make a valid point about that there is extra 2GB in the yarn
container which should account for RocksDB extra usage. I also saw this
JIRA ticket for RocksDB memory leak issue on K8 kubernetes and was
wondering if the same could happen on yarn containers and is related to my
issue [1]. Let me know what you guys think about this.

Also, I tried running the same job using FileSystemBackend (as a separate
job) and it went fine with no container kills and native memory not rising
over time, which hints further towards RocksDB being the culprit. My state
size in the checkpoint is around 1GB (can probably even think of switching
to FileSystemBackend for this job but still want to figure out the case for
RocksDB). I am using incremental checkpoints in my main job which has
RocksDB state backend, if that's relevant.

I read about native memory tracking and probably go ahead and use Native
Memory Tracking (NMT) or jemalloc to confirm about the RocksDB issue and
update here.

[1]: https://issues.apache.org/jira/browse/FLINK-18712

Thanks
Shubham

On Mon, Sep 21, 2020 at 8:23 AM Xintong Song  wrote:

> Hi Shubham,
>
> Java heap memory cannot cause a container memory exceeding. Heap memory is
> strictly limited by the JVM `-Xmx` parameter. If the program does need more
> memory than the limit, it will run into a heap space OOM, rather than
> implicitly using more memory than the limit.
>
> Several reasons that might lead to container memory exceeding.
> - RocksDB, whose memory controlling is based on estimation rather than
> hard limit. This is one of the most common reasons for such memory
> exceedings. However, usually the extra memory usage introduced by RocksDB,
> if there's any, should not be too large. Given that your container size is
> 12GB and Flink only plans to use 10GB, I'm not sure whether RocksDB is the
> cause in your case. I've CC'ed @Yun Tang, who is the expert of Flink's
> RocksDB state backend.
> - Does your job use mmap memory? MMap memory, if used, is controlled by
> the operating system, not Flink. Depending on your Yarn cgroup
> configurations, some clusters would also count that as part of the
> container memory consumption.
> - Native memory leaks in user code dependencies and libraries could also
> lead to container memory exceeding.
>
> Another suggestion is, do not trust Flink's "Non-Heap" metrics. It is
> practically helpless and misleading. The "Non-Heap" accounts for SOME of
> the non-heap memory usage, but NOT ALL of them. The community is working on
> a new set of metrics and Web UI for the task manager memory tuning.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sun, Sep 20, 2020 at 12:10 AM Shubham Kumar 
> wrote:
>
>> Hey everyone,
>>
>> We had deployed a streaming job using Flink 1.10.1 one month back and now
>> we are encountering a Yarn container killed due to memory issues very
>> frequently. I am trying to figure out the root cause of this issue in order
>> to fix it.
>>
>> We have a streaming job whose basic structure looks like this:
>> - Read 6 kafka streams and combine stats from them (union) to form a
>> single stream
>> - stream.keyBy(MyKey)
>>  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>  .reduce(MyReduceFunction)
>>  .addSink(new FlinkKafkaProducer011<>...);
>>
>> We are using RocksDB as state backend. In flink-conf.yaml, we used
>> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
>> one slot per task manager.
>>
>> So, a taskmanager process gets started with the following memory
>> components as indicated in logs:
>>
>> TaskExecutor container... will be started on ... with
>>> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
>>> 134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
>>> taskHeapSize=4.125gb (4429184954 bytes), taskOffHeapSize=0 bytes,
>>> networkMemSize=896.000mb (939524110 bytes), managedMemorySize=3.500gb (
>>> 3758096440 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
>>> jvmOverheadSize=1024.000mb (1073741824 bytes)}.
>>>
>>
>>>
>>
>>  which are as per defaults.
>>
>> Now, after 25 days we started encountering the following yarn container
>> kill error:
>>
>>> Association with remote system [akka.tcp://flink@...] has failed,
>>> address is now gated for [50] ms. Reason: [Association failed with
>>> [akka.tcp://flink@...]] Caused by: [java.net.ConnectException:
>>> Connection refused: .../...:37679]
>>> 2020-09-09 00:53:24 INFO Closing TaskExecutor connection
>>> container_e193_1592804717489_149347_01_11 

Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread godfrey he
blink planner 有这个问题吗?

jun su  于2020年9月22日周二 下午3:27写道:

> hi all,
>
> 环境: flink-1.9.2 flink table planner
> 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
>
>   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
> ---
> 代码:
>
> fbTableEnv.registerTableSource("source",orcTableSource)
>
> val select = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
>
> fbTableEnv.registerTable("selectTable",select)
>
> val t1 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
> where Auth_Roles like 'a%'")
> fbTableEnv.registerTable("t1",t1)
>
> val t2 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> Target_UserSid= 'b'")
> fbTableEnv.registerTable("t2",t2)
>
> val t3 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> Thread_ID= 'c'")
> fbTableEnv.registerTable("t3",t3)
>
> val t4 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> access_path= 'd'")
> fbTableEnv.registerTable("t4",t4)
>
> val t5 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> action= 'e'")
>
>
>
> --
> Best,
> Jun Su
>


keyedstate TTL 清理状态如何触发

2020-09-22 Thread smq
大家好,现在有个疑问,TTL如果设成1min,那么是时间到了之后,该state自动清除吗

Re: How to stop multiple Flink jobs of the same name from being created?

2020-09-22 Thread Yang Wang
Hi Dan,

First, I want to get more information about your submission so that we
could make the question clear.

Are you using TableEnvironment to execute multiple "INSERT INTO" sentences
and find that each one will
be executed in a separated Flink cluster? It is really strange, and I want
to know how your are deploying your
Flink cluster on Kubernetes, via standalone[1] or native integration[2]. If
it is the former, I am afraid you need
`kubectl` to start/stop your Flink application manually. If it is the
latter, I think the Flink cluster will be destroyed
automatically when the Flink job failed. Also all the SQL jobs will be
executed in a shared Flink application.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
[2].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html


Best,
Yang

Dan Hill  于2020年9月21日周一 上午8:15写道:

> I've read the following upgrade application page
> .
> This seems to focus on doing this in a wrapper layer (e.g. Kubernetes).
> Just checking to see if this is the common practice or do people do this
> from their client jars.
>
>
>
> On Sun, Sep 20, 2020 at 5:13 PM Dan Hill  wrote:
>
>> I'm prototyping with Flink SQL.  I'm iterating on a client job with
>> multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
>> retries.  This creates multiple stream jobs with the same names.
>>
>> Is it up to clients to delete the existing jobs?  I see Flink CLI
>> functions for this.  Do most people usually do this from inside their
>> client jar or their wrapper code (e.g. Kubernetes job).
>>
>> - Dan
>>
>


[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread Tianwang Li
使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。


【join】

> SELECT `b`.`rowtime`,
> `a`.`c_id`,
> `b`.`openid`
> FROM `test_table_a` AS `a`
> INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> AND `a`.`openid` = `b`.`openid`
> AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> AND `a`.`rowtime` + INTERVAL '6' HOUR
>
>
【window】

> SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `rowtime`,
> HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__windoow_start__`,
> HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__window_end__`,
> `c_id`,
> COUNT(`openid`) AS `cnt`
> FROM `test_table_in_6h`
> GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> `c_id`
>


我配置了Fink的内存是4G, 实际使用达到了6.8 G。
同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右

【配置】

> cat conf/flink-conf.yaml
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 6144m
> taskmanager.memory.process.size: 4g
> taskmanager.memory.jvm-overhead.min: 1024m
> taskmanager.memory.jvm-overhead.max: 2048m
> taskmanager.debug.memory.log-interval: 1
> env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
> -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>



-- 
**
 tivanli
**


Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题

2020-09-22 Thread Jark Wu
1. 你用的是什么版本呢?我在最新的版本上试了下,没有报错呢。试下最新版本?
2. {123123,ff=1, 123123,kk=2} 就是 MULTISET 打印出来的结构哈, 就是 {key1=cnt, key2=cnt}
其中 123123,ff 是 row#toString。

Best,
Jark

On Tue, 22 Sep 2020 at 19:28, Li,Qian(DXM,PB)  wrote:

> 你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么?
>
> create view uu AS select collect(userBankTime) as userBank ,name from (
> select  name, row(name,description) as userBankTime from tt_haha) group by
> name; // 创建视图,描述userBank类型是MULTITYPE
>
> Flink SQL> desc uu;
> root
>  |-- userBank: MULTISET NOT NULL>
> NOT NULL
>  |-- name: STRING
>
> CREATE TABLE user_log_sink_10 (
> > name STRING,
> > maps MULTISET>,
> > PRIMARY KEY (name) NOT ENFORCED
> > ) WITH (
> > 'connector' = 'elasticsearch-6',
> > 'hosts' = '',
> > 'index' = 'enriched_orders',
> > 'document-type' = 'user'
> > );
>
> insert into user_log_sink_10 select name, collect(userBankTime) as maps
> from (select name, row(name,description) as userBankTime from tt_haha)
> group by name;
>
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassCastException:
> org.apache.flink.table.types.logical.MultisetType cannot be cast to
> org.apache.flink.table.types.logical.MapType
>
> 还有select name, collect(userBankTime) as maps from (select name,
> row(name,description) as userBankTime from tt_haha) group by name 出来的结果是
> test123{test123,123123=1}
> 123123{123123,ff=1, 123123,kk=2}
> 这种的,在flink sql中是什么格式呢?
>
> ||
>
> 在 2020/9/21 下午3:05,“Jark Wu” 写入:
>
> 1. concat 是内置函数,可以直接用。
> 2. 内置函数中没有 group_concat,不过有个类似功能的 listagg
> 3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1]
> 4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。
> 5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2].
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
> [2]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html
>
> On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB) 
> wrote:
>
> > Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
> >
> > 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
> >
> > 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
> >
> >
> > 在 2020/9/21 下午1:42,“Jark Wu” 写入:
> >
> > 你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
> >
> > select userId, collect(userBankTime)
> > from (
> >   select  userId, concat(userBankNo, '_', createTime) as
> userBankTime
> >   from aa as a left join bb as b where a.userId=b.userId
> > ) group by userId;
> >
> >
> > Best,
> > Jark
> >
> > On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) <
> liq...@duxiaoman.com>
> > wrote:
> >
> > > 请问:
> > >
> > > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
> > > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
> > >   表aa
> > >   id, userId
> > >   表 bb
> > >   userId,userBankNo,createTime
> > >
> > > select  * from aa as a left join bb as b where
> a.userId=b.userId
> > >
> > > 谢谢!
> > >
> > >
> >
> >
> >
>
>
>


Re: App gets stuck in Created State

2020-09-22 Thread Arpith P
All the job manager logs have been deleted from the cluster. I'll have to
work with the infra team to get it back, once I have it i'll post it here.

Arpith

On Mon, Sep 21, 2020 at 5:50 PM Zhu Zhu  wrote:

> Hi Arpith,
>
> All tasks in CREATED state indicates no task is scheduled yet. It is
> strange it a job gets stuck in this state.
> Is it possible that you share the job manager log so we can check what is
> happening there?
>
> Thanks,
> Zhu
>
> Arpith P  于2020年9月21日周一 下午3:52写道:
>
>> Hi,
>>
>> We have Flink 1.8.0 cluster deployed in Hadoop distributed mode, I often
>> see even though Hadoop has enough resources Flink sits in Created state.
>> We have 4 operators using 15 parallelism, 1 operator using 40 & 2 operators
>> using 10. At time of submission I'm passing taskmanager memory as 4Gb and
>> job manager memory as 2gb. and 2 slots This request should only take 20
>> containers and 40 Vcores. But I see Flink is overallocating resource of 65
>> containers and 129 Cores . I've attached snapshots for references.
>>
>> Right now I'm passing:  -yD
>> yarn.heartbeat.container-request-interval=1000 -yD
>> taskmanager.network.memory.fraction=0.045 -yD
>> taskmanager.memory.preallote=true.
>>
>> How do I control resource allocation?.
>>
>>


Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题

2020-09-22 Thread Li,Qian(DXM,PB)
你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么?

create view uu AS select collect(userBankTime) as userBank ,name from ( select  
name, row(name,description) as userBankTime from tt_haha) group by name; // 
创建视图,描述userBank类型是MULTITYPE

Flink SQL> desc uu;
root
 |-- userBank: MULTISET NOT NULL> NOT NULL
 |-- name: STRING

CREATE TABLE user_log_sink_10 (
> name STRING,
> maps MULTISET>,
> PRIMARY KEY (name) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-6',
> 'hosts' = '',
> 'index' = 'enriched_orders',
> 'document-type' = 'user'
> );

insert into user_log_sink_10 select name, collect(userBankTime) as maps from 
(select name, row(name,description) as userBankTime from tt_haha) group by name;

[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: org.apache.flink.table.types.logical.MultisetType 
cannot be cast to org.apache.flink.table.types.logical.MapType

还有select name, collect(userBankTime) as maps from (select name, 
row(name,description) as userBankTime from tt_haha) group by name 出来的结果是
test123{test123,123123=1}
123123{123123,ff=1, 123123,kk=2}
这种的,在flink sql中是什么格式呢?

||

在 2020/9/21 下午3:05,“Jark Wu” 写入:

1. concat 是内置函数,可以直接用。
2. 内置函数中没有 group_concat,不过有个类似功能的 listagg
3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1]
4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。
5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2].

Best,
Jark

[1]:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
[2]:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html

On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB)  wrote:

> Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
>
> 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
>
> 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
>
>
> 在 2020/9/21 下午1:42,“Jark Wu” 写入:
>
> 你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
>
> select userId, collect(userBankTime)
> from (
>   select  userId, concat(userBankNo, '_', createTime) as userBankTime
>   from aa as a left join bb as b where a.userId=b.userId
> ) group by userId;
>
>
> Best,
> Jark
>
> On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) 
> wrote:
>
> > 请问:
> >
> > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
> > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
> >   表aa
> >   id, userId
> >   表 bb
> >   userId,userBankNo,createTime
> >
> > select  * from aa as a left join bb as b where a.userId=b.userId
> >
> > 谢谢!
> >
> >
>
>
>




flink on yarn容器异常退出

2020-09-22 Thread Dream-底限
hi
我正在使用flink1.11.1 on
yarn以分离模式运行任务,但在任务提交的时候,任务在分配完applicationid后,容器经常异常退出,先前以为是yarn环境问题,但是在两个集群测都有遇到这种情况,请问这是一个已知的bug吗


Re: Zookeeper connection loss causing checkpoint corruption

2020-09-22 Thread Arpith P
I created a ticket with all my findings.
https://issues.apache.org/jira/browse/FLINK-19359.

Thanks,
Arpith

On Tue, Sep 22, 2020 at 12:16 PM Timo Walther  wrote:

> Hi Arpith,
>
> is there a JIRA ticket for this issue already? If not, it would be great
> if you can report it. This sounds like a critical priority issue to me.
>
> Thanks,
> Timo
>
> On 22.09.20 06:25, Arpith P wrote:
> > Hi Peter,
> >
> > I have recently had a similar issue where I could not load from the
> > checkpoints path. I found that whenever a corrupt checkpoint happens the
> > "_metadata" file will not be persisted, and I've a  program which tracks
> > if checkpoint location based on this strategy and updates DB with
> > location based on timestamp. To restore the latest checkpoint I'm
> > querying DB ordered by latest timestamp. Let me know if this is helpful,
> > I can share code for this if needed.
> >
> > Arpith
> >
> > On Mon, Sep 21, 2020 at 6:37 PM Peter Westermann
> > mailto:no.westerm...@genesys.com>> wrote:
> >
> > I recently ran into an issue with our Flink cluster: A zookeeper
> > service deploy caused a temporary connection loss and triggered a
> > new jobmanager leader election. Leadership election was successful
> > and our Flink job restarted from the last checkpoint. 
> >
> > This checkpoint appears to have been taken while we los connection
> > to Zookeeper and ended up in a corrupted state so the Flink job kept
> > failing. Here’s the exception stack trace for that:
> >
> > 2020-09-18 01:10:57
> >
> > java.lang.Exception: Exception while creating
> > StreamOperatorStateContext.
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> >
> >   at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> >
> >   at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> >
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > Caused by: org.apache.flink.util.FlinkException: Could not restore
> > keyed state backend for
> > KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) from
> > any of the 1 provided restore options.
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >
> >   ... 9 more
> >
> > Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> > Caught unexpected exception.
> >
> >   at
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)
> >
> >   at
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >
> >   ... 11 more
> >
> > Caused by: java.io.IOException: Error while opening RocksDB
> > instance.
> >
> >   at
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)
> >
> >   at
> >
>  
> 

答复: Flink-1.11.1 Kafka Table API BigInt 问题

2020-09-22 Thread 刘首维
Hi,

试一下java的BigInteger呢


发件人: nashcen <2415370...@qq.com>
发送时间: 2020年9月22日 16:29:41
收件人: user-zh@flink.apache.org
主题: Flink-1.11.1 Kafka Table API BigInt 问题

*我的代码如下*
其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。

package com.athub.dcpoints.scala.connector.table.hive

import com.athub.dcpoints.java.model.KafkaDcpoints
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors._

/**
 * @author Nash Cen
 * @date 2020/9/22 10:33
 * @version 1.0
 */
object KafkaFlinkHiveScalaApp {

  def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hdfs")

// 1. 获取执行环境
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tableEnv: TableEnvironment =
StreamTableEnvironment.create(env,settings)

// 2. 注册 KafkaSource 表
tableEnv.connect(new Kafka()
  .version("universal")
  .topic("ods_dcpoints_dev")
  .property("zookeeper.connect", "localhost:2181")
  .property("bootstrap.servers", "localhost:9092")
  //.property("group.id", "testGroup")
  .startFromEarliest()
)
  .withFormat(new Json())
  .withSchema(new Schema()
.field("assetSpecId", DataTypes.STRING())
.field("dcnum", DataTypes.STRING())
.field("monitorType", DataTypes.STRING())
.field("tagNo", DataTypes.STRING())
.field("updateTime", DataTypes.BIGINT())
.field("value", DataTypes.STRING())

  )
  .createTemporaryTable("kafka_source_table")

// 3. 查询转换
// 使用 sqlQuery
val selectKafkaTable: Table = tableEnv.sqlQuery(s"select
assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table")

selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka")

env.execute("KafkaFlinkHiveScalaApp")

  }

}


*运行时报错信息如下:*
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
scala.math.BigInt does not contain a setter for field bigInteger
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class
scala.math.BigInt cannot be used as a POJO type because not all fields are
valid POJO fields, and must be processed as GenericType. Please read the
Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: A raw
type backed by type information has no serializable string representation.
It needs to be resolved into a proper raw type.
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Rui Li
Hi Timo,

I believe the blocker for this feature is that we don't support dynamically
adding user jars/resources at the moment. We're able to read the path to
the function jar from Hive metastore, but we cannot load the jar after the
user session is started.

On Tue, Sep 22, 2020 at 3:43 PM Timo Walther  wrote:

> Hi Husky,
>
> I guess https://issues.apache.org/jira/browse/FLINK-14055 is what is
> needed to make this feature possible.
>
> @Rui: Do you know more about this issue and current limitations.
>
> Regards,
> Timo
>
>
> On 18.09.20 09:11, Husky Zeng wrote:
> > When we submit a job which use udf of hive , the job will dependent on
> udf's
> > jars and configuration files.
> >
> > We have already store udf's jars and configuration files in hive metadata
> > store,so we excpet that flink could get those files hdfs paths by
> > hive-connector,and get those files in hdfs by paths when it running.
> >
> > In this code, it seemed we have already get those udf resources's path in
> > FunctionInfo, but did't use it.
> >
> >
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java#L80
> >
> > We submit udf's  jars and configuration files with job to yarn by client
> now
> > ,and try to find a way to avoid submit udf's resources when we submit a
> > job.Is it possible?
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>

-- 
Cheers,
Rui Li


Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Husky Zeng
Hi Timo,

Thanks for your attention,As what I say in this comment, this feature can
surely solve our problem, but it seems that the workload is much larger than
the solution in my scenario. Our project urgently needs to solve the problem
of reusing hive UDF in hive metastore, so we are more inclined to develop a
fast solution. I want to hear some community advice.

https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927

Best Regards,
Husky Zeng



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Does Flink support such a feature currently?

2020-09-22 Thread Marta Paes Moreira
Hi, Roc.

*Note:* in the future, please send this type of questions to the user
mailing list instead (user@flink.apache.org)!

If I understand your question correctly, this is possible using the LIKE
clause and a registered catalog. There is currently no implementation for
the MySQL JDBC catalog, but this is in the roadmap [1,2].

Once you register a catalog, you could do:

CREATE TABLE mapping_table

WITH (

  ...

 )

LIKE full_path_to_source_table;
Again, as of Flink 1.11 this only works for Postgres, not yet MySQL. I'm
copying in Bowen as he might be able to give more information on the
roadmap.

Marta

[1] https://issues.apache.org/jira/browse/FLINK-15352
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-93%3A+JDBC+catalog+and+Postgres+catalog

On Tue, Sep 22, 2020 at 9:44 AM Roc Marshal  wrote:

> Hi, everyone!
>
>When using flink sql DDL to create a mysql mapping table, does
> flink support the automatic rendering of the target table schema if we put
> no column-names in `create table table_name_mapping2mysql () with (...)`?
> If this feature is not supported, is it necessary to consider improving it?
>
> Thank you.
> Best, Roc.


Flink-1.11.1 Kafka Table API BigInt 问题

2020-09-22 Thread nashcen
*我的代码如下*
其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。

package com.athub.dcpoints.scala.connector.table.hive

import com.athub.dcpoints.java.model.KafkaDcpoints
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors._

/**
 * @author Nash Cen
 * @date 2020/9/22 10:33
 * @version 1.0
 */
object KafkaFlinkHiveScalaApp {

  def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hdfs")

// 1. 获取执行环境
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tableEnv: TableEnvironment =
StreamTableEnvironment.create(env,settings)

// 2. 注册 KafkaSource 表
tableEnv.connect(new Kafka()
  .version("universal")
  .topic("ods_dcpoints_dev")
  .property("zookeeper.connect", "localhost:2181")
  .property("bootstrap.servers", "localhost:9092")
  //.property("group.id", "testGroup")
  .startFromEarliest()
)
  .withFormat(new Json())
  .withSchema(new Schema()
.field("assetSpecId", DataTypes.STRING())
.field("dcnum", DataTypes.STRING())
.field("monitorType", DataTypes.STRING())
.field("tagNo", DataTypes.STRING())
.field("updateTime", DataTypes.BIGINT())
.field("value", DataTypes.STRING())

  )
  .createTemporaryTable("kafka_source_table")

// 3. 查询转换
// 使用 sqlQuery
val selectKafkaTable: Table = tableEnv.sqlQuery(s"select 
assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table")
   
selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka")

env.execute("KafkaFlinkHiveScalaApp")

  }

}


*运行时报错信息如下:*
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
scala.math.BigInt does not contain a setter for field bigInteger
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class
scala.math.BigInt cannot be used as a POJO type because not all fields are
valid POJO fields, and must be processed as GenericType. Please read the
Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: A raw
type backed by type information has no serializable string representation.
It needs to be resolved into a proper raw type.
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-22 Thread chuyuan
好勒,这种方案已经成功了,非常感谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink DynamoDB stream connector losing records

2020-09-22 Thread Jiawei Wu
Hi Ying and Danny,

Sorry for the late reply, I just got back from vacation.

Yes I'm running Flink in Kinesis Data Analytics with Flink 1.8, and
checkpoint is enabled. This fully managed solution limits my access to
Flink logs, so far I didn't get any logs related to throttle or fail over.
The reason why I suspect throttle is the root cause is because some AWS
lambda that connects to the same DynamoDB stream has higher throttle right
after Flink starts consuming the DynamoDB stream, in this case I believe
the throttle will also happen on Flink side. I'm actively working with AWS
support to try to find some logs on this.

At the same time, when you say 'in theory should not lose exactly-once
semantics', does that mean Flink will retry when throttle? I notice there
is a parameter "flink.shard.getrecords.maxretries" and it's default value
is 3. Will Flink skip this record when all retry attempts failed?

Thanks,
Jiawei



On Tue, Sep 15, 2020 at 4:38 PM Cranmer, Danny  wrote:

> Hi Jiawei,
>
>
>
> I agree that the offset management mechanism uses the same code as Kinesis
> Stream Consumer and in theory should not lose exactly-once semantics. As
> Ying is alluding to, if your application is restarted and you have
> snapshotting disabled in AWS there is a chance that records can be lost
> between runs. However, if you have snapshotting enabled then the
> application should continue consuming records from the last processed
> sequence number.
>
>
>
> I am happy to take a deeper look if you can provide more
> information/logs/code.
>
>
>
> Thanks,
>
>
>
> *From: *Ying Xu 
> *Date: *Monday, 14 September 2020 at 19:48
> *To: *Andrey Zagrebin 
> *Cc: *Jiawei Wu , user 
> *Subject: *RE: [EXTERNAL] Flink DynamoDB stream connector losing records
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hi Jiawei:
>
>
>
> Sorry for the delayed reply.  When you mention certain records getting
> skipped, is it from the same run or across different runs.  Any more
> specific details on how/when records are lost?
>
>
>
> FlinkDynamoDBStreamsConsumer is built on top of FlinkKinesisConsumer ,
> with similar offset management mechanism.  In theory it shouldn't lose
> exactly-once semantics in the case of getting throttled.  We haven't run it
> in any AWS kinesis analytics environment though.
>
>
>
> Thanks.
>
>
>
>
>
> On Thu, Sep 10, 2020 at 7:51 AM Andrey Zagrebin 
> wrote:
>
> Generally speaking this should not be a problem for exactly-once but I am
> not familiar with the DynamoDB and its Flink connector.
>
> Did you observe any failover in Flink logs?
>
>
>
> On Thu, Sep 10, 2020 at 4:34 PM Jiawei Wu 
> wrote:
>
> And I suspect I have throttled by DynamoDB stream, I contacted AWS support
> but got no response except for increasing WCU and RCU.
>
>
>
> Is it possible that Flink will lose exactly-once semantics when throttled?
>
>
>
> On Thu, Sep 10, 2020 at 10:31 PM Jiawei Wu 
> wrote:
>
> Hi Andrey,
>
>
>
> Thanks for your suggestion, but I'm using Kinesis analytics application
> which supports only Flink 1.8
>
>
>
> Regards,
>
> Jiawei
>
>
>
> On Thu, Sep 10, 2020 at 10:13 PM Andrey Zagrebin 
> wrote:
>
> Hi Jiawei,
>
>
>
> Could you try Flink latest release 1.11?
> 1.8 will probably not get bugfix releases.
>
> I will cc Ying Xu who might have a better idea about the DinamoDB source.
>
>
>
> Best,
>
> Andrey
>
>
>
> On Thu, Sep 10, 2020 at 3:10 PM Jiawei Wu 
> wrote:
>
> Hi,
>
>
>
> I'm using AWS kinesis analytics application with Flink 1.8. I am using
> the FlinkDynamoDBStreamsConsumer to consume DynamoDB stream records. But
> recently I found my internal state is wrong.
>
>
>
> After I printed some logs I found some DynamoDB stream record are skipped
> and not consumed by Flink. May I know if someone encountered the same issue
> before? Or is it a known issue in Flink 1.8?
>
>
>
> Thanks,
>
> Jiawei
>
>


Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Timo Walther

Hi Husky,

I guess https://issues.apache.org/jira/browse/FLINK-14055 is what is 
needed to make this feature possible.


@Rui: Do you know more about this issue and current limitations.

Regards,
Timo


On 18.09.20 09:11, Husky Zeng wrote:

When we submit a job which use udf of hive , the job will dependent on udf's
jars and configuration files.

We have already store udf's jars and configuration files in hive metadata
store,so we excpet that flink could get those files hdfs paths by
hive-connector,and get those files in hdfs by paths when it running.

In this code, it seemed we have already get those udf resources's path in
FunctionInfo, but did't use it.

   
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java#L80


We submit udf's  jars and configuration files with job to yarn by client now
,and try to find a way to avoid submit udf's resources when we submit a
job.Is it possible?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





[ANNOUNCE] Weekly Community Update 2020/38

2020-09-22 Thread Konstantin Knauf
Dear community,

happy to share a brief community update for the past week. A lot of FLIP
votes are currently ongoing on the dev@ mailing list. I've covered this
FLIP previously, so skipping those this time. Besides that, a couple of
release related updates and again multiple new Committers.

Flink Development
==

* [releases] Apache Flink 1.12.2 was released. [1]

* [releases] The first release candidate for Stateful Functions 2.2.0 was
published and already cancelled :) A new release candidate will probably be
published today. [2]

* [releases] Robert has shared another update on blocker and build
instabilities for the upcoming release of Apache Flink 1.12. There are five
weeks left till feature freeze. [3]

* [releases] Chesnay started a discussion thread on releases flink-shaded
12.0 containing upgrades to some of Apache Flink's core dependencies. [4]

* [cep, sql] Kosma has started a discussion on supporting timeouts in
MATCH_RECOGNIZE, which would allow a pattern to fire/match in the absence
of an event. [5]

[1] https://flink.apache.org/news/2020/09/17/release-1.11.2.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-2-2-0-release-candidate-1-tp45032.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-12-Stale-blockers-and-build-instabilities-tp43477.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-flink-shaded-12-0-td44968.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Timed-out-patterns-handling-using-MATCH-RECOGNIZE-tp45001.html

Notable Bugs
==

Nothing notable came to my attention.

Events, Blog Posts, Misc
===

* Godfrey He, Igal Shilman and Yun Tang are now Apache Flink Committers.
Congratulations! [6,7,8]

* The first keynote of Flink Forward Global has been announced: "Real-Time
Metrics at Fortnite Scale" by Ricky Saltzer of Epic Games []

[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Godfrey-He-tp44830.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Igal-Shilman-tp44754p44865.html
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Yun-Tang-tp44777p44909.html
[9] https://twitter.com/FlinkForward/status/1306219099475902464

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread jun su
hi all,

环境: flink-1.9.2 flink table planner
现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM

  发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
---
代码:

fbTableEnv.registerTableSource("source",orcTableSource)

val select = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")

fbTableEnv.registerTable("selectTable",select)

val t1 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
where Auth_Roles like 'a%'")
fbTableEnv.registerTable("t1",t1)

val t2 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
Target_UserSid= 'b'")
fbTableEnv.registerTable("t2",t2)

val t3 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
Thread_ID= 'c'")
fbTableEnv.registerTable("t3",t3)

val t4 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
access_path= 'd'")
fbTableEnv.registerTable("t4",t4)

val t5 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
action= 'e'")



-- 
Best,
Jun Su


Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-22 Thread Dawid Wysakowicz
Hi Lian,

Thank you for sending the full code for the pojo. It clarified a lot!

I learnt that Avro introduced yet another mechanism for retrieving
conversions for logical types in Avro 1.9.x. I was not aware they create
a static SpecificData field with registered logical conversions if a
logical type is part of a union. That's why I did not understand the
parts of the you sent me where you are registering the logical types in
the MODEL$ field. The getConversion method is part of the
SpecificRecordBase class and is being populated by Avro compiler when a
logical type is a top level type. This bit should work just fine.

Unfortunately we do not support this "feature" of using the static
SpecificData field. So far we create the SpecificData manually in
AvroSerializer and Avro(De)SerializationSchema that is why the
conversions are not being picked up. I created a JIRA issue[1] and a
PR[2] to support it in Flink 1.12.

The only workaround I can see in earlier versions of Flink is to change
the AvroSerializer manually. You would need to do a similar thing as I
do in the linked PR.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19339

[2] https://github.com/apache/flink/pull/13450

On 21/09/2020 19:28, Lian Jiang wrote:
> Thanks guys. Given Flink 1.12 is not ready (e.g. not available in
> Maven repo), I need to stick to 1.11.
>
> Dawid,
>
> For the code throwing "java.lang.Long cannot be cast to
> java.time.Instant",
>
> The avro schema has:
> union {null, timestamp_ms } eventTime = null;
>
> The avro pojo does have the logical type conversion:
>   private static SpecificData MODEL$ = new SpecificData();
> static {
> MODEL$.addLogicalTypeConversion(new 
> org.apache.avro.data.TimeConversions.TimestampMillisConversion());
>   }
>
> I don't see SpecificRecord#getConversions() you mentioned in avro repo.
> The pojo code throws:
> public void put(int field$, java.lang.Object value$) {
>   switch (field$) {
>   case 3: eventTime = (java.time.Instant)value$; break; // throw here
>   }
>
> I will send the full avdl and pojo offline to you for a close look.
>
>
> Regards
> Lian
>
>
> On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek  > wrote:
>
> Hi All,
>
> Avro was finally bumped in
> https://issues.apache.org/jira/browse/FLINK-18192.
>
> The implementers didn't see
> https://issues.apache.org/jira/browse/FLINK-12532, but it is also
> updated now.
>
> Best,
> Aljoscha
>
> On 21.09.20 08:04, Arvid Heise wrote:
> > Hi Lian,
> >
> > we had a similar discussion on [1].
> >
> > TL;DR you are using Avro 1.9.x while Flink still bundles Avro
> 1.8 [2] until
> > Hive bumps it [3]. In the thread, I gave some options to avoid
> running into
> > the issue.
> > The easiest fix is to use Avro 1.8.2 all the way, but you may
> run into [4]
> > if your logical type is nullable (which is not necessary in most
> cases).
> >
> > Still, I think it's time for us to revise the decision to wait
> for Hive to
> > bump and rather upgrade independently. Avro was for a long time
> stuck on
> > 1.8 but the project gained traction again in the past two years.
> On the
> > other hand, Hive seems to be rather slow to respond to that and we
> > shouldn't have a slow moving component block us to support a
> fast moving
> > component if it's such apparent that users want it.
> > @Aljoscha Krettek  > could you please pick that topic up
> > and ping the respective maintainers?
> >
> > [1]
> >
> 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> > [2] https://issues.apache.org/jira/browse/FLINK-12532
> > [3] https://issues.apache.org/jira/browse/HIVE-21737
> > [4] https://issues.apache.org/jira/browse/AVRO-1891
> >
> > On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang
> mailto:jiangok2...@gmail.com>> wrote:
> >
> >> Thanks Dawid for proposing
> ConfluentRegistryDeserializationSchema. I am
> >> trying ConfluentRegistryAvroDeserializationSchema (if this is
> what you
> >> mean) but got "java.lang.Long cannot be cast to
> java.time.Instant". This
> >> may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
> >>  Is there
> any progress
> >> for this JIRA? Thanks. Regards!
> >>
> >>
> >> Stacktrace:
> >> java.lang.ClassCastException: java.lang.Long cannot be cast to
> >> java.time.Instant
> >> at
> com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
> >> at
> org.apache.avro.generic.GenericData.setField(GenericData.java:795)
> >> at
> >>
> 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
> >> at
> >>
> 
> 

Re: hourly counter

2020-09-22 Thread Timo Walther

Hi Lian,

you are right that timers are not available in a ProcessWindowFunction 
but the state store can be accessed. So given that your window width is 
1 min, you could maintain an additional state value for counting the 
minutes and updating your counter once this value reached 60.


Otherwise, I would recommend to use a process function and implement the 
windowing logic yourself if it is a simple windowing operation.


Btw if you are using a Long counter, I would say that chances are low 
that it will overflow. Also, have you considered using Flink's metric 
system? it might make custom metric clients unnecessary.


I hope this helps.

Regards,
Timo


On 22.09.20 02:02, Lian Jiang wrote:

Hi,

I have a window function with a window width of 1 min. I want to have an 
hourly counter which is reset every hour so it never overflows. There 
are multiple ways but none of them is straightforward:


StatsDClient instance =new NonBlockingStatsDClientBuilder()

int count = 0;

void incr() {
metricClient.count("mycounter",1,"mytag");

   count++;

}

void reset() {
metricClient.count("mycounter",-count,"mytag");

count = 0;

}

As you can see, the code needs to maintain a "count" variable to reset 
mycounter.
Also since timer is not available in Window function, extra code is needed to 
reset mycounter every hour.
Is there an easier way for implementing hourly counter? Or it is not a concern 
that a counter will overflow?

Thanks

Lian







Re: Zookeeper connection loss causing checkpoint corruption

2020-09-22 Thread Timo Walther

Hi Arpith,

is there a JIRA ticket for this issue already? If not, it would be great 
if you can report it. This sounds like a critical priority issue to me.


Thanks,
Timo

On 22.09.20 06:25, Arpith P wrote:

Hi Peter,

I have recently had a similar issue where I could not load from the 
checkpoints path. I found that whenever a corrupt checkpoint happens the 
"_metadata" file will not be persisted, and I've a  program which tracks 
if checkpoint location based on this strategy and updates DB with 
location based on timestamp. To restore the latest checkpoint I'm 
querying DB ordered by latest timestamp. Let me know if this is helpful, 
I can share code for this if needed.


Arpith

On Mon, Sep 21, 2020 at 6:37 PM Peter Westermann 
mailto:no.westerm...@genesys.com>> wrote:


I recently ran into an issue with our Flink cluster: A zookeeper
service deploy caused a temporary connection loss and triggered a
new jobmanager leader election. Leadership election was successful
and our Flink job restarted from the last checkpoint. 

This checkpoint appears to have been taken while we los connection
to Zookeeper and ended up in a corrupted state so the Flink job kept
failing. Here’s the exception stack trace for that:

2020-09-18 01:10:57

java.lang.Exception: Exception while creating
StreamOperatorStateContext.

  at

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)

  at

org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

  at

org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)

  at

org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)

  at

org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

  at

org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)

  at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)

  at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)

  at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)

  at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore
keyed state backend for
KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) from
any of the 1 provided restore options.

  at

org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

  at

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

  at

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

  ... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException:
Caught unexpected exception.

  at

org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)

  at

org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

  at

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

  at

org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

  at

org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

  ... 11 more

Caused by: java.io.IOException: Error while opening RocksDB
instance.

  at

org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)

  at

org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)

  at

org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:220)

  at

org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

  at

org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)

  

Re: FlinkKafkaConsumer问题

2020-09-22 Thread 赵一旦
这个问题和flink关系不大。Kafka本身就是这么个特点,指定group,如果是订阅方式,会是你想象的那样,分享消息。但,如果是通过assign方式指定了消费哪个分区,则不受到group中消费者共享消息的限制。

SmileSmile  于2020年9月5日周六 下午4:51写道:

> hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会?
>
> Best
>
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年09月04日 14:11,op 写道:
> 大概懂了 感谢
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> taochangl...@163.com;
> 发送时间:2020年9月4日(星期五) 中午11:54
> 收件人:"user-zh" acqua@gmail.com;
>
> 主题:Re: FlinkKafkaConsumer问题
>
>
>
>
> 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。
>
> 在 2020/9/4 10:34, Shuiqiang Chen 写道:
>
>  Hi,
>  为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的
> partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint
> 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么
> FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka
> 消费组管理者记录,Flink 无法维护这些信息。
> 
>  在 2020年9月4日,上午10:25,op <520075...@qq.com 写道:
> 
> 
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?nbsp;
> 
> 
>  --nbsp;原始邮件nbsp;--
> 
> 发件人:
> "user-zh"
>   发送时间:nbsp;2020年9月3日(星期四) 晚上6:09
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: FlinkKafkaConsumer问题
> 
> 
> 
>  Hi op,
> 
>  在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic
> 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit
> 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka
> 服务端的一个角色。
> 
>  另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id
> commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset
> 开始消费。
> 
>  gt; 在 2020年9月3日,下午3:03,op <520075...@qq.comgt; 写道:
>  gt;
>  gt; amp;nbsp; amp;nbsp; hi,amp;nbsp;
> amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,amp;nbsp; amp;nbsp;
> 这有两个相同代码的程序:
>  gt; //---
>  gt; val bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
>  gt; Env.setRestartStrategy(RestartStrategies.noRestart())
>  gt; val consumerProps = new Properties()
>  gt; consumerProps.put("bootstrap.servers", brokers)
>  gt; consumerProps.put("group.id", "test1234")
>  gt;
>  gt; val consumer = new FlinkKafkaConsumer[String](topic,new
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  gt; Env.addSource(consumer).print()
>  gt;
> Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,
> group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
> 谢谢


Re: Flink-1.11 Table API &符号 语法问题

2020-09-22 Thread nashcen
多谢,引入以下包解决了我的问题

import org.apache.flink.table.api._



--
Sent from: http://apache-flink.147419.n8.nabble.com/