回复:[flink-1.10.2] Blink SQL 超用内存严重
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来 java.lang.Exception: [2020-09-14 09:27:20.431]Container [pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container. Dump of the process-tree for container_e91_1597199405327_9343_01_000298 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 -Djobmanager.rpc.address=njdev-nn03.nj -Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c -Djobmanager.rpc.port=30246 -Drest.address=flink-node03 |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' -Djobmanager.rpc.address='flink-node03' -Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' -Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out 2> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143 [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) -- 发件人:Benchao Li 发送时间:2020年9月23日(星期三) 13:12 收件人:user-zh ; 郑斌斌 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的, 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。 郑斌斌 于2020年9月23日周三 下午12:29写道: 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。 单流跑的话,比较正常。 JOB的内存是4G。版本1.11.1 -- 发件人:Benchao Li 发送时间:2020年9月23日(星期三) 10:50 收件人:user-zh 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重 Hi Tianwang, 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 `Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness`,根据你的SQL,这个值应该是6h 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;`,在你的SQL来讲,就是3h,也就是说 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] 希望这个可以解答你的疑惑~ [1] https://issues.apache.org/jira/browse/FLINK-18996 Tianwang Li 于2020年9月22日周二 下午8:26写道: > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > 【join】 > > > SELECT `b`.`rowtime`, > > `a`.`c_id`, > > `b`.`openid` > > FROM `test_table_a` AS `a` > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > AND `a`.`openid` = `b`.`openid` > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > 【window】
Re: [SQL] parse table name from sql statement
thx silence 于2020年9月22日周二 上午11:54写道: > 写过一个类似的可以参考一下 > > private static List lookupSelectTable(SqlNode sqlNode) { > List list = new ArrayList<>(); > if (sqlNode instanceof SqlSelect) { > SqlNode from = ((SqlSelect) sqlNode).getFrom(); > list.addAll(lookupSelectTable(from)); > } else if (sqlNode instanceof SqlJoin) { > SqlJoin sqlJoin = (SqlJoin) sqlNode; > list.addAll(lookupSelectTable(sqlJoin.getLeft())); > list.addAll(lookupSelectTable(sqlJoin.getRight())); > } else if (sqlNode instanceof SqlBasicCall) { > SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode; > SqlOperator operator = sqlBasicCall.getOperator(); > if (SqlKind.AS.equals(operator.getKind())) { > > list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0])); > } else if (SqlKind.UNION.equals(operator.getKind())) { > for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) { > list.addAll(lookupSelectTable(operandSqlNode)); > } > } else { > throw new RuntimeException("operator " + operator.getKind() > + " not support"); > } > } else if (sqlNode instanceof SqlIdentifier) { > list.add(((SqlIdentifier) sqlNode).getSimple()); > } else { > throw new RuntimeException("operator " + sqlNode.getClass() + " > not support"); > } > return list; > } > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best Regards, Harold Miao
Re: [flink-1.10.2] Blink SQL 超用内存严重
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的, 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。 郑斌斌 于2020年9月23日周三 下午12:29写道: > 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN > KILL 。 > 单流跑的话,比较正常。 > JOB的内存是4G。版本1.11.1 > -- > 发件人:Benchao Li > 发送时间:2020年9月23日(星期三) 10:50 > 收件人:user-zh > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重 > > Hi Tianwang, > > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 > > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 > join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 > `Math.max(leftRelativeSize, rightRelativeSize) + > allowedLateness`,根据你的SQL,这个值应该是6h > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 > 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / > 2;`,在你的SQL来讲,就是3h,也就是说 > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] > > 希望这个可以解答你的疑惑~ > > [1] https://issues.apache.org/jira/browse/FLINK-18996 > > Tianwang Li 于2020年9月22日周二 下午8:26写道: > > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > > > > 【join】 > > > > > SELECT `b`.`rowtime`, > > > `a`.`c_id`, > > > `b`.`openid` > > > FROM `test_table_a` AS `a` > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > > AND `a`.`openid` = `b`.`openid` > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' > SECOND > > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > > > > 【window】 > > > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) > AS > > > `rowtime`, > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__windoow_start__`, > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__window_end__`, > > > `c_id`, > > > COUNT(`openid`) AS `cnt` > > > FROM `test_table_in_6h` > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > > > `c_id` > > > > > > > > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 > > > > 【配置】 > > > > > cat conf/flink-conf.yaml > > > jobmanager.rpc.address: flink-jobmanager > > > taskmanager.numberOfTaskSlots: 1 > > > blob.server.port: 6124 > > > jobmanager.rpc.port: 6123 > > > taskmanager.rpc.port: 6122 > > > jobmanager.heap.size: 6144m > > > taskmanager.memory.process.size: 4g > > > taskmanager.memory.jvm-overhead.min: 1024m > > > taskmanager.memory.jvm-overhead.max: 2048m > > > taskmanager.debug.memory.log-interval: 1 > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation > > -XX:NumberOfGCLogFiles=10 > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > > > > > > -- > > ** > > tivanli > > ** > > > > > -- > > Best, > Benchao Li > > -- Best, Benchao Li
Re: [flink-1.10.2] Blink SQL 超用内存严重
Hi Benchao, 感谢你的回复 我使用的RocksDB,内存 overhead 太多了, 什么地方超用了那么多内存,好像不受控制了。 另外,我也测试过hop窗口,也是超用内存比较。没有使用增量checkpoint。 最后,我这边的interval join 是 inner join ,使用的 b 表的rowtime作为时间,没有观察到延迟数据的情况。 [image: image.png] Benchao Li 于2020年9月23日周三 上午10:50写道: > Hi Tianwang, > > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 > > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 > join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 > `Math.max(leftRelativeSize, rightRelativeSize) + > allowedLateness`,根据你的SQL,这个值应该是6h > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 > 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / > 2;`,在你的SQL来讲,就是3h,也就是说 > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] > > 希望这个可以解答你的疑惑~ > > [1] https://issues.apache.org/jira/browse/FLINK-18996 > > Tianwang Li 于2020年9月22日周二 下午8:26写道: > > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > > > > 【join】 > > > > > SELECT `b`.`rowtime`, > > > `a`.`c_id`, > > > `b`.`openid` > > > FROM `test_table_a` AS `a` > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > > AND `a`.`openid` = `b`.`openid` > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' > SECOND > > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > > > > 【window】 > > > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) > AS > > > `rowtime`, > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__windoow_start__`, > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__window_end__`, > > > `c_id`, > > > COUNT(`openid`) AS `cnt` > > > FROM `test_table_in_6h` > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > > > `c_id` > > > > > > > > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 > > > > 【配置】 > > > > > cat conf/flink-conf.yaml > > > jobmanager.rpc.address: flink-jobmanager > > > taskmanager.numberOfTaskSlots: 1 > > > blob.server.port: 6124 > > > jobmanager.rpc.port: 6123 > > > taskmanager.rpc.port: 6122 > > > jobmanager.heap.size: 6144m > > > taskmanager.memory.process.size: 4g > > > taskmanager.memory.jvm-overhead.min: 1024m > > > taskmanager.memory.jvm-overhead.max: 2048m > > > taskmanager.debug.memory.log-interval: 1 > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation > > -XX:NumberOfGCLogFiles=10 > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > > > > > > -- > > ** > > tivanli > > ** > > > > > -- > > Best, > Benchao Li > -- ** tivanli **
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
应该是碰到节点 cycle 引用了,导致优化 rule 一直重复重复触发,可以将 debug 日志打开,看下是哪个 rule 被频繁触发了,之前修过一个类似的问题[1],可以参考下 [1] https://issues.apache.org/jira/browse/CALCITE-3121 Best, Danny Chan 在 2020年9月23日 +0800 AM10:23,jun su ,写道: > hi godfrey, > 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的 > > godfrey he 于2020年9月23日周三 上午10:09写道: > > > Hi Jun, > > > > 可能是old planner缺少一些rule导致遇到了corner case, > > blink planner之前解过一些类似的案例。 > > > > jun su 于2020年9月23日周三 上午9:53写道: > > > > > hi godfrey, > > > > > > 刚看了下, blink应该也会用hep , 上文说错了 > > > > > > jun su 于2020年9月23日周三 上午9:19写道: > > > > > > > hi godfrey, > > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep, > > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因 > > > > > > > > godfrey he 于2020年9月22日周二 下午8:58写道: > > > > > > > > > blink planner 有这个问题吗? > > > > > > > > > > jun su 于2020年9月22日周二 下午3:27写道: > > > > > > > > > > > hi all, > > > > > > > > > > > > 环境: flink-1.9.2 flink table planner > > > > > > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM > > > > > > > > > > > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, > > > 导致进程OOM > > > > > > --- > > > > > > 代码: > > > > > > > > > > > > fbTableEnv.registerTableSource("source",orcTableSource) > > > > > > > > > > > > val select = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source > > ") > > > > > > > > > > > > fbTableEnv.registerTable("selectTable",select) > > > > > > > > > > > > val t1 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from > > > selectTable > > > > > > where Auth_Roles like 'a%'") > > > > > > fbTableEnv.registerTable("t1",t1) > > > > > > > > > > > > val t2 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where > > > > > > Target_UserSid= 'b'") > > > > > > fbTableEnv.registerTable("t2",t2) > > > > > > > > > > > > val t3 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where > > > > > > Thread_ID= 'c'") > > > > > > fbTableEnv.registerTable("t3",t3) > > > > > > > > > > > > val t4 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where > > > > > > access_path= 'd'") > > > > > > fbTableEnv.registerTable("t4",t4) > > > > > > > > > > > > val t5 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where > > > > > > action= 'e'") > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Best, > > > > > > Jun Su > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best, > > > > Jun Su > > > > > > > > > > > > > -- > > > Best, > > > Jun Su > > > > > > > > -- > Best, > Jun Su
回复:[flink-1.10.2] Blink SQL 超用内存严重
我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。 单流跑的话,比较正常。 JOB的内存是4G。版本1.11.1 -- 发件人:Benchao Li 发送时间:2020年9月23日(星期三) 10:50 收件人:user-zh 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重 Hi Tianwang, 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 `Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness`,根据你的SQL,这个值应该是6h 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;`,在你的SQL来讲,就是3h,也就是说 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] 希望这个可以解答你的疑惑~ [1] https://issues.apache.org/jira/browse/FLINK-18996 Tianwang Li 于2020年9月22日周二 下午8:26写道: > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > 【join】 > > > SELECT `b`.`rowtime`, > > `a`.`c_id`, > > `b`.`openid` > > FROM `test_table_a` AS `a` > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > AND `a`.`openid` = `b`.`openid` > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > 【window】 > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `rowtime`, > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `__windoow_start__`, > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `__window_end__`, > > `c_id`, > > COUNT(`openid`) AS `cnt` > > FROM `test_table_in_6h` > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > > `c_id` > > > > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 > > 【配置】 > > > cat conf/flink-conf.yaml > > jobmanager.rpc.address: flink-jobmanager > > taskmanager.numberOfTaskSlots: 1 > > blob.server.port: 6124 > > jobmanager.rpc.port: 6123 > > taskmanager.rpc.port: 6122 > > jobmanager.heap.size: 6144m > > taskmanager.memory.process.size: 4g > > taskmanager.memory.jvm-overhead.min: 1024m > > taskmanager.memory.jvm-overhead.max: 2048m > > taskmanager.debug.memory.log-interval: 1 > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation > -XX:NumberOfGCLogFiles=10 > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > -- > ** > tivanli > ** > -- Best, Benchao Li
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
例如 calc merge rule,还有calc,agg等其他相关rule,点比较散。得具体看 jun su 于2020年9月23日周三 上午10:22写道: > hi godfrey, > 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的 > > godfrey he 于2020年9月23日周三 上午10:09写道: > > > Hi Jun, > > > > 可能是old planner缺少一些rule导致遇到了corner case, > > blink planner之前解过一些类似的案例。 > > > > jun su 于2020年9月23日周三 上午9:53写道: > > > > > hi godfrey, > > > > > > 刚看了下, blink应该也会用hep , 上文说错了 > > > > > > jun su 于2020年9月23日周三 上午9:19写道: > > > > > > > hi godfrey, > > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep, > > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因 > > > > > > > > godfrey he 于2020年9月22日周二 下午8:58写道: > > > > > > > >> blink planner 有这个问题吗? > > > >> > > > >> jun su 于2020年9月22日周二 下午3:27写道: > > > >> > > > >> > hi all, > > > >> > > > > >> > 环境: flink-1.9.2 flink table planner > > > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM > > > >> > > > > >> > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, > > > 导致进程OOM > > > >> > --- > > > >> > 代码: > > > >> > > > > >> > fbTableEnv.registerTableSource("source",orcTableSource) > > > >> > > > > >> > val select = fbTableEnv.sqlQuery("select > > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source > > ") > > > >> > > > > >> > fbTableEnv.registerTable("selectTable",select) > > > >> > > > > >> > val t1 = fbTableEnv.sqlQuery("select > > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from > > > selectTable > > > >> > where Auth_Roles like 'a%'") > > > >> > fbTableEnv.registerTable("t1",t1) > > > >> > > > > >> > val t2 = fbTableEnv.sqlQuery("select > > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 > where > > > >> > Target_UserSid= 'b'") > > > >> > fbTableEnv.registerTable("t2",t2) > > > >> > > > > >> > val t3 = fbTableEnv.sqlQuery("select > > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 > where > > > >> > Thread_ID= 'c'") > > > >> > fbTableEnv.registerTable("t3",t3) > > > >> > > > > >> > val t4 = fbTableEnv.sqlQuery("select > > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 > where > > > >> > access_path= 'd'") > > > >> > fbTableEnv.registerTable("t4",t4) > > > >> > > > > >> > val t5 = fbTableEnv.sqlQuery("select > > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 > where > > > >> > action= 'e'") > > > >> > > > > >> > > > > >> > > > > >> > -- > > > >> > Best, > > > >> > Jun Su > > > >> > > > > >> > > > > > > > > > > > > -- > > > > Best, > > > > Jun Su > > > > > > > > > > > > > -- > > > Best, > > > Jun Su > > > > > > > > -- > Best, > Jun Su >
Re: [flink-1.10.2] Blink SQL 超用内存严重
Hi Tianwang, 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 `Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness`,根据你的SQL,这个值应该是6h 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;`,在你的SQL来讲,就是3h,也就是说 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] 希望这个可以解答你的疑惑~ [1] https://issues.apache.org/jira/browse/FLINK-18996 Tianwang Li 于2020年9月22日周二 下午8:26写道: > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > 【join】 > > > SELECT `b`.`rowtime`, > > `a`.`c_id`, > > `b`.`openid` > > FROM `test_table_a` AS `a` > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > AND `a`.`openid` = `b`.`openid` > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > 【window】 > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `rowtime`, > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `__windoow_start__`, > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `__window_end__`, > > `c_id`, > > COUNT(`openid`) AS `cnt` > > FROM `test_table_in_6h` > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > > `c_id` > > > > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 > > 【配置】 > > > cat conf/flink-conf.yaml > > jobmanager.rpc.address: flink-jobmanager > > taskmanager.numberOfTaskSlots: 1 > > blob.server.port: 6124 > > jobmanager.rpc.port: 6123 > > taskmanager.rpc.port: 6122 > > jobmanager.heap.size: 6144m > > taskmanager.memory.process.size: 4g > > taskmanager.memory.jvm-overhead.min: 1024m > > taskmanager.memory.jvm-overhead.max: 2048m > > taskmanager.debug.memory.log-interval: 1 > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation > -XX:NumberOfGCLogFiles=10 > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > -- > ** > tivanli > ** > -- Best, Benchao Li
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
hi godfrey, 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的 godfrey he 于2020年9月23日周三 上午10:09写道: > Hi Jun, > > 可能是old planner缺少一些rule导致遇到了corner case, > blink planner之前解过一些类似的案例。 > > jun su 于2020年9月23日周三 上午9:53写道: > > > hi godfrey, > > > > 刚看了下, blink应该也会用hep , 上文说错了 > > > > jun su 于2020年9月23日周三 上午9:19写道: > > > > > hi godfrey, > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep, > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因 > > > > > > godfrey he 于2020年9月22日周二 下午8:58写道: > > > > > >> blink planner 有这个问题吗? > > >> > > >> jun su 于2020年9月22日周二 下午3:27写道: > > >> > > >> > hi all, > > >> > > > >> > 环境: flink-1.9.2 flink table planner > > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM > > >> > > > >> > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, > > 导致进程OOM > > >> > --- > > >> > 代码: > > >> > > > >> > fbTableEnv.registerTableSource("source",orcTableSource) > > >> > > > >> > val select = fbTableEnv.sqlQuery("select > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source > ") > > >> > > > >> > fbTableEnv.registerTable("selectTable",select) > > >> > > > >> > val t1 = fbTableEnv.sqlQuery("select > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from > > selectTable > > >> > where Auth_Roles like 'a%'") > > >> > fbTableEnv.registerTable("t1",t1) > > >> > > > >> > val t2 = fbTableEnv.sqlQuery("select > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where > > >> > Target_UserSid= 'b'") > > >> > fbTableEnv.registerTable("t2",t2) > > >> > > > >> > val t3 = fbTableEnv.sqlQuery("select > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where > > >> > Thread_ID= 'c'") > > >> > fbTableEnv.registerTable("t3",t3) > > >> > > > >> > val t4 = fbTableEnv.sqlQuery("select > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where > > >> > access_path= 'd'") > > >> > fbTableEnv.registerTable("t4",t4) > > >> > > > >> > val t5 = fbTableEnv.sqlQuery("select > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where > > >> > action= 'e'") > > >> > > > >> > > > >> > > > >> > -- > > >> > Best, > > >> > Jun Su > > >> > > > >> > > > > > > > > > -- > > > Best, > > > Jun Su > > > > > > > > > -- > > Best, > > Jun Su > > > -- Best, Jun Su
flink从mongdb中读取数据
您好! 我刚开始接触flink,现在准备用flink读取mongdb中的数据。mongodb中每秒都会有一个新增数据,重写RichSourceFunction来连接读取mongodb中数据。但是flink不能像kafka一样,持续不断显示数据,是否mongodb不能持续显示数据呢?还是有一些额外设置? 十分感谢! Best wishes, Gege
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
Hi Jun, 可能是old planner缺少一些rule导致遇到了corner case, blink planner之前解过一些类似的案例。 jun su 于2020年9月23日周三 上午9:53写道: > hi godfrey, > > 刚看了下, blink应该也会用hep , 上文说错了 > > jun su 于2020年9月23日周三 上午9:19写道: > > > hi godfrey, > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep, > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因 > > > > godfrey he 于2020年9月22日周二 下午8:58写道: > > > >> blink planner 有这个问题吗? > >> > >> jun su 于2020年9月22日周二 下午3:27写道: > >> > >> > hi all, > >> > > >> > 环境: flink-1.9.2 flink table planner > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM > >> > > >> > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, > 导致进程OOM > >> > --- > >> > 代码: > >> > > >> > fbTableEnv.registerTableSource("source",orcTableSource) > >> > > >> > val select = fbTableEnv.sqlQuery("select > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ") > >> > > >> > fbTableEnv.registerTable("selectTable",select) > >> > > >> > val t1 = fbTableEnv.sqlQuery("select > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from > selectTable > >> > where Auth_Roles like 'a%'") > >> > fbTableEnv.registerTable("t1",t1) > >> > > >> > val t2 = fbTableEnv.sqlQuery("select > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where > >> > Target_UserSid= 'b'") > >> > fbTableEnv.registerTable("t2",t2) > >> > > >> > val t3 = fbTableEnv.sqlQuery("select > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where > >> > Thread_ID= 'c'") > >> > fbTableEnv.registerTable("t3",t3) > >> > > >> > val t4 = fbTableEnv.sqlQuery("select > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where > >> > access_path= 'd'") > >> > fbTableEnv.registerTable("t4",t4) > >> > > >> > val t5 = fbTableEnv.sqlQuery("select > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where > >> > action= 'e'") > >> > > >> > > >> > > >> > -- > >> > Best, > >> > Jun Su > >> > > >> > > > > > > -- > > Best, > > Jun Su > > > > > -- > Best, > Jun Su >
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
hi godfrey, 刚看了下, blink应该也会用hep , 上文说错了 jun su 于2020年9月23日周三 上午9:19写道: > hi godfrey, > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep, > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因 > > godfrey he 于2020年9月22日周二 下午8:58写道: > >> blink planner 有这个问题吗? >> >> jun su 于2020年9月22日周二 下午3:27写道: >> >> > hi all, >> > >> > 环境: flink-1.9.2 flink table planner >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM >> > >> > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM >> > --- >> > 代码: >> > >> > fbTableEnv.registerTableSource("source",orcTableSource) >> > >> > val select = fbTableEnv.sqlQuery("select >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ") >> > >> > fbTableEnv.registerTable("selectTable",select) >> > >> > val t1 = fbTableEnv.sqlQuery("select >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable >> > where Auth_Roles like 'a%'") >> > fbTableEnv.registerTable("t1",t1) >> > >> > val t2 = fbTableEnv.sqlQuery("select >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where >> > Target_UserSid= 'b'") >> > fbTableEnv.registerTable("t2",t2) >> > >> > val t3 = fbTableEnv.sqlQuery("select >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where >> > Thread_ID= 'c'") >> > fbTableEnv.registerTable("t3",t3) >> > >> > val t4 = fbTableEnv.sqlQuery("select >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where >> > access_path= 'd'") >> > fbTableEnv.registerTable("t4",t4) >> > >> > val t5 = fbTableEnv.sqlQuery("select >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where >> > action= 'e'") >> > >> > >> > >> > -- >> > Best, >> > Jun Su >> > >> > > > -- > Best, > Jun Su > -- Best, Jun Su
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
hi godfrey, 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep, 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因 godfrey he 于2020年9月22日周二 下午8:58写道: > blink planner 有这个问题吗? > > jun su 于2020年9月22日周二 下午3:27写道: > > > hi all, > > > > 环境: flink-1.9.2 flink table planner > > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM > > > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM > > --- > > 代码: > > > > fbTableEnv.registerTableSource("source",orcTableSource) > > > > val select = fbTableEnv.sqlQuery("select > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ") > > > > fbTableEnv.registerTable("selectTable",select) > > > > val t1 = fbTableEnv.sqlQuery("select > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable > > where Auth_Roles like 'a%'") > > fbTableEnv.registerTable("t1",t1) > > > > val t2 = fbTableEnv.sqlQuery("select > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where > > Target_UserSid= 'b'") > > fbTableEnv.registerTable("t2",t2) > > > > val t3 = fbTableEnv.sqlQuery("select > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where > > Thread_ID= 'c'") > > fbTableEnv.registerTable("t3",t3) > > > > val t4 = fbTableEnv.sqlQuery("select > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where > > access_path= 'd'") > > fbTableEnv.registerTable("t4",t4) > > > > val t5 = fbTableEnv.sqlQuery("select > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where > > action= 'e'") > > > > > > > > -- > > Best, > > Jun Su > > > -- Best, Jun Su
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
blink planner 有这个问题吗? jun su 于2020年9月22日周二 下午3:27写道: > hi all, > > 环境: flink-1.9.2 flink table planner > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM > --- > 代码: > > fbTableEnv.registerTableSource("source",orcTableSource) > > val select = fbTableEnv.sqlQuery("select > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ") > > fbTableEnv.registerTable("selectTable",select) > > val t1 = fbTableEnv.sqlQuery("select > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable > where Auth_Roles like 'a%'") > fbTableEnv.registerTable("t1",t1) > > val t2 = fbTableEnv.sqlQuery("select > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where > Target_UserSid= 'b'") > fbTableEnv.registerTable("t2",t2) > > val t3 = fbTableEnv.sqlQuery("select > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where > Thread_ID= 'c'") > fbTableEnv.registerTable("t3",t3) > > val t4 = fbTableEnv.sqlQuery("select > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where > access_path= 'd'") > fbTableEnv.registerTable("t4",t4) > > val t5 = fbTableEnv.sqlQuery("select > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where > action= 'e'") > > > > -- > Best, > Jun Su >
keyedstate TTL 清理状态如何触发
大家好,现在有个疑问,TTL如果设成1min,那么是时间到了之后,该state自动清除吗
[flink-1.10.2] Blink SQL 超用内存严重
使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 【join】 > SELECT `b`.`rowtime`, > `a`.`c_id`, > `b`.`openid` > FROM `test_table_a` AS `a` > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > AND `a`.`openid` = `b`.`openid` > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND > AND `a`.`rowtime` + INTERVAL '6' HOUR > > 【window】 > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > `rowtime`, > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > `__windoow_start__`, > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > `__window_end__`, > `c_id`, > COUNT(`openid`) AS `cnt` > FROM `test_table_in_6h` > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > `c_id` > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 【配置】 > cat conf/flink-conf.yaml > jobmanager.rpc.address: flink-jobmanager > taskmanager.numberOfTaskSlots: 1 > blob.server.port: 6124 > jobmanager.rpc.port: 6123 > taskmanager.rpc.port: 6122 > jobmanager.heap.size: 6144m > taskmanager.memory.process.size: 4g > taskmanager.memory.jvm-overhead.min: 1024m > taskmanager.memory.jvm-overhead.max: 2048m > taskmanager.debug.memory.log-interval: 1 > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > -- ** tivanli **
Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题
1. 你用的是什么版本呢?我在最新的版本上试了下,没有报错呢。试下最新版本? 2. {123123,ff=1, 123123,kk=2} 就是 MULTISET 打印出来的结构哈, 就是 {key1=cnt, key2=cnt} 其中 123123,ff 是 row#toString。 Best, Jark On Tue, 22 Sep 2020 at 19:28, Li,Qian(DXM,PB) wrote: > 你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么? > > create view uu AS select collect(userBankTime) as userBank ,name from ( > select name, row(name,description) as userBankTime from tt_haha) group by > name; // 创建视图,描述userBank类型是MULTITYPE > > Flink SQL> desc uu; > root > |-- userBank: MULTISET NOT NULL> > NOT NULL > |-- name: STRING > > CREATE TABLE user_log_sink_10 ( > > name STRING, > > maps MULTISET>, > > PRIMARY KEY (name) NOT ENFORCED > > ) WITH ( > > 'connector' = 'elasticsearch-6', > > 'hosts' = '', > > 'index' = 'enriched_orders', > > 'document-type' = 'user' > > ); > > insert into user_log_sink_10 select name, collect(userBankTime) as maps > from (select name, row(name,description) as userBankTime from tt_haha) > group by name; > > [INFO] Submitting SQL update statement to the cluster... > [ERROR] Could not execute SQL statement. Reason: > java.lang.ClassCastException: > org.apache.flink.table.types.logical.MultisetType cannot be cast to > org.apache.flink.table.types.logical.MapType > > 还有select name, collect(userBankTime) as maps from (select name, > row(name,description) as userBankTime from tt_haha) group by name 出来的结果是 > test123{test123,123123=1} > 123123{123123,ff=1, 123123,kk=2} > 这种的,在flink sql中是什么格式呢? > > || > > 在 2020/9/21 下午3:05,“Jark Wu” 写入: > > 1. concat 是内置函数,可以直接用。 > 2. 内置函数中没有 group_concat,不过有个类似功能的 listagg > 3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1] > 4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。 > 5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2]. > > Best, > Jark > > [1]: > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html > [2]: > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html > > On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB) > wrote: > > > Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b) > > > > 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了, > > > > 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么? > > > > > > 在 2020/9/21 下午1:42,“Jark Wu” 写入: > > > > 你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下: > > > > select userId, collect(userBankTime) > > from ( > > select userId, concat(userBankNo, '_', createTime) as > userBankTime > > from aa as a left join bb as b where a.userId=b.userId > > ) group by userId; > > > > > > Best, > > Jark > > > > On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) < > liq...@duxiaoman.com> > > wrote: > > > > > 请问: > > > > > > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系 > > > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢? > > > 表aa > > > id, userId > > > 表 bb > > > userId,userBankNo,createTime > > > > > > select * from aa as a left join bb as b where > a.userId=b.userId > > > > > > 谢谢! > > > > > > > > > > > > > > >
Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题
你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么? create view uu AS select collect(userBankTime) as userBank ,name from ( select name, row(name,description) as userBankTime from tt_haha) group by name; // 创建视图,描述userBank类型是MULTITYPE Flink SQL> desc uu; root |-- userBank: MULTISET NOT NULL> NOT NULL |-- name: STRING CREATE TABLE user_log_sink_10 ( > name STRING, > maps MULTISET>, > PRIMARY KEY (name) NOT ENFORCED > ) WITH ( > 'connector' = 'elasticsearch-6', > 'hosts' = '', > 'index' = 'enriched_orders', > 'document-type' = 'user' > ); insert into user_log_sink_10 select name, collect(userBankTime) as maps from (select name, row(name,description) as userBankTime from tt_haha) group by name; [INFO] Submitting SQL update statement to the cluster... [ERROR] Could not execute SQL statement. Reason: java.lang.ClassCastException: org.apache.flink.table.types.logical.MultisetType cannot be cast to org.apache.flink.table.types.logical.MapType 还有select name, collect(userBankTime) as maps from (select name, row(name,description) as userBankTime from tt_haha) group by name 出来的结果是 test123{test123,123123=1} 123123{123123,ff=1, 123123,kk=2} 这种的,在flink sql中是什么格式呢? || 在 2020/9/21 下午3:05,“Jark Wu” 写入: 1. concat 是内置函数,可以直接用。 2. 内置函数中没有 group_concat,不过有个类似功能的 listagg 3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1] 4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。 5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2]. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB) wrote: > Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b) > > 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了, > > 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么? > > > 在 2020/9/21 下午1:42,“Jark Wu” 写入: > > 你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下: > > select userId, collect(userBankTime) > from ( > select userId, concat(userBankNo, '_', createTime) as userBankTime > from aa as a left join bb as b where a.userId=b.userId > ) group by userId; > > > Best, > Jark > > On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) > wrote: > > > 请问: > > > > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系 > > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢? > > 表aa > > id, userId > > 表 bb > > userId,userBankNo,createTime > > > > select * from aa as a left join bb as b where a.userId=b.userId > > > > 谢谢! > > > > > > >
flink on yarn容器异常退出
hi 我正在使用flink1.11.1 on yarn以分离模式运行任务,但在任务提交的时候,任务在分配完applicationid后,容器经常异常退出,先前以为是yarn环境问题,但是在两个集群测都有遇到这种情况,请问这是一个已知的bug吗
答复: Flink-1.11.1 Kafka Table API BigInt 问题
Hi, 试一下java的BigInteger呢 发件人: nashcen <2415370...@qq.com> 发送时间: 2020年9月22日 16:29:41 收件人: user-zh@flink.apache.org 主题: Flink-1.11.1 Kafka Table API BigInt 问题 *我的代码如下* 其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。 package com.athub.dcpoints.scala.connector.table.hive import com.athub.dcpoints.java.model.KafkaDcpoints import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment} import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.descriptors._ /** * @author Nash Cen * @date 2020/9/22 10:33 * @version 1.0 */ object KafkaFlinkHiveScalaApp { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME","hdfs") // 1. 获取执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv: TableEnvironment = StreamTableEnvironment.create(env,settings) // 2. 注册 KafkaSource 表 tableEnv.connect(new Kafka() .version("universal") .topic("ods_dcpoints_dev") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") //.property("group.id", "testGroup") .startFromEarliest() ) .withFormat(new Json()) .withSchema(new Schema() .field("assetSpecId", DataTypes.STRING()) .field("dcnum", DataTypes.STRING()) .field("monitorType", DataTypes.STRING()) .field("tagNo", DataTypes.STRING()) .field("updateTime", DataTypes.BIGINT()) .field("value", DataTypes.STRING()) ) .createTemporaryTable("kafka_source_table") // 3. 查询转换 // 使用 sqlQuery val selectKafkaTable: Table = tableEnv.sqlQuery(s"select assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table") selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka") env.execute("KafkaFlinkHiveScalaApp") } } *运行时报错信息如下:* [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.math.BigInt does not contain a setter for field bigInteger [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class scala.math.BigInt cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. Exception in thread "main" org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink-1.11.1 Kafka Table API BigInt 问题
*我的代码如下* 其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。 package com.athub.dcpoints.scala.connector.table.hive import com.athub.dcpoints.java.model.KafkaDcpoints import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment} import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.descriptors._ /** * @author Nash Cen * @date 2020/9/22 10:33 * @version 1.0 */ object KafkaFlinkHiveScalaApp { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME","hdfs") // 1. 获取执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv: TableEnvironment = StreamTableEnvironment.create(env,settings) // 2. 注册 KafkaSource 表 tableEnv.connect(new Kafka() .version("universal") .topic("ods_dcpoints_dev") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") //.property("group.id", "testGroup") .startFromEarliest() ) .withFormat(new Json()) .withSchema(new Schema() .field("assetSpecId", DataTypes.STRING()) .field("dcnum", DataTypes.STRING()) .field("monitorType", DataTypes.STRING()) .field("tagNo", DataTypes.STRING()) .field("updateTime", DataTypes.BIGINT()) .field("value", DataTypes.STRING()) ) .createTemporaryTable("kafka_source_table") // 3. 查询转换 // 使用 sqlQuery val selectKafkaTable: Table = tableEnv.sqlQuery(s"select assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table") selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka") env.execute("KafkaFlinkHiveScalaApp") } } *运行时报错信息如下:* [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.math.BigInt does not contain a setter for field bigInteger [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class scala.math.BigInt cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. Exception in thread "main" org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
好勒,这种方案已经成功了,非常感谢。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Calcite在嵌套多层包含where条件的sql语句时优化器OOM
hi all, 环境: flink-1.9.2 flink table planner 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM --- 代码: fbTableEnv.registerTableSource("source",orcTableSource) val select = fbTableEnv.sqlQuery("select Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ") fbTableEnv.registerTable("selectTable",select) val t1 = fbTableEnv.sqlQuery("select Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable where Auth_Roles like 'a%'") fbTableEnv.registerTable("t1",t1) val t2 = fbTableEnv.sqlQuery("select Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where Target_UserSid= 'b'") fbTableEnv.registerTable("t2",t2) val t3 = fbTableEnv.sqlQuery("select Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where Thread_ID= 'c'") fbTableEnv.registerTable("t3",t3) val t4 = fbTableEnv.sqlQuery("select Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where access_path= 'd'") fbTableEnv.registerTable("t4",t4) val t5 = fbTableEnv.sqlQuery("select Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where action= 'e'") -- Best, Jun Su
Re: FlinkKafkaConsumer问题
这个问题和flink关系不大。Kafka本身就是这么个特点,指定group,如果是订阅方式,会是你想象的那样,分享消息。但,如果是通过assign方式指定了消费哪个分区,则不受到group中消费者共享消息的限制。 SmileSmile 于2020年9月5日周六 下午4:51写道: > hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会? > > Best > > > > > | | > a511955993 > | > | > 邮箱:a511955...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2020年09月04日 14:11,op 写道: > 大概懂了 感谢 > > > > > --原始邮件-- > 发件人: > "user-zh" > < > taochangl...@163.com; > 发送时间:2020年9月4日(星期五) 中午11:54 > 收件人:"user-zh" acqua@gmail.com; > > 主题:Re: FlinkKafkaConsumer问题 > > > > > 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。 > > 在 2020/9/4 10:34, Shuiqiang Chen 写道: > > Hi, > 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 > partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint > 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 > FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka > 消费组管理者记录,Flink 无法维护这些信息。 > > 在 2020年9月4日,上午10:25,op <520075...@qq.com 写道: > > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?nbsp; > > > --nbsp;原始邮件nbsp;-- > > 发件人: > "user-zh" > 发送时间:nbsp;2020年9月3日(星期四) 晚上6:09 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: FlinkKafkaConsumer问题 > > > > Hi op, > > 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic > 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit > 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka > 服务端的一个角色。 > > 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id > commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset > 开始消费。 > > gt; 在 2020年9月3日,下午3:03,op <520075...@qq.comgt; 写道: > gt; > gt; amp;nbsp; amp;nbsp; hi,amp;nbsp; > amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,amp;nbsp; amp;nbsp; > 这有两个相同代码的程序: > gt; //--- > gt; val bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment > gt; Env.setRestartStrategy(RestartStrategies.noRestart()) > gt; val consumerProps = new Properties() > gt; consumerProps.put("bootstrap.servers", brokers) > gt; consumerProps.put("group.id", "test1234") > gt; > gt; val consumer = new FlinkKafkaConsumer[String](topic,new > KafkaStringSchema,consumerProps).setStartFromLatest() > gt; Env.addSource(consumer).print() > gt; > Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic, > group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer > group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 > 谢谢
Re: Flink-1.11 Table API &符号 语法问题
多谢,引入以下包解决了我的问题 import org.apache.flink.table.api._ -- Sent from: http://apache-flink.147419.n8.nabble.com/