回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 郑斌斌
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, 
checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

java.lang.Exception: [2020-09-14 09:27:20.431]Container 
[pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 
36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB 
physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 
-Djobmanager.rpc.address=njdev-nn03.nj 
-Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c 
-Djobmanager.rpc.port=30246 -Drest.address=flink-node03
 |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' 
-Djobmanager.rpc.address='flink-node03' 
-Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' 
-Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
 2> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 
[2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
[2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. 

 at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 13:12
收件人:user-zh ; 郑斌斌 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
郑斌斌  于2020年9月23日周三 下午12:29写道:
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
 单流跑的话,比较正常。
 JOB的内存是4G。版本1.11.1
 --
 发件人:Benchao Li 
 发送时间:2020年9月23日(星期三) 10:50
 收件人:user-zh 
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 Hi Tianwang,

 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
 `Math.max(leftRelativeSize, rightRelativeSize) +
 allowedLateness`,根据你的SQL,这个值应该是6h
 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
 2;`,在你的SQL来讲,就是3h,也就是说
 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

 希望这个可以解答你的疑惑~

 [1] https://issues.apache.org/jira/browse/FLINK-18996

 Tianwang Li  于2020年9月22日周二 下午8:26写道:

 > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
 >
 >
 > 【join】
 >
 > > SELECT `b`.`rowtime`,
 > > `a`.`c_id`,
 > > `b`.`openid`
 > > FROM `test_table_a` AS `a`
 > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
 > > AND `a`.`openid` = `b`.`openid`
 > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
 > > AND `a`.`rowtime` + INTERVAL '6' HOUR
 > >
 > >
 > 【window】
 

Re: [SQL] parse table name from sql statement

2020-09-22 文章 Harold.Miao
thx

silence  于2020年9月22日周二 上午11:54写道:

> 写过一个类似的可以参考一下
>
> private static List lookupSelectTable(SqlNode sqlNode) {
> List list = new ArrayList<>();
> if (sqlNode instanceof SqlSelect) {
> SqlNode from = ((SqlSelect) sqlNode).getFrom();
> list.addAll(lookupSelectTable(from));
> } else if (sqlNode instanceof SqlJoin) {
> SqlJoin sqlJoin = (SqlJoin) sqlNode;
> list.addAll(lookupSelectTable(sqlJoin.getLeft()));
> list.addAll(lookupSelectTable(sqlJoin.getRight()));
> } else if (sqlNode instanceof SqlBasicCall) {
> SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
> SqlOperator operator = sqlBasicCall.getOperator();
> if (SqlKind.AS.equals(operator.getKind())) {
>
> list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0]));
> } else if (SqlKind.UNION.equals(operator.getKind())) {
> for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) {
> list.addAll(lookupSelectTable(operandSqlNode));
> }
> } else {
> throw new RuntimeException("operator " + operator.getKind()
> + " not support");
> }
> } else if (sqlNode instanceof SqlIdentifier) {
> list.add(((SqlIdentifier) sqlNode).getSimple());
> } else {
> throw new RuntimeException("operator " + sqlNode.getClass() + "
> not support");
> }
> return list;
> }
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best Regards,
Harold Miao


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。

郑斌斌  于2020年9月23日周三 下午12:29写道:

>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
> 单流跑的话,比较正常。
> JOB的内存是4G。版本1.11.1
> --
> 发件人:Benchao Li 
> 发送时间:2020年9月23日(星期三) 10:50
> 收件人:user-zh 
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
> 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li  于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 1
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>
>
> --
>
> Best,
> Benchao Li
>
>

-- 

Best,
Benchao Li


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Tianwang Li
Hi Benchao, 感谢你的回复

我使用的RocksDB,内存 overhead 太多了, 什么地方超用了那么多内存,好像不受控制了。
另外,我也测试过hop窗口,也是超用内存比较。没有使用增量checkpoint。


最后,我这边的interval join 是 inner join ,使用的 b 表的rowtime作为时间,没有观察到延迟数据的情况。

[image: image.png]




Benchao Li  于2020年9月23日周三 上午10:50写道:

> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
> 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li  于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 1
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 
**
 tivanli
**


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 Danny Chan
应该是碰到节点 cycle 引用了,导致优化 rule 一直重复重复触发,可以将 debug 日志打开,看下是哪个 rule 
被频繁触发了,之前修过一个类似的问题[1],可以参考下

[1] https://issues.apache.org/jira/browse/CALCITE-3121

Best,
Danny Chan
在 2020年9月23日 +0800 AM10:23,jun su ,写道:
> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he  于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > blink planner之前解过一些类似的案例。
> >
> > jun su  于2020年9月23日周三 上午9:53写道:
> >
> > > hi godfrey,
> > >
> > > 刚看了下, blink应该也会用hep , 上文说错了
> > >
> > > jun su  于2020年9月23日周三 上午9:19写道:
> > >
> > > > hi godfrey,
> > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > >
> > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > >
> > > > > blink planner 有这个问题吗?
> > > > >
> > > > > jun su  于2020年9月22日周二 下午3:27写道:
> > > > >
> > > > > > hi all,
> > > > > >
> > > > > > 环境: flink-1.9.2 flink table planner
> > > > > > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > > > >
> > > > > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > 导致进程OOM
> > > > > > ---
> > > > > > 代码:
> > > > > >
> > > > > > fbTableEnv.registerTableSource("source",orcTableSource)
> > > > > >
> > > > > > val select = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> > ")
> > > > > >
> > > > > > fbTableEnv.registerTable("selectTable",select)
> > > > > >
> > > > > > val t1 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > selectTable
> > > > > > where Auth_Roles like 'a%'")
> > > > > > fbTableEnv.registerTable("t1",t1)
> > > > > >
> > > > > > val t2 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> > > > > > Target_UserSid= 'b'")
> > > > > > fbTableEnv.registerTable("t2",t2)
> > > > > >
> > > > > > val t3 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> > > > > > Thread_ID= 'c'")
> > > > > > fbTableEnv.registerTable("t3",t3)
> > > > > >
> > > > > > val t4 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> > > > > > access_path= 'd'")
> > > > > > fbTableEnv.registerTable("t4",t4)
> > > > > >
> > > > > > val t5 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> > > > > > action= 'e'")
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Jun Su
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su


回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 郑斌斌
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
单流跑的话,比较正常。
JOB的内存是4G。版本1.11.1
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 10:50
收件人:user-zh 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li



Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 godfrey he
例如 calc merge rule,还有calc,agg等其他相关rule,点比较散。得具体看

jun su  于2020年9月23日周三 上午10:22写道:

> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he  于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > blink planner之前解过一些类似的案例。
> >
> > jun su  于2020年9月23日周三 上午9:53写道:
> >
> > > hi godfrey,
> > >
> > > 刚看了下, blink应该也会用hep , 上文说错了
> > >
> > > jun su  于2020年9月23日周三 上午9:19写道:
> > >
> > > > hi godfrey,
> > > > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > >
> > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > >
> > > >> blink planner 有这个问题吗?
> > > >>
> > > >> jun su  于2020年9月22日周二 下午3:27写道:
> > > >>
> > > >> > hi all,
> > > >> >
> > > >> > 环境: flink-1.9.2 flink table planner
> > > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > >> >
> > > >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > 导致进程OOM
> > > >> > ---
> > > >> > 代码:
> > > >> >
> > > >> > fbTableEnv.registerTableSource("source",orcTableSource)
> > > >> >
> > > >> > val select = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> > ")
> > > >> >
> > > >> > fbTableEnv.registerTable("selectTable",select)
> > > >> >
> > > >> > val t1 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > selectTable
> > > >> > where Auth_Roles like 'a%'")
> > > >> > fbTableEnv.registerTable("t1",t1)
> > > >> >
> > > >> > val t2 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1
> where
> > > >> > Target_UserSid= 'b'")
> > > >> > fbTableEnv.registerTable("t2",t2)
> > > >> >
> > > >> > val t3 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2
> where
> > > >> > Thread_ID= 'c'")
> > > >> > fbTableEnv.registerTable("t3",t3)
> > > >> >
> > > >> > val t4 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3
> where
> > > >> > access_path= 'd'")
> > > >> > fbTableEnv.registerTable("t4",t4)
> > > >> >
> > > >> > val t5 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4
> where
> > > >> > action= 'e'")
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Best,
> > > >> > Jun Su
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su
>


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 jun su
hi godfrey,
方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的

godfrey he  于2020年9月23日周三 上午10:09写道:

> Hi Jun,
>
> 可能是old planner缺少一些rule导致遇到了corner case,
> blink planner之前解过一些类似的案例。
>
> jun su  于2020年9月23日周三 上午9:53写道:
>
> > hi godfrey,
> >
> > 刚看了下, blink应该也会用hep , 上文说错了
> >
> > jun su  于2020年9月23日周三 上午9:19写道:
> >
> > > hi godfrey,
> > > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > >
> > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > >
> > >> blink planner 有这个问题吗?
> > >>
> > >> jun su  于2020年9月22日周二 下午3:27写道:
> > >>
> > >> > hi all,
> > >> >
> > >> > 环境: flink-1.9.2 flink table planner
> > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > >> >
> > >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > 导致进程OOM
> > >> > ---
> > >> > 代码:
> > >> >
> > >> > fbTableEnv.registerTableSource("source",orcTableSource)
> > >> >
> > >> > val select = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> ")
> > >> >
> > >> > fbTableEnv.registerTable("selectTable",select)
> > >> >
> > >> > val t1 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > selectTable
> > >> > where Auth_Roles like 'a%'")
> > >> > fbTableEnv.registerTable("t1",t1)
> > >> >
> > >> > val t2 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> > >> > Target_UserSid= 'b'")
> > >> > fbTableEnv.registerTable("t2",t2)
> > >> >
> > >> > val t3 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> > >> > Thread_ID= 'c'")
> > >> > fbTableEnv.registerTable("t3",t3)
> > >> >
> > >> > val t4 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> > >> > access_path= 'd'")
> > >> > fbTableEnv.registerTable("t4",t4)
> > >> >
> > >> > val t5 = fbTableEnv.sqlQuery("select
> > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> > >> > action= 'e'")
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > Best,
> > >> > Jun Su
> > >> >
> > >>
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


flink从mongdb中读取数据

2020-09-22 文章 GeGe
您好!


我刚开始接触flink,现在准备用flink读取mongdb中的数据。mongodb中每秒都会有一个新增数据,重写RichSourceFunction来连接读取mongodb中数据。但是flink不能像kafka一样,持续不断显示数据,是否mongodb不能持续显示数据呢?还是有一些额外设置?


十分感谢!


Best wishes,


Gege

Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 godfrey he
Hi Jun,

可能是old planner缺少一些rule导致遇到了corner case,
blink planner之前解过一些类似的案例。

jun su  于2020年9月23日周三 上午9:53写道:

> hi godfrey,
>
> 刚看了下, blink应该也会用hep , 上文说错了
>
> jun su  于2020年9月23日周三 上午9:19写道:
>
> > hi godfrey,
> > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> >
> > godfrey he  于2020年9月22日周二 下午8:58写道:
> >
> >> blink planner 有这个问题吗?
> >>
> >> jun su  于2020年9月22日周二 下午3:27写道:
> >>
> >> > hi all,
> >> >
> >> > 环境: flink-1.9.2 flink table planner
> >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> >> >
> >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> 导致进程OOM
> >> > ---
> >> > 代码:
> >> >
> >> > fbTableEnv.registerTableSource("source",orcTableSource)
> >> >
> >> > val select = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
> >> >
> >> > fbTableEnv.registerTable("selectTable",select)
> >> >
> >> > val t1 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> selectTable
> >> > where Auth_Roles like 'a%'")
> >> > fbTableEnv.registerTable("t1",t1)
> >> >
> >> > val t2 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> >> > Target_UserSid= 'b'")
> >> > fbTableEnv.registerTable("t2",t2)
> >> >
> >> > val t3 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> >> > Thread_ID= 'c'")
> >> > fbTableEnv.registerTable("t3",t3)
> >> >
> >> > val t4 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> >> > access_path= 'd'")
> >> > fbTableEnv.registerTable("t4",t4)
> >> >
> >> > val t5 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> >> > action= 'e'")
> >> >
> >> >
> >> >
> >> > --
> >> > Best,
> >> > Jun Su
> >> >
> >>
> >
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best,
> Jun Su
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 jun su
hi godfrey,

刚看了下, blink应该也会用hep , 上文说错了

jun su  于2020年9月23日周三 上午9:19写道:

> hi godfrey,
> 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
>
> godfrey he  于2020年9月22日周二 下午8:58写道:
>
>> blink planner 有这个问题吗?
>>
>> jun su  于2020年9月22日周二 下午3:27写道:
>>
>> > hi all,
>> >
>> > 环境: flink-1.9.2 flink table planner
>> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
>> >
>> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
>> > ---
>> > 代码:
>> >
>> > fbTableEnv.registerTableSource("source",orcTableSource)
>> >
>> > val select = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
>> >
>> > fbTableEnv.registerTable("selectTable",select)
>> >
>> > val t1 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
>> > where Auth_Roles like 'a%'")
>> > fbTableEnv.registerTable("t1",t1)
>> >
>> > val t2 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
>> > Target_UserSid= 'b'")
>> > fbTableEnv.registerTable("t2",t2)
>> >
>> > val t3 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
>> > Thread_ID= 'c'")
>> > fbTableEnv.registerTable("t3",t3)
>> >
>> > val t4 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
>> > access_path= 'd'")
>> > fbTableEnv.registerTable("t4",t4)
>> >
>> > val t5 = fbTableEnv.sqlQuery("select
>> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
>> > action= 'e'")
>> >
>> >
>> >
>> > --
>> > Best,
>> > Jun Su
>> >
>>
>
>
> --
> Best,
> Jun Su
>


-- 
Best,
Jun Su


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 jun su
hi godfrey,
我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因

godfrey he  于2020年9月22日周二 下午8:58写道:

> blink planner 有这个问题吗?
>
> jun su  于2020年9月22日周二 下午3:27写道:
>
> > hi all,
> >
> > 环境: flink-1.9.2 flink table planner
> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> >
> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
> > ---
> > 代码:
> >
> > fbTableEnv.registerTableSource("source",orcTableSource)
> >
> > val select = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
> >
> > fbTableEnv.registerTable("selectTable",select)
> >
> > val t1 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
> > where Auth_Roles like 'a%'")
> > fbTableEnv.registerTable("t1",t1)
> >
> > val t2 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> > Target_UserSid= 'b'")
> > fbTableEnv.registerTable("t2",t2)
> >
> > val t3 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> > Thread_ID= 'c'")
> > fbTableEnv.registerTable("t3",t3)
> >
> > val t4 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> > access_path= 'd'")
> > fbTableEnv.registerTable("t4",t4)
> >
> > val t5 = fbTableEnv.sqlQuery("select
> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> > action= 'e'")
> >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 godfrey he
blink planner 有这个问题吗?

jun su  于2020年9月22日周二 下午3:27写道:

> hi all,
>
> 环境: flink-1.9.2 flink table planner
> 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
>
>   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
> ---
> 代码:
>
> fbTableEnv.registerTableSource("source",orcTableSource)
>
> val select = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
>
> fbTableEnv.registerTable("selectTable",select)
>
> val t1 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
> where Auth_Roles like 'a%'")
> fbTableEnv.registerTable("t1",t1)
>
> val t2 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> Target_UserSid= 'b'")
> fbTableEnv.registerTable("t2",t2)
>
> val t3 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> Thread_ID= 'c'")
> fbTableEnv.registerTable("t3",t3)
>
> val t4 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> access_path= 'd'")
> fbTableEnv.registerTable("t4",t4)
>
> val t5 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> action= 'e'")
>
>
>
> --
> Best,
> Jun Su
>


keyedstate TTL 清理状态如何触发

2020-09-22 文章 smq
大家好,现在有个疑问,TTL如果设成1min,那么是时间到了之后,该state自动清除吗

[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Tianwang Li
使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。


【join】

> SELECT `b`.`rowtime`,
> `a`.`c_id`,
> `b`.`openid`
> FROM `test_table_a` AS `a`
> INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> AND `a`.`openid` = `b`.`openid`
> AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> AND `a`.`rowtime` + INTERVAL '6' HOUR
>
>
【window】

> SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `rowtime`,
> HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__windoow_start__`,
> HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__window_end__`,
> `c_id`,
> COUNT(`openid`) AS `cnt`
> FROM `test_table_in_6h`
> GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> `c_id`
>


我配置了Fink的内存是4G, 实际使用达到了6.8 G。
同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右

【配置】

> cat conf/flink-conf.yaml
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 6144m
> taskmanager.memory.process.size: 4g
> taskmanager.memory.jvm-overhead.min: 1024m
> taskmanager.memory.jvm-overhead.max: 2048m
> taskmanager.debug.memory.log-interval: 1
> env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
> -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>



-- 
**
 tivanli
**


Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题

2020-09-22 文章 Jark Wu
1. 你用的是什么版本呢?我在最新的版本上试了下,没有报错呢。试下最新版本?
2. {123123,ff=1, 123123,kk=2} 就是 MULTISET 打印出来的结构哈, 就是 {key1=cnt, key2=cnt}
其中 123123,ff 是 row#toString。

Best,
Jark

On Tue, 22 Sep 2020 at 19:28, Li,Qian(DXM,PB)  wrote:

> 你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么?
>
> create view uu AS select collect(userBankTime) as userBank ,name from (
> select  name, row(name,description) as userBankTime from tt_haha) group by
> name; // 创建视图,描述userBank类型是MULTITYPE
>
> Flink SQL> desc uu;
> root
>  |-- userBank: MULTISET NOT NULL>
> NOT NULL
>  |-- name: STRING
>
> CREATE TABLE user_log_sink_10 (
> > name STRING,
> > maps MULTISET>,
> > PRIMARY KEY (name) NOT ENFORCED
> > ) WITH (
> > 'connector' = 'elasticsearch-6',
> > 'hosts' = '',
> > 'index' = 'enriched_orders',
> > 'document-type' = 'user'
> > );
>
> insert into user_log_sink_10 select name, collect(userBankTime) as maps
> from (select name, row(name,description) as userBankTime from tt_haha)
> group by name;
>
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassCastException:
> org.apache.flink.table.types.logical.MultisetType cannot be cast to
> org.apache.flink.table.types.logical.MapType
>
> 还有select name, collect(userBankTime) as maps from (select name,
> row(name,description) as userBankTime from tt_haha) group by name 出来的结果是
> test123{test123,123123=1}
> 123123{123123,ff=1, 123123,kk=2}
> 这种的,在flink sql中是什么格式呢?
>
> ||
>
> 在 2020/9/21 下午3:05,“Jark Wu” 写入:
>
> 1. concat 是内置函数,可以直接用。
> 2. 内置函数中没有 group_concat,不过有个类似功能的 listagg
> 3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1]
> 4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。
> 5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2].
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
> [2]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html
>
> On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB) 
> wrote:
>
> > Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
> >
> > 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
> >
> > 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
> >
> >
> > 在 2020/9/21 下午1:42,“Jark Wu” 写入:
> >
> > 你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
> >
> > select userId, collect(userBankTime)
> > from (
> >   select  userId, concat(userBankNo, '_', createTime) as
> userBankTime
> >   from aa as a left join bb as b where a.userId=b.userId
> > ) group by userId;
> >
> >
> > Best,
> > Jark
> >
> > On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) <
> liq...@duxiaoman.com>
> > wrote:
> >
> > > 请问:
> > >
> > > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
> > > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
> > >   表aa
> > >   id, userId
> > >   表 bb
> > >   userId,userBankNo,createTime
> > >
> > > select  * from aa as a left join bb as b where
> a.userId=b.userId
> > >
> > > 谢谢!
> > >
> > >
> >
> >
> >
>
>
>


Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题

2020-09-22 文章 Li,Qian(DXM,PB)
你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么?

create view uu AS select collect(userBankTime) as userBank ,name from ( select  
name, row(name,description) as userBankTime from tt_haha) group by name; // 
创建视图,描述userBank类型是MULTITYPE

Flink SQL> desc uu;
root
 |-- userBank: MULTISET NOT NULL> NOT NULL
 |-- name: STRING

CREATE TABLE user_log_sink_10 (
> name STRING,
> maps MULTISET>,
> PRIMARY KEY (name) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-6',
> 'hosts' = '',
> 'index' = 'enriched_orders',
> 'document-type' = 'user'
> );

insert into user_log_sink_10 select name, collect(userBankTime) as maps from 
(select name, row(name,description) as userBankTime from tt_haha) group by name;

[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: org.apache.flink.table.types.logical.MultisetType 
cannot be cast to org.apache.flink.table.types.logical.MapType

还有select name, collect(userBankTime) as maps from (select name, 
row(name,description) as userBankTime from tt_haha) group by name 出来的结果是
test123{test123,123123=1}
123123{123123,ff=1, 123123,kk=2}
这种的,在flink sql中是什么格式呢?

||

在 2020/9/21 下午3:05,“Jark Wu” 写入:

1. concat 是内置函数,可以直接用。
2. 内置函数中没有 group_concat,不过有个类似功能的 listagg
3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1]
4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。
5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2].

Best,
Jark

[1]:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
[2]:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html

On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB)  wrote:

> Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
>
> 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
>
> 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
>
>
> 在 2020/9/21 下午1:42,“Jark Wu” 写入:
>
> 你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
>
> select userId, collect(userBankTime)
> from (
>   select  userId, concat(userBankNo, '_', createTime) as userBankTime
>   from aa as a left join bb as b where a.userId=b.userId
> ) group by userId;
>
>
> Best,
> Jark
>
> On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) 
> wrote:
>
> > 请问:
> >
> > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
> > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
> >   表aa
> >   id, userId
> >   表 bb
> >   userId,userBankNo,createTime
> >
> > select  * from aa as a left join bb as b where a.userId=b.userId
> >
> > 谢谢!
> >
> >
>
>
>




flink on yarn容器异常退出

2020-09-22 文章 Dream-底限
hi
我正在使用flink1.11.1 on
yarn以分离模式运行任务,但在任务提交的时候,任务在分配完applicationid后,容器经常异常退出,先前以为是yarn环境问题,但是在两个集群测都有遇到这种情况,请问这是一个已知的bug吗


答复: Flink-1.11.1 Kafka Table API BigInt 问题

2020-09-22 文章 刘首维
Hi,

试一下java的BigInteger呢


发件人: nashcen <2415370...@qq.com>
发送时间: 2020年9月22日 16:29:41
收件人: user-zh@flink.apache.org
主题: Flink-1.11.1 Kafka Table API BigInt 问题

*我的代码如下*
其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。

package com.athub.dcpoints.scala.connector.table.hive

import com.athub.dcpoints.java.model.KafkaDcpoints
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors._

/**
 * @author Nash Cen
 * @date 2020/9/22 10:33
 * @version 1.0
 */
object KafkaFlinkHiveScalaApp {

  def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hdfs")

// 1. 获取执行环境
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tableEnv: TableEnvironment =
StreamTableEnvironment.create(env,settings)

// 2. 注册 KafkaSource 表
tableEnv.connect(new Kafka()
  .version("universal")
  .topic("ods_dcpoints_dev")
  .property("zookeeper.connect", "localhost:2181")
  .property("bootstrap.servers", "localhost:9092")
  //.property("group.id", "testGroup")
  .startFromEarliest()
)
  .withFormat(new Json())
  .withSchema(new Schema()
.field("assetSpecId", DataTypes.STRING())
.field("dcnum", DataTypes.STRING())
.field("monitorType", DataTypes.STRING())
.field("tagNo", DataTypes.STRING())
.field("updateTime", DataTypes.BIGINT())
.field("value", DataTypes.STRING())

  )
  .createTemporaryTable("kafka_source_table")

// 3. 查询转换
// 使用 sqlQuery
val selectKafkaTable: Table = tableEnv.sqlQuery(s"select
assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table")

selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka")

env.execute("KafkaFlinkHiveScalaApp")

  }

}


*运行时报错信息如下:*
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
scala.math.BigInt does not contain a setter for field bigInteger
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class
scala.math.BigInt cannot be used as a POJO type because not all fields are
valid POJO fields, and must be processed as GenericType. Please read the
Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: A raw
type backed by type information has no serializable string representation.
It needs to be resolved into a proper raw type.
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink-1.11.1 Kafka Table API BigInt 问题

2020-09-22 文章 nashcen
*我的代码如下*
其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。

package com.athub.dcpoints.scala.connector.table.hive

import com.athub.dcpoints.java.model.KafkaDcpoints
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors._

/**
 * @author Nash Cen
 * @date 2020/9/22 10:33
 * @version 1.0
 */
object KafkaFlinkHiveScalaApp {

  def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hdfs")

// 1. 获取执行环境
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tableEnv: TableEnvironment =
StreamTableEnvironment.create(env,settings)

// 2. 注册 KafkaSource 表
tableEnv.connect(new Kafka()
  .version("universal")
  .topic("ods_dcpoints_dev")
  .property("zookeeper.connect", "localhost:2181")
  .property("bootstrap.servers", "localhost:9092")
  //.property("group.id", "testGroup")
  .startFromEarliest()
)
  .withFormat(new Json())
  .withSchema(new Schema()
.field("assetSpecId", DataTypes.STRING())
.field("dcnum", DataTypes.STRING())
.field("monitorType", DataTypes.STRING())
.field("tagNo", DataTypes.STRING())
.field("updateTime", DataTypes.BIGINT())
.field("value", DataTypes.STRING())

  )
  .createTemporaryTable("kafka_source_table")

// 3. 查询转换
// 使用 sqlQuery
val selectKafkaTable: Table = tableEnv.sqlQuery(s"select 
assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table")
   
selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka")

env.execute("KafkaFlinkHiveScalaApp")

  }

}


*运行时报错信息如下:*
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
scala.math.BigInt does not contain a setter for field bigInteger
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class
scala.math.BigInt cannot be used as a POJO type because not all fields are
valid POJO fields, and must be processed as GenericType. Please read the
Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: A raw
type backed by type information has no serializable string representation.
It needs to be resolved into a proper raw type.
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-22 文章 chuyuan
好勒,这种方案已经成功了,非常感谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 jun su
hi all,

环境: flink-1.9.2 flink table planner
现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM

  发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
---
代码:

fbTableEnv.registerTableSource("source",orcTableSource)

val select = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")

fbTableEnv.registerTable("selectTable",select)

val t1 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
where Auth_Roles like 'a%'")
fbTableEnv.registerTable("t1",t1)

val t2 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
Target_UserSid= 'b'")
fbTableEnv.registerTable("t2",t2)

val t3 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
Thread_ID= 'c'")
fbTableEnv.registerTable("t3",t3)

val t4 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
access_path= 'd'")
fbTableEnv.registerTable("t4",t4)

val t5 = fbTableEnv.sqlQuery("select
Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
action= 'e'")



-- 
Best,
Jun Su


Re: FlinkKafkaConsumer问题

2020-09-22 文章 赵一旦
这个问题和flink关系不大。Kafka本身就是这么个特点,指定group,如果是订阅方式,会是你想象的那样,分享消息。但,如果是通过assign方式指定了消费哪个分区,则不受到group中消费者共享消息的限制。

SmileSmile  于2020年9月5日周六 下午4:51写道:

> hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会?
>
> Best
>
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年09月04日 14:11,op 写道:
> 大概懂了 感谢
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> taochangl...@163.com;
> 发送时间:2020年9月4日(星期五) 中午11:54
> 收件人:"user-zh" acqua@gmail.com;
>
> 主题:Re: FlinkKafkaConsumer问题
>
>
>
>
> 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。
>
> 在 2020/9/4 10:34, Shuiqiang Chen 写道:
>
>  Hi,
>  为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的
> partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint
> 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么
> FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka
> 消费组管理者记录,Flink 无法维护这些信息。
> 
>  在 2020年9月4日,上午10:25,op <520075...@qq.com 写道:
> 
> 
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?nbsp;
> 
> 
>  --nbsp;原始邮件nbsp;--
> 
> 发件人:
> "user-zh"
>   发送时间:nbsp;2020年9月3日(星期四) 晚上6:09
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: FlinkKafkaConsumer问题
> 
> 
> 
>  Hi op,
> 
>  在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic
> 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit
> 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka
> 服务端的一个角色。
> 
>  另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id
> commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset
> 开始消费。
> 
>  gt; 在 2020年9月3日,下午3:03,op <520075...@qq.comgt; 写道:
>  gt;
>  gt; amp;nbsp; amp;nbsp; hi,amp;nbsp;
> amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,amp;nbsp; amp;nbsp;
> 这有两个相同代码的程序:
>  gt; //---
>  gt; val bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
>  gt; Env.setRestartStrategy(RestartStrategies.noRestart())
>  gt; val consumerProps = new Properties()
>  gt; consumerProps.put("bootstrap.servers", brokers)
>  gt; consumerProps.put("group.id", "test1234")
>  gt;
>  gt; val consumer = new FlinkKafkaConsumer[String](topic,new
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  gt; Env.addSource(consumer).print()
>  gt;
> Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,
> group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
> 谢谢


Re: Flink-1.11 Table API &符号 语法问题

2020-09-22 文章 nashcen
多谢,引入以下包解决了我的问题

import org.apache.flink.table.api._



--
Sent from: http://apache-flink.147419.n8.nabble.com/