Re:Re:Re: Streaming application using a MySQL JDBC source: Execution is in FINISHED state and checkpoints cannot be created
Reformatting this, sorry about the previous message.

hi: I tried it again today. The problem on my side was caused by the syntax I used for the join.

You should use this syntax:

-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

and not:

SELECT * FROM myTopic a LEFT JOIN MyUserTable b ON a.id = b.id

The documentation link is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache

Hope this helps.

On 2021-04-14 16:47:04, "anonnius" wrote:
>+1, I am hitting this too
>On 2021-01-21 17:52:06, "刘海" wrote:
>>Hi!
>>I ran into a problem while testing:
>>In a streaming application I use a MySQL JDBC source as a dimension table, with the lookup cache enabled to improve processing efficiency. This is the registered table:
>>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING, is_valid DECIMAL(10,0), proctime AS PROCTIME(), PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
>>") WITH (" +
>>"'connector' = 'jdbc'," +
>>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>>"'table-name' = 'tm_dealers'," +
>>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>>"'username' = 'root'," +
>>"'password' = 'Cdh2020:1'," +
>>"'lookup.cache.max-rows' = '500'," +
>>"'lookup.cache.ttl' = '1800s'," +
>>"'sink.buffer-flush.interval' = '60s'" +
>>")");
>>
>>I found that with this setup the checkpoint configuration stops taking effect; no checkpoint is ever triggered, and the log reports the following error:
>>job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED instead. Aborting checkpoint.
>>
>>Looking at the Web UI, the Execution is in FINISHED state, and a FINISHED execution cannot be checkpointed. Is there any other way around this?
>>
>>Any guidance would be greatly appreciated, thanks!
>>刘海
>>liuha...@163.com
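For reference, a minimal sketch of the corrected syntax applied to 刘海's tm_dealers lookup table from this thread. The fact table "orders" and its columns order_id/dealer_code/proctime are hypothetical, for illustration only; the probe side has to declare its own proctime AS PROCTIME() column:

// Sketch: temporal join against the JDBC lookup table. FOR SYSTEM_TIME AS OF
// keeps tm_dealers as a lookup (dimension) table that is queried per record,
// instead of a bounded scan source that finishes -- the FINISHED execution is
// what blocked the checkpoints in the original report.
bsTableEnv.executeSql(
    "SELECT o.order_id, d.is_valid " +
    "FROM orders AS o " +
    "LEFT JOIN tm_dealers FOR SYSTEM_TIME AS OF o.proctime AS d " +
    "ON o.dealer_code = d.dealer_code");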
Re:Re: Streaming application using a MySQL JDBC source: Execution is in FINISHED state and checkpoints cannot be created
hi: I tried it again today. The problem on my side was caused by the syntax I used for the join.

You should use this syntax:

-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

and not:

SELECT * FROM myTopic a LEFT JOIN MyUserTable b ON a.id = b.id

The documentation link is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache

Hope this helps.

On 2021-04-14 16:47:04, "anonnius" wrote:
>+1, I am hitting this too
>On 2021-01-21 17:52:06, "刘海" wrote:
>>Hi!
>>I ran into a problem while testing:
>>In a streaming application I use a MySQL JDBC source as a dimension table, with the lookup cache enabled to improve processing efficiency. This is the registered table:
>>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING, is_valid DECIMAL(10,0), proctime AS PROCTIME(), PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
>>") WITH (" +
>>"'connector' = 'jdbc'," +
>>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>>"'table-name' = 'tm_dealers'," +
>>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>>"'username' = 'root'," +
>>"'password' = 'Cdh2020:1'," +
>>"'lookup.cache.max-rows' = '500'," +
>>"'lookup.cache.ttl' = '1800s'," +
>>"'sink.buffer-flush.interval' = '60s'" +
>>")");
>>
>>I found that with this setup the checkpoint configuration stops taking effect; no checkpoint is ever triggered, and the log reports the following error:
>>job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED instead. Aborting checkpoint.
>>
>>Looking at the Web UI, the Execution is in FINISHED state, and a FINISHED execution cannot be checkpointed. Is there any other way around this?
>>
>>Any guidance would be greatly appreciated, thanks!
>>刘海
>>liuha...@163.com
Re: Streaming application using a MySQL JDBC source: Execution is in FINISHED state and checkpoints cannot be created
+1, I am hitting this too
On 2021-01-21 17:52:06, "刘海" wrote:
>Hi!
>I ran into a problem while testing:
>In a streaming application I use a MySQL JDBC source as a dimension table, with the lookup cache enabled to improve processing efficiency. This is the registered table:
>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING, is_valid DECIMAL(10,0), proctime AS PROCTIME(), PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
>") WITH (" +
>"'connector' = 'jdbc'," +
>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>"'table-name' = 'tm_dealers'," +
>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>"'username' = 'root'," +
>"'password' = 'Cdh2020:1'," +
>"'lookup.cache.max-rows' = '500'," +
>"'lookup.cache.ttl' = '1800s'," +
>"'sink.buffer-flush.interval' = '60s'" +
>")");
>
>I found that with this setup the checkpoint configuration stops taking effect; no checkpoint is ever triggered, and the log reports the following error:
>job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED instead. Aborting checkpoint.
>
>Looking at the Web UI, the Execution is in FINISHED state, and a FINISHED execution cannot be checkpointed. Is there any other way around this?
>
>Any guidance would be greatly appreciated, thanks!
>刘海
>liuha...@163.com
Re:Re: Question about flink waterMark
In StreamExecutionEnvironment, see the method:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

On 2021-01-13 10:09:54, "张锴" wrote:
>I found in ExecutionConfig that it is private long autoWatermarkInterval = 0, not 200 milliseconds. Does that mean every timestamp stands for one watermark?
>
>Ball's Holy <873925...@qq.com> wrote on Wed, 13 Jan 2021 at 9:42 AM:
>> hi 张锴,
>> If I remember correctly, the default interval is 200 milliseconds; for the details of the setting, see the autoWatermarkInterval property of flink.api.common.ExecutionConfig.
>>
>> ------ Original message ------
>> From: "anonnius"
>> Sent: Wednesday, 13 Jan 2021, 9:19 AM
>> To: "user-zh"
>> Subject: Re: Question about flink waterMark
>>
>> Take a look at the ExecutionConfig class.
>> On 2021-01-12 17:55:47, "张锴" wrote:
>>> hi, the flink version I use is 1.10. What is the default watermark interval, and where is the corresponding source code? I could not find it.
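For reference, a short sketch (assuming the flink 1.10/1.11 DataStream API) of how the two observations fit together: the ExecutionConfig field starts at 0, switching to event time sets it to 200 ms, and it can be overridden explicitly.

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Switching to event time sets the auto-watermark interval to 200 ms,
// i.e. watermarks are generated periodically, every 200 ms.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// The interval can be overridden explicitly, e.g. one watermark per second:
env.getConfig().setAutoWatermarkInterval(1000);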
Re: Question about flink waterMark
Take a look at the ExecutionConfig class.
On 2021-01-12 17:55:47, "张锴" wrote:
>hi, the flink version I use is 1.10. What is the default watermark interval, and where is the corresponding source code? I could not find it.
Re: Reply: flink sql count question
select count(nullif(if(name not like '南京%', '其他', '南京'), '其他'))

On 2020-09-27 17:23:07, "zya" wrote:
>Hi, the link does not display; could you please paste it again?
>
>------ Original message ------
>From: "user-zh"
>Sent: Sunday, 27 Sep 2020, 5:20 PM
>To: "user-zh"
>Subject: Re: Reply: flink sql count question
>
>hi: select count(nullif(if(name not like '南京%', '其他', '南京'), '其他'))
>On 2020-09-27 17:07:39, "zya" wrote:
>>Looks like that is the only way then; thanks for the answer.
>>
>>------ Original message ------
>>From: "user-zh"
>>Sent: Sunday, 27 Sep 2020, 5:03 PM
>>To: "user-zh"
>>Subject: Re: Reply: flink sql count question
>>
>>Your count emits a record as well. Just filter it out, e.g. with having xxx, or add a filter.
>>On 2020-09-27 17:01:06, "zya" wrote:
>>>That is what I am doing now, but the problem is that with sum a record is still written to mysql even when the condition is never met.
>>>
>>>------ Original message ------
>>>From: "user-zh"
>>>Sent: Sunday, 27 Sep 2020, 4:59 PM
>>>To: "user-zh"
>>>Subject: Re: flink sql count question
>>>
>>>Better to turn the null into 0, like this: sum(if(name like '南京%', 1, 0))
>>>On 2020-09-27 16:53:56, "zya" wrote:
>>>>A question for everyone:
>>>>I have a SQL job that needs a count, with an expression inside the count, and I only want to count the records that match a condition.
>>>>In Hive I used to write count(if(name like '南京%', 1, null)), but in Flink SQL count cannot take null. Is there another way to achieve this?
>>>>I am using flink 1.10.1 with the blink planner.
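For reference, a minimal sketch of why this works (the source table "t" with a STRING column "name" and the tableEnv variable are hypothetical): NULLIF(x, y) returns NULL when x = y, and COUNT skips NULLs, so the last expression counts only the rows whose name starts with '南京'.

// Sketch, flink 1.10.1 blink planner as in the original question:
Table result = tableEnv.sqlQuery(
    "SELECT " +
    "  count(*) AS all_rows, " +                 // counts every row
    "  sum(if(name like '南京%', 1, 0)) AS s, " + // adds 0 for non-matching rows
    "  count(nullif(if(name not like '南京%', '其他', '南京'), '其他')) AS c " +
    "FROM t");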
Re: Reply: flink sql count question
hi: select count(nullif(if(name not like '南京%', '其他', '南京'), '其他'))
On 2020-09-27 17:07:39, "zya" wrote:
>Looks like that is the only way then; thanks for the answer.
>
>------ Original message ------
>From: "user-zh"
>Sent: Sunday, 27 Sep 2020, 5:03 PM
>To: "user-zh"
>Subject: Re: Reply: flink sql count question
>
>Your count emits a record as well. Just filter it out, e.g. with having xxx, or add a filter.
>On 2020-09-27 17:01:06, "zya" wrote:
>>That is what I am doing now, but the problem is that with sum a record is still written to mysql even when the condition is never met.
>>
>>------ Original message ------
>>From: "user-zh"
>>Sent: Sunday, 27 Sep 2020, 4:59 PM
>>To: "user-zh"
>>Subject: Re: flink sql count question
>>
>>Better to turn the null into 0, like this: sum(if(name like '南京%', 1, 0))
>>On 2020-09-27 16:53:56, "zya" wrote:
>>>A question for everyone:
>>>I have a SQL job that needs a count, with an expression inside the count, and I only want to count the records that match a condition.
>>>In Hive I used to write count(if(name like '南京%', 1, null)), but in Flink SQL count cannot take null. Is there another way to achieve this?
>>>I am using flink 1.10.1 with the blink planner.
Re: Reply: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first
hi: Thanks for your attention and reply.

Below is my analysis:

1. The first time, I ran the following SQL in sql-client.sh:
select
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
At that point the data was being written record by record through the Kafka console producer (kafka-console-producer.sh), while the kafka connector consumed continuously, so the records arrived in the same order in which they were written by hand.

2. The second time, I quit sql-client.sh and ran the same SQL again. By then all the data had already been written to Kafka, and since the topic has 3 partitions, the order in which the kafka connector consumed the messages no longer matched the order in which they were written by the console producer. A record with a later rowtime could be consumed first, producing a watermark that was too large, so some of the records consumed afterwards were discarded.

3. Enlarging the watermark interval in the table definition reproduces the first result. I am still evaluating this approach (whether it is always effective):
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user uri',
    clientTime string comment '10-digit timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))),  -- computed column: converts the 10-digit timestamp to a timestamp type
    WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE  -- computed column used as watermark, enlarged from 1 minute to 5 minutes
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)

4. Preliminary conclusion: the open question is how to guarantee, or by what means, that data is consumed from every partition at the same pace.

5. The attachment can be viewed with the Sublime sql/hql plugin, which displays it more clearly.

On 2020-09-18 14:42:42, "chengyanan1...@foxmail.com" wrote:
>Let me take a first stab.
>Following the steps you gave, I ran the following SQL while the data was being sent, watching the query result in real time:
>select
>    tumble_start(rowtime, interval '2' MINUTE) as wStart,
>    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>    count(1) as pv,
>    count(distinct uuid) as uv
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>The result I got was this (different from yours):
>
>wStart            wEnd              pv  uv
>2020-09-18T09:14  2020-09-18T09:16  2   2
>2020-09-18T09:16  2020-09-18T09:18  8   3
>2020-09-18T09:18  2020-09-18T09:20  8   3
>2020-09-18T09:20  2020-09-18T09:22  2   2
>
>After all the data had been sent, I quit sql-client, ran the query again, and got the following result (the same as yours):
>
>wStart            wEnd              pv  uv
>2020-09-18T09:14  2020-09-18T09:16  2   2
>2020-09-18T09:16  2020-09-18T09:18  2   2
>2020-09-18T09:18  2020-09-18T09:20  8   3
>2020-09-18T09:20  2020-09-18T09:22  2   2
>
>From: anonnius
>Sent: 2020-09-18 11:24
>To: user-zh
>Subject: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first
>hi: [help wanted] I am consuming Kafka data with flink-sql and computing pv/uv over windows; the first and second runs give different results, and I do not understand why.
>0> local environment on a Mac
>1> flink 1.11.1
>2> kafka 0.10.2.2, topic message-json, 3 partitions, 1 replica
>3> using the sql-client.sh environment
>4> first created the iservVisit table in sql-cli:
>create table iservVisit (
>    type string comment 'event type',
>    uuid string comment 'user uri',
>    clientTime string comment '10-digit timestamp',
>    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))),  -- computed column: converts the 10-digit timestamp to a timestamp type
>    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE  -- computed column used as watermark
>) with (
>    'connector' = 'kafka-0.10',
>    'topic' = 'message-json',
>    'properties.bootstrap.servers' = 'localhost:9092',
>    'properties.group.id' = 'consumer-rt',
>    'format' = 'json',
>    'json.ignore-parse-errors' = 'true',
>    'scan.startup.mode' = 'earliest-offset'
>)
>then ran this SQL in sql-cli:
>select
>    tumble_start(rowtime, interval '2' MINUTE) as wStart,
>    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>    count(1) as pv,
>    count(distinct uuid) as uv
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>5> sent the following json messages to the kafka producer, one by one:
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}
>{"type":
In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first
hi: [help wanted] I am consuming Kafka data with flink-sql and computing pv/uv over windows; the first and second runs give different results, and I do not understand why.
0> local environment on a Mac
1> flink 1.11.1
2> kafka 0.10.2.2, topic message-json, 3 partitions, 1 replica
3> using the sql-client.sh environment
4> first created the iservVisit table in sql-cli:
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user uri',
    clientTime string comment '10-digit timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))),  -- computed column: converts the 10-digit timestamp to a timestamp type
    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE  -- computed column used as watermark
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
then ran this SQL in sql-cli:
select
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
5> sent the following json messages to the kafka producer, one by one:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"}
6> first result (with the sql-cli query left running):
wStart            wEnd              pv  uv
2020-09-18T09:14  2020-09-18T09:16  5   3
2020-09-18T09:16  2020-09-18T09:18  8   3
2020-09-18T09:18  2020-09-18T09:20  8   3
2020-09-18T09:20  2020-09-18T09:22  2   2
7> second result (after quitting the sql-cli query and running it again):
wStart            wEnd              pv  uv
2020-09-18T09:14  2020-09-18T09:16  2   2
2020-09-18T09:16  2020-09-18T09:18  2   2
2020-09-18T09:18  2020-09-18T09:20  8   3
2020-09-18T09:20  2020-09-18T09:22  2   2
8> the detailed steps are in the attached file
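As a side note for checking the tables above: tumbling windows in Flink are aligned to the epoch, so the 2-minute window a record falls into can be computed directly from the 10-digit timestamp. A minimal sketch (plain Java, not the Flink API):

// Window start of a 2-minute tumbling window aligned to the epoch:
long windowSizeMs = 2 * 60 * 1000L;
long ts = 1600391684_000L;                    // clientTime 1600391684 -> 09:14:44 local (UTC+8)
long windowStart = ts - (ts % windowSizeMs);  // -> 09:14:00 local, i.e. the 09:14-09:16 window
System.out.println(java.time.Instant.ofEpochMilli(windowStart)); // prints 2020-09-18T01:14:00Z (UTC)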
1. Create the table in sql-client.sh:
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user uri',
    clientTime string comment '10-digit timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))),  -- computed column: converts the 10-digit timestamp to a timestamp type
    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE  -- computed column used as watermark
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
2. Run in sql-client.sh:
select
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
3. The kafka producer writes the messages one by one:
kafka record                                                      clientTime (message time)  resulting watermark   note
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}   2020-09-18 09:14:44        2020-09-18 09:13:44