Re:Re:Re: Using a mysql jdbc source in a streaming application, the Execution is in FINISHED state and no checkpoint can be generated

2021-04-19 Thread anonnius
Reformatting this, sorry about that.
hi: I tried again today. The problem on my side turned out to be caused by the join syntax I was using.
The correct syntax is:
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;


rather than:
SELECT * FROM myTopic a
LEFT JOIN MyUserTable b
ON a.id = b.id
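
For context, a minimal end-to-end sketch of the pattern, assuming names are placeholders (myTopic, MyUserTable, their columns, the mysql URL); the lookup options mirror the DDL quoted further down in this thread. The probe table needs a processing-time attribute for FOR SYSTEM_TIME AS OF; with the plain join the JDBC table is read once as a bounded scan, that task finishes, and, as the log below shows, a job with FINISHED tasks aborts its checkpoints.

-- Sketch only: myTopic, MyUserTable and their columns are placeholder names.
-- Probe side: a processing-time attribute is required for the lookup join.
CREATE TABLE myTopic (
  `key` STRING,
  `value` STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'myTopic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Dimension side: JDBC table with a lookup cache (options as in the DDL quoted below).
CREATE TABLE MyUserTable (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb?useSSL=false',
  'table-name' = 'users',
  'lookup.cache.max-rows' = '500',
  'lookup.cache.ttl' = '1800s'
);

-- Lookup (temporal) join: MyUserTable is queried per record instead of scanned once.
SELECT *
FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

With this shape only the Kafka source stays long-running, so, at least in my test, the job stays in RUNNING and checkpoints are triggered again.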
The docs are here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache

Hope this helps.

On 2021-04-19 18:54:38, "anonnius" wrote:
>hi: I tried again today. The problem on my side turned out to be caused by the join syntax I was using.
>
>The correct syntax is:
>-- temporal join the JDBC table as a dimension table
>SELECT * FROM myTopic
>LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
>ON myTopic.key = MyUserTable.id;
>rather than:
>SELECT * FROM myTopic a
>LEFT JOIN MyUserTable b
>ON a.id = b.id
>The docs are here:
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache
>
>Hope this helps.
>
>
>
>
>
>
>
>
>On 2021-04-14 16:47:04, "anonnius" wrote:
>>+1, I'm currently hitting this too.
>>On 2021-01-21 17:52:06, "刘海" wrote:
>>>HI!
>>>I ran into a problem while testing:
>>>In a streaming application I use a mysql jdbc source as a dimension table, and to improve processing efficiency I enabled the Lookup Cache. This is the registered table:
>>>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid 
>>>DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT 
>>>ENFORCED\n" +
>>>") WITH (" +
>>>"'connector' = 'jdbc'," +
>>>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>>>"'table-name' = 'tm_dealers'," +
>>>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>>>"'username' = 'root'," +
>>>"'password' = 'Cdh2020:1'," +
>>>"'lookup.cache.max-rows' = '500',"+
>>>"'lookup.cache.ttl' = '1800s',"+
>>>"'sink.buffer-flush.interval' = '60s'"+
>>>")");
>>>
>>>
>>>I found that with this setup the checkpoint configuration stops working and no checkpoints can be triggered; the log reports the following error:
>>>job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED 
>>>instead. Aborting checkpoint.
>>>
>>>
>>>Looking at the graph in the WEB UI, that Execution is in FINISHED state, and a FINISHED task cannot be checkpointed. Is there any other way around this?
>>>
>>>
>>>Would appreciate some guidance from the experts, many thanks!
>>>刘海
>>>liuha...@163.com
>>>Signature customized by NetEase Mail Master


Re:Re: Using a mysql jdbc source in a streaming application, the Execution is in FINISHED state and no checkpoint can be generated

2021-04-19 Thread anonnius
hi: I tried again today. The problem on my side turned out to be caused by the join syntax I was using.

The correct syntax is:
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
rather than:
SELECT * FROM myTopic a
LEFT JOIN MyUserTable b
ON a.id = b.id
The docs are here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache

Hope this helps.








On 2021-04-14 16:47:04, "anonnius" wrote:
>+1, I'm currently hitting this too.
>On 2021-01-21 17:52:06, "刘海" wrote:
>>HI!
>>I ran into a problem while testing:
>>In a streaming application I use a mysql jdbc source as a dimension table, and to improve processing efficiency I enabled the Lookup Cache. This is the registered table:
>>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid 
>>DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT 
>>ENFORCED\n" +
>>") WITH (" +
>>"'connector' = 'jdbc'," +
>>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>>"'table-name' = 'tm_dealers'," +
>>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>>"'username' = 'root'," +
>>"'password' = 'Cdh2020:1'," +
>>"'lookup.cache.max-rows' = '500',"+
>>"'lookup.cache.ttl' = '1800s',"+
>>"'sink.buffer-flush.interval' = '60s'"+
>>")");
>>
>>
>>I found that with this setup the checkpoint configuration stops working and no checkpoints can be triggered; the log reports the following error:
>>job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED 
>>instead. Aborting checkpoint.
>>
>>
>>Looking at the graph in the WEB UI, that Execution is in FINISHED state, and a FINISHED task cannot be checkpointed. Is there any other way around this?
>>
>>
>>Would appreciate some guidance from the experts, many thanks!
>>刘海
>>liuha...@163.com
>>Signature customized by NetEase Mail Master


Re: Using a mysql jdbc source in a streaming application, the Execution is in FINISHED state and no checkpoint can be generated

2021-04-14 Thread anonnius
+1, I'm currently hitting this too.
On 2021-01-21 17:52:06, "刘海" wrote:
>HI!
>I ran into a problem while testing:
>In a streaming application I use a mysql jdbc source as a dimension table, and to improve processing efficiency I enabled the Lookup Cache. This is the registered table:
>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid 
>DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" 
>+
>") WITH (" +
>"'connector' = 'jdbc'," +
>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>"'table-name' = 'tm_dealers'," +
>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>"'username' = 'root'," +
>"'password' = 'Cdh2020:1'," +
>"'lookup.cache.max-rows' = '500',"+
>"'lookup.cache.ttl' = '1800s',"+
>"'sink.buffer-flush.interval' = '60s'"+
>")");
>
>
>I found that with this setup the checkpoint configuration stops working and no checkpoints can be triggered; the log reports the following error:
>job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED 
>instead. Aborting checkpoint.
>
>
>Looking at the graph in the WEB UI, that Execution is in FINISHED state, and a FINISHED task cannot be checkpointed. Is there any other way around this?
>
>
>Would appreciate some guidance from the experts, many thanks!
>刘海
>liuha...@163.com
>Signature customized by NetEase Mail Master


Re:Re: flink waterMark question

2021-01-12 Thread anonnius
In StreamExecutionEnvironment, this is the relevant method:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

So the ExecutionConfig field itself defaults to 0, but setting the time characteristic to anything other than processing time bumps the auto-watermark interval to 200 ms.
On 2021-01-13 10:09:54, "张锴" wrote:
>I found it in ExecutionConfig: private long autoWatermarkInterval =
>0, which is not 200 ms. Does that mean every timestamp comes with its own watermark?
>
>Ball's Holy <873925...@qq.com> wrote on Wednesday, January 13, 2021 at 9:42 AM:
>
>> hi 张锴,
>> If I remember correctly, the default interval is 200 ms; for the details of how it is set, see flink.api.common.ExecutionConfig
>> and its autoWatermarkInterval property.
>>
>>
>>
>>
>> ------ Original Message ------
>> From: "anonnius"
>> Sent: Wednesday, January 13, 2021 9:19 AM
>> To: "user-zh"
>> Subject: Re: flink waterMark question
>>
>>
>>
>> Take a look at the ExecutionConfig class.
>> On 2021-01-12 17:55:47, "张锴" wrote:
>> > hi, I'm using flink 1.10. What is the default interval for watermarks, and where is it in the source code? I couldn't find it.


Re: flink waterMark question

2021-01-12 Thread anonnius
Take a look at the ExecutionConfig class.
On 2021-01-12 17:55:47, "张锴" wrote:
>hi, I'm using flink 1.10. What is the default interval for watermarks, and where is it in the source code? I couldn't find it.


Re:Re: flink sql count question

2020-09-27 Thread anonnius
select count(nullif(if(name not like '南京%', '其他', '南京'), '其他'))
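
To make the comparison concrete, a small sketch against a hypothetical table t(name STRING); the table is made up purely for illustration, the expressions are the ones from this thread:

-- Hive-style attempt (reported in this thread as rejected by Flink SQL 1.10.1 blink):
-- select count(if(name like '南京%', 1, null)) from t;

-- Workaround 1: sum over a 0/1 flag
select sum(if(name like '南京%', 1, 0)) as nanjing_cnt from t;

-- Workaround 2: keep count, and let nullif turn non-matching rows into NULL
select count(nullif(if(name not like '南京%', '其他', '南京'), '其他')) as nanjing_cnt from t;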


On 2020-09-27 17:23:07, "zya" wrote:
>Hi, the content didn't come through. Could you paste it again?
>
>
>------ Original Message ------
>From: "user-zh"
>Sent: Sunday, September 27, 2020 5:20 PM
>To: "user-zh"
>Subject: Re:Re: flink sql count question
>
>
>
>hi: select count(nullif(if(name not like '南京%', '其他', '南京'), '其他'))
>On 2020-09-27 17:07:39, "zya" wrote:
>>Seems that's the only way then, thanks for the answer.
>>
>>
>>
>>
>>------ Original Message ------
>>From: "user-zh"
>>Sent: Sunday, September 27, 2020 5:03 PM
>>To: "user-zh"
>>Subject: Re:Re: flink sql count question
>>
>>
>>
>>Your count emits a record as well; just filter it out, e.g. with a HAVING clause, or add a filter.
>>On 2020-09-27 17:01:06, "zya" wrote:
>>>That is what I do now, but the problem is that with sum a row is still written into mysql even when the condition is never met.
>>>
>>>
>>>
>>>
>>>------ Original Message ------
>>>From: "user-zh"
>>>Sent: Sunday, September 27, 2020 4:59 PM
>>>To: "user-zh"
>>>Subject: Re: flink sql count question
>>>
>>>
>>>
>>>Better to turn the null into a 0, like this: sum(if(name like '南京%', 1, 0))
>>>On 2020-09-27 16:53:56, "zya" wrote:
>>>>A question for everyone:
>>>>I have a SQL job that needs a count with an expression inside it, and I only want to count the records that match a condition.
>>>>In Hive I wrote count(if(name like '南京%', 1, null)), but in Flink SQL count cannot take null. Is there another way to achieve this?
>>>>I'm using flink 1.10.1 blink.


Re:Re: flink sql count question

2020-09-27 Thread anonnius
hi: select count(nullif(if(name not like '南京%', '其他', '南京'), '其他'))
On 2020-09-27 17:07:39, "zya" wrote:
>Seems that's the only way then, thanks for the answer.
>
>
>
>
>------ Original Message ------
>From: "user-zh"
>Sent: Sunday, September 27, 2020 5:03 PM
>To: "user-zh"
>Subject: Re:Re: flink sql count question
>
>
>
>Your count emits a record as well; just filter it out, e.g. with a HAVING clause, or add a filter.
>On 2020-09-27 17:01:06, "zya" wrote:
>>That is what I do now, but the problem is that with sum a row is still written into mysql even when the condition is never met.
>>
>>
>>
>>
>>------ Original Message ------
>>From: "user-zh"
>>Sent: Sunday, September 27, 2020 4:59 PM
>>To: "user-zh"
>>Subject: Re: flink sql count question
>>
>>
>>
>>Better to turn the null into a 0, like this: sum(if(name like '南京%', 1, 0))
>>On 2020-09-27 16:53:56, "zya" wrote:
>>>A question for everyone:
>>>I have a SQL job that needs a count with an expression inside it, and I only want to count the records that match a condition.
>>>In Hive I wrote count(if(name like '南京%', 1, null)), but in Flink SQL count cannot take null. Is there another way to achieve this?
>>>I'm using flink 1.10.1 blink.

Re:Re: What order does Flink SQL consume kafka in? The second run of the sql gives a different result from the first

2020-09-18 Thread anonnius
hi: thanks for your attention and your reply.
1> Here is my analysis:
1. The first time: I first ran this sql in sql-client.sh
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)


At this point, since the records were written one by one with the kafka producer tool (kafka-console-producer.sh), and the kafka-connector kept consuming them as they arrived, the consumed data came back in the same order it was written by hand.


2. The second time: I quit sql-client.sh and then ran the sql again
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
This time all the data was already in kafka. When the kafka-connector consumed it, because the topic has 3 partitions, the order of the consumed messages no longer matched the order they had been written with the kafka producer tool (kafka-console-producer.sh). Records with a later rowtime could be consumed first, which produced a larger watermark, so some of the messages consumed afterwards were dropped.


3. Making the watermark delay in the table definition larger restores the first result; I'm still evaluating this approach (whether it is consistently effective):
create table iservVisit (
type string comment 'event type',
uuid string comment 'user uri',
clientTime string comment '10-digit timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp converted to timestamp
WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- watermark, changed from 1 minute to 5 minutes
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
4. Preliminary conclusion: the open question is how to guarantee, or by what means, that every partition is consumed at a consistent pace (see the sketch after this list).
5. The attachment can be viewed with the sublime sql/hql plugin, which displays it a bit more clearly.
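
For point 4, the sketch below uses only the columns already declared above, nothing new. It is what I would run in sql-client.sh to watch the actual consumption order across the 3 partitions and see how far rowtime jumps backwards, since that is what pushes the watermark ahead of later records:

-- Diagnostic only: emit each record as it is consumed, with its event time,
-- to see how out of order the merged stream from the 3 partitions really is.
select `type`, uuid, clientTime, rowtime
from iservVisit;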


On 2020-09-18 14:42:42, "chengyanan1...@foxmail.com" wrote:
>Just weighing in first:
>Following the OP's steps, I sent the data while running the SQL below and watched the result in real time:
>select  
>tumble_start(rowtime, interval '2' MINUTE) as wStart,
>tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>count(1) as pv,
>count(distinct uuid) as uv 
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>The result I ended up with was this (different from the OP's):
>
> wStart            wEnd              pv    uv
> 2020-09-18T09:14  2020-09-18T09:16  2     2
> 2020-09-18T09:16  2020-09-18T09:18  8     3
> 2020-09-18T09:18  2020-09-18T09:20  8     3
> 2020-09-18T09:20  2020-09-18T09:22  2     2
>
>After all the data had been sent, I quit sql-client and ran the query above again; the result (the same as the OP's):
>wStart            wEnd              pv    uv
>2020-09-18T09:14  2020-09-18T09:16  2     2
>2020-09-18T09:16  2020-09-18T09:18  2     2
>2020-09-18T09:18  2020-09-18T09:20  8     3
>2020-09-18T09:20  2020-09-18T09:22  2     2
>
>
>
> 
>From: anonnius
>Sent: 2020-09-18 11:24
>To: user-zh
>Subject: What order does Flink SQL consume kafka in? The second run of the sql gives a different result from the first
>hi: [help wanted] I'm consuming kafka data with flink-sql and computing pv/uv with a window; the first and the second run give different results, and I don't quite understand why.
>0> local mac environment
>1> flink 1.11.1
>2> kafka 0.10.2.2, topic message-json, 3 partitions, 1 replica
>3> using the sql-client.sh environment
>4> first created the iservVisit table in sql-cli:
>create table iservVisit (
>type string comment 'event type',
>uuid string comment 'user uri',
>clientTime string comment '10-digit timestamp',
>rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp converted to timestamp
>WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column, used as the watermark
>) with (
>'connector' = 'kafka-0.10',
>'topic' = 'message-json',
>'properties.bootstrap.servers' = 'localhost:9092',
>'properties.group.id' = 'consumer-rt',
>'format' = 'json',
>'json.ignore-parse-errors' = 'true',
>'scan.startup.mode' = 'earliest-offset'
>)
>Then ran this sql in sql-cli:
>select  
>tumble_start(rowtime, interval '2' MINUTE) as wStart,
>tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>count(1) as pv,
>count(distinct uuid) as uv 
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>5> Sent the following json messages to kafka with the producer, one by one:
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
>{"type":

What order does Flink SQL consume kafka in? The second run of the sql gives a different result from the first

2020-09-17 Thread anonnius
hi: [help wanted] I'm consuming kafka data with flink-sql and computing pv/uv with a window; the first and the second run give different results, and I don't quite understand why.
0> local mac environment
1> flink 1.11.1
2> kafka 0.10.2.2, topic message-json, 3 partitions, 1 replica
3> using the sql-client.sh environment
4> first created the iservVisit table in sql-cli:
create table iservVisit (
type string comment 'event type',
uuid string comment 'user uri',
clientTime string comment '10-digit timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp converted to timestamp
WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column, used as the watermark
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
Then ran this sql in sql-cli:
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
5> Sent the following json messages to kafka with the producer, one by one:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"} 
6> First result (with the sql in sql-cli left running the whole time):

wStart            wEnd              pv    uv
2020-09-18T09:14  2020-09-18T09:16  5     3
2020-09-18T09:16  2020-09-18T09:18  8     3
2020-09-18T09:18  2020-09-18T09:20  8     3
2020-09-18T09:20  2020-09-18T09:22  2     2
7> Second result (after quitting the sql in sql-cli and running it again):

wStart            wEnd              pv    uv
2020-09-18T09:14  2020-09-18T09:16  2     2
2020-09-18T09:16  2020-09-18T09:18  2     2
2020-09-18T09:18  2020-09-18T09:20  8     3
2020-09-18T09:20  2020-09-18T09:22  2     2
8> The detailed steps are in the attached file.





1. Create the table in sql-client.sh:
create table iservVisit (
type string comment 'event type',
uuid string comment 'user uri',
clientTime string comment '10-digit timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp converted to timestamp
WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column, used as the watermark
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)

2. Run in sql-client.sh:
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

3. The kafka producer writes the messages, one by one:


kafka record                                                       clientTime (message time)   watermark produced    notes
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}    2020-09-18 09:14:44         2020-09-18 09:13:44