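Hi, [Help] I am consuming Kafka data with Flink SQL and computing PV/UV over tumbling windows. The first and second runs of the same query return different results, and I don't understand why.
0> Local environment on a Mac
1> Flink 1.11.1
2> Kafka 0.10.2.2; topic message-json with 3 partitions and a replication factor of 1
3> Everything is run from sql-client.sh
4> First I created the iservVisit table in the SQL CLI: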
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user URI',
    clientTime string comment '10-digit timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: converts the 10-digit timestamp to a TIMESTAMP
    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column, used as the watermark
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
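
To make the computed column concrete, here is a minimal Python sketch of what the `rowtime` expression does to one `clientTime` value. It assumes the session time zone is UTC+8, which is what the timestamps shown in this post suggest; the helper name `to_rowtime` is made up for illustration and is not a Flink API.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the session time zone is UTC+8 (inferred from the output in this post).
LOCAL_TZ = timezone(timedelta(hours=8))

def to_rowtime(client_time):
    """Mimic to_timestamp(from_unixtime(cast(substring(coalesce(x, '0'), 1, 10) as bigint)))."""
    text = client_time if client_time is not None else "0"  # coalesce(clientTime, '0')
    seconds = int(text[:10])                                 # first 10 characters, cast to bigint
    # from_unixtime renders epoch seconds in the local time zone; drop tzinfo to get a plain timestamp.
    return datetime.fromtimestamp(seconds, tz=LOCAL_TZ).replace(tzinfo=None)

print(to_rowtime("1600391684"))     # 2020-09-18 09:14:44
print(to_rowtime("1600391684123"))  # a 13-digit value is cut to its first 10 digits
print(to_rowtime(None))             # a missing clientTime falls back to epoch 0
```

A missing `clientTime` therefore maps to the epoch, which is far in the past relative to the other records.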
Then I ran this SQL in the SQL CLI:
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
5> I sent the following JSON messages through the Kafka producer, in this order:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"} 
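
As a cross-check on what the query should return when no record is dropped as late, the following sketch assigns each message above to its 2-minute tumbling window and counts pv and uv per window. It is plain Python, not Flink; `EVENTS` is copied from the messages above as (uuid, clientTime) pairs, and `window_start` and `batch_pv_uv` are hypothetical helper names.

```python
from collections import defaultdict

WINDOW = 120  # tumble(rowtime, interval '2' MINUTE), in seconds

# (uuid, clientTime) pairs in the order they were sent.
EVENTS = [
    ("c", 1600391684), ("a", 1600391663), ("a", 1600391690), ("c", 1600391709),
    ("b", 1600391748), ("c", 1600391782), ("b", 1600391781), ("b", 1600391823),
    ("b", 1600391822), ("a", 1600391815), ("a", 1600391857), ("a", 1600391870),
    ("b", 1600391851), ("c", 1600391903), ("a", 1600391889), ("a", 1600391945),
    ("b", 1600391938), ("b", 1600391951), ("c", 1600391936), ("b", 1600391970),
    ("c", 1600392016), ("c", 1600391993), ("a", 1600392057), ("a", 1800392057),
]

def window_start(ts):
    """Start of the tumbling window that contains ts (epoch seconds)."""
    return ts - ts % WINDOW

def batch_pv_uv(events):
    """Count pv and uv per window, ignoring watermarks and lateness entirely."""
    users = defaultdict(list)
    for uuid, ts in events:
        users[window_start(ts)].append(uuid)
    return {start: (len(u), len(set(u))) for start, u in sorted(users.items())}

RESULT = batch_pv_uv(EVENTS)
for start, (pv, uv) in RESULT.items():
    print(start, start + WINDOW, pv, uv)
```

The four 2020 windows come out as 5/3, 8/3, 8/3 and 2/2. The last record lands alone in a 2027 window.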
6> First result (the query was left running in the SQL CLI the whole time)

wStart              wEnd                pv    uv
2020-09-18T09:14    2020-09-18T09:16    5     3
2020-09-18T09:16    2020-09-18T09:18    8     3
2020-09-18T09:18    2020-09-18T09:20    8     3
2020-09-18T09:20    2020-09-18T09:22    2     2

7> Second result (after quitting [Quit] the running query in the SQL CLI and running it again)

wStart              wEnd                pv    uv
2020-09-18T09:14    2020-09-18T09:16    2     2
2020-09-18T09:16    2020-09-18T09:18    2     2
2020-09-18T09:18    2020-09-18T09:20    8     3
2020-09-18T09:20    2020-09-18T09:22    2     2
8> The detailed walkthrough is in the attached file:

1. Create the table in sql-client.sh
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user URI',
    clientTime string comment '10-digit timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: converts the 10-digit timestamp to a TIMESTAMP
    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column, used as the watermark
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)

2. Run the query in sql-client.sh
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

3. Messages written by the Kafka producer, in order

Kafka record                                                      clientTime            watermark             note
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}   2020-09-18 09:14:44   2020-09-18 09:13:44   window [2020-09-18 09:14:00, 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}   2020-09-18 09:14:23   2020-09-18 09:13:44   window [2020-09-18 09:14:00, 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}   2020-09-18 09:14:50   2020-09-18 09:13:50   window [2020-09-18 09:14:00, 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}   2020-09-18 09:15:09   2020-09-18 09:14:09   window [2020-09-18 09:14:00, 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}   2020-09-18 09:15:48   2020-09-18 09:14:48   window [2020-09-18 09:14:00, 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}   2020-09-18 09:16:22   2020-09-18 09:15:22   window [2020-09-18 09:16:00, 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}   2020-09-18 09:16:21   2020-09-18 09:15:22   window [2020-09-18 09:16:00, 2020-09-18 09:18:00)

-- Computation triggered by the next record
    wStart              wEnd                pv    uv
    2020-09-18T09:14    2020-09-18T09:16    5     3
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}   2020-09-18 09:17:03   2020-09-18 09:16:03   window [2020-09-18 09:16:00, 2020-09-18 09:18:00); fires windows ending at or before 2020-09-18 09:16:00

{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}   2020-09-18 09:17:02   2020-09-18 09:16:03   window [2020-09-18 09:16:00, 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}   2020-09-18 09:16:55   2020-09-18 09:16:03   window [2020-09-18 09:16:00, 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}   2020-09-18 09:17:37   2020-09-18 09:16:37   window [2020-09-18 09:16:00, 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}   2020-09-18 09:17:50   2020-09-18 09:16:50   window [2020-09-18 09:16:00, 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}   2020-09-18 09:17:31   2020-09-18 09:16:50   window [2020-09-18 09:16:00, 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}   2020-09-18 09:18:23   2020-09-18 09:17:23   window [2020-09-18 09:18:00, 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}   2020-09-18 09:18:09   2020-09-18 09:17:23   window [2020-09-18 09:18:00, 2020-09-18 09:20:00)

-- Computation triggered by the next record
    wStart              wEnd                pv    uv
    2020-09-18T09:16    2020-09-18T09:18    8     3
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}   2020-09-18 09:19:05   2020-09-18 09:18:05   window [2020-09-18 09:18:00, 2020-09-18 09:20:00); fires windows ending at or before 2020-09-18 09:18:00

{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}   2020-09-18 09:18:58   2020-09-18 09:18:05   window [2020-09-18 09:18:00, 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}   2020-09-18 09:19:11   2020-09-18 09:18:11   window [2020-09-18 09:18:00, 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}   2020-09-18 09:18:56   2020-09-18 09:18:11   window [2020-09-18 09:18:00, 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}   2020-09-18 09:19:30   2020-09-18 09:18:30   window [2020-09-18 09:18:00, 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}   2020-09-18 09:20:16   2020-09-18 09:19:16   window [2020-09-18 09:20:00, 2020-09-18 09:22:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}   2020-09-18 09:19:53   2020-09-18 09:19:16   window [2020-09-18 09:18:00, 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}   2020-09-18 09:20:57   2020-09-18 09:19:57   window [2020-09-18 09:20:00, 2020-09-18 09:22:00)


-- All earlier windows triggered by the next record
    wStart              wEnd                pv    uv
    2020-09-18T09:18    2020-09-18T09:20    8     3
    2020-09-18T09:20    2020-09-18T09:22    2     2
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"}   2027-01-20 04:54:17   2027-01-20 04:53:17   window [2027-01-20 04:54:00, 2027-01-20 04:56:00); fires windows ending at or before 2027-01-20 04:52:00
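
The watermark column in the walkthrough can be recomputed mechanically. The sketch below is plain Python rather than Flink, and it assumes a single watermark over the merged stream that equals the largest clientTime seen so far minus one minute and never moves backwards. `watermark_trace` and `first_firing_record` are hypothetical helper names. It reports which record first pushes the watermark to a given window end.

```python
DELAY = 60  # WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE, in seconds

# clientTime values in arrival order, copied from the walkthrough.
ARRIVAL = [
    1600391684, 1600391663, 1600391690, 1600391709, 1600391748, 1600391782,
    1600391781, 1600391823, 1600391822, 1600391815, 1600391857, 1600391870,
    1600391851, 1600391903, 1600391889, 1600391945, 1600391938, 1600391951,
    1600391936, 1600391970, 1600392016, 1600391993, 1600392057, 1800392057,
]

def watermark_trace(timestamps):
    """Return (timestamp, watermark) pairs; the watermark follows the running maximum."""
    trace, max_ts = [], None
    for ts in timestamps:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        trace.append((ts, max_ts - DELAY))
    return trace

def first_firing_record(timestamps, window_end):
    """First record whose watermark reaches window_end, or None if none does."""
    for ts, wm in watermark_trace(timestamps):
        if wm >= window_end:
            return ts
    return None

TRACE = watermark_trace(ARRIVAL)
for end in (1600391760, 1600391880, 1600392000, 1600392120):  # 09:16, 09:18, 09:20, 09:22
    print(end, first_firing_record(ARRIVAL, end))
```

Under these assumptions the windows ending at 09:16:00 and 09:18:00 are first reached by the records with clientTime 1600391823 and 1600391945, and the windows ending at 09:20:00 and 09:22:00 only by the 1800392057 record, which agrees with the trigger points marked in the walkthrough.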
3. Results shown in sql-client.sh

wStart              wEnd                pv    uv
2020-09-18T09:14    2020-09-18T09:16    5     3
2020-09-18T09:16    2020-09-18T09:18    8     3
2020-09-18T09:18    2020-09-18T09:20    8     3
2020-09-18T09:20    2020-09-18T09:22    2     2


4. Run the query again
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

wStart              wEnd                pv    uv
2020-09-18T09:14    2020-09-18T09:16    2     2
2020-09-18T09:16    2020-09-18T09:18    2     2
2020-09-18T09:18    2020-09-18T09:20    8     3
2020-09-18T09:20    2020-09-18T09:22    2     2



5. Detail rows at this point
select * from iservVisit
type          uuid    clientTime    rowtime
iservVisit    a       1600391663    2020-09-18T09:14:23
iservVisit    b       1600391748    2020-09-18T09:15:48
iservVisit    b       1600391823    2020-09-18T09:17:03
iservVisit    a       1600391857    2020-09-18T09:17:37
iservVisit    c       1600391903    2020-09-18T09:18:23
iservVisit    b       1600391938    2020-09-18T09:18:58
iservVisit    b       1600391970    2020-09-18T09:19:30
iservVisit    a       1600392057    2020-09-18T09:20:57
iservVisit    c       1600391684    2020-09-18T09:14:44
iservVisit    c       1600391709    2020-09-18T09:15:09
iservVisit    b       1600391781    2020-09-18T09:16:21
iservVisit    a       1600391815    2020-09-18T09:16:55
iservVisit    b       1600391851    2020-09-18T09:17:31
iservVisit    a       1600391945    2020-09-18T09:19:05
iservVisit    c       1600391936    2020-09-18T09:18:56
iservVisit    c       1600391993    2020-09-18T09:19:53
iservVisit    a       1600391690    2020-09-18T09:14:50
iservVisit    c       1600391782    2020-09-18T09:16:22
iservVisit    b       1600391822    2020-09-18T09:17:02
iservVisit    a       1600391870    2020-09-18T09:17:50
iservVisit    a       1600391889    2020-09-18T09:18:09
iservVisit    b       1600391951    2020-09-18T09:19:11
iservVisit    c       1600392016    2020-09-18T09:20:16
iservVisit    a       1800392057    2027-01-20T04:54:17
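
One way to probe the difference between the two runs is to replay the records through a simplified model of an event-time tumbling window. The sketch below is plain Python, not Flink: it assumes a single watermark equal to the largest clientTime seen so far minus one minute, fires a window once the watermark reaches its end, and silently drops a record whose window has already been passed by the watermark. `replay`, `SEND_ORDER` and `READ_ORDER` are names made up for this example. `READ_ORDER` is the row order of the `select *` output above, which may or may not be the order in which the rerun query consumed the three partitions.

```python
from collections import defaultdict

WINDOW, DELAY = 120, 60  # 2-minute tumbling window, 1-minute watermark delay (seconds)

# (uuid, clientTime) in the order the messages were sent to Kafka.
SEND_ORDER = [
    ("c", 1600391684), ("a", 1600391663), ("a", 1600391690), ("c", 1600391709),
    ("b", 1600391748), ("c", 1600391782), ("b", 1600391781), ("b", 1600391823),
    ("b", 1600391822), ("a", 1600391815), ("a", 1600391857), ("a", 1600391870),
    ("b", 1600391851), ("c", 1600391903), ("a", 1600391889), ("a", 1600391945),
    ("b", 1600391938), ("b", 1600391951), ("c", 1600391936), ("b", 1600391970),
    ("c", 1600392016), ("c", 1600391993), ("a", 1600392057), ("a", 1800392057),
]

# The same records in the order printed by `select * from iservVisit`.
READ_ORDER = [
    ("a", 1600391663), ("b", 1600391748), ("b", 1600391823), ("a", 1600391857),
    ("c", 1600391903), ("b", 1600391938), ("b", 1600391970), ("a", 1600392057),
    ("c", 1600391684), ("c", 1600391709), ("b", 1600391781), ("a", 1600391815),
    ("b", 1600391851), ("a", 1600391945), ("c", 1600391936), ("c", 1600391993),
    ("a", 1600391690), ("c", 1600391782), ("b", 1600391822), ("a", 1600391870),
    ("a", 1600391889), ("b", 1600391951), ("c", 1600392016), ("a", 1800392057),
]

def replay(events):
    """Return fired windows as (start, end, pv, uv), in firing order."""
    open_windows = defaultdict(list)  # window start -> uuids collected so far
    fired = []
    watermark = float("-inf")
    for uuid, ts in events:
        start = ts - ts % WINDOW
        # A record is late, and dropped, if the watermark already reached its window end.
        if start + WINDOW > watermark:
            open_windows[start].append(uuid)
        # The watermark only advances after the record itself has been processed.
        watermark = max(watermark, ts - DELAY)
        for s in sorted(w for w in open_windows if w + WINDOW <= watermark):
            users = open_windows.pop(s)
            fired.append((s, s + WINDOW, len(users), len(set(users))))
    return fired

for name, order in (("send order", SEND_ORDER), ("select * order", READ_ORDER)):
    print(name, [(pv, uv) for _, _, pv, uv in replay(order)])
```

Under this model the send order yields pv/uv of 5/3, 8/3, 8/3, 2/2 and the `select *` order yields 2/2, 2/2, 8/3, 2/2, the same numbers as the first and second results. That is at least consistent with records being dropped as late when the topic is re-read from `earliest-offset` in a different order, but it is a model rather than a confirmed explanation.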
