hi: [求助] 我这里用flink-sql消费kafka数据, 通过窗口做pvuv的计算, 第一次和第二次计算的结果不一致, 不太了解为什么
0> mac本地环境
1> flink 1.11.1
2> kafka 0.10.2.2, topic为message-json, 分区为3, 副本为1
3> 使用的是sql-client.sh 环境
4> 先在sql-cli中创建了iservVisit表
-- Source table over the Kafka topic 'message-json'; payloads are decoded as JSON.
CREATE TABLE iservVisit (
    type       STRING COMMENT '时间类型',
    uuid       STRING COMMENT '用户uri',
    clientTime STRING COMMENT '10位时间戳',
    -- Computed column: the first 10 characters of clientTime ('0' when it is NULL)
    -- are cast to BIGINT, formatted by FROM_UNIXTIME and parsed into a timestamp.
    rowtime AS TO_TIMESTAMP(
        FROM_UNIXTIME(
            CAST(SUBSTRING(COALESCE(clientTime, '0'), 1, 10) AS BIGINT)
        )
    ),
    -- Watermark on the computed column: rowtime minus one minute.
    WATERMARK FOR rowtime AS rowtime - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
然后在sql-cli执行sql
-- For every 2-minute tumbling window on rowtime: pv = number of rows,
-- uv = number of distinct uuid values.
SELECT
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS wStart,
    TUMBLE_END(rowtime, INTERVAL '2' MINUTE)   AS wEnd,
    COUNT(1)                                   AS pv,
    COUNT(DISTINCT uuid)                       AS uv
FROM iservVisit
GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE)
5> 向kafka生产者依次发送下面的json消息
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"}
6> 第一次结果(这里sql-cli的sql一直在运行)
wStart  wEnd  pv  uv
2020-09-18T09:14  2020-09-18T09:16  5  3
2020-09-18T09:16  2020-09-18T09:18  8  3
2020-09-18T09:18  2020-09-18T09:20  8  3
2020-09-18T09:20  2020-09-18T09:22  2  2
7> 第二次结果(退出[Quit]sql-cli中的sql, 再次运行)
wStart  wEnd  pv  uv
2020-09-18T09:14  2020-09-18T09:16  2  2
2020-09-18T09:16  2020-09-18T09:18  2  2
2020-09-18T09:18  2020-09-18T09:20  8  3
2020-09-18T09:20  2020-09-18T09:22  2  2
8> 详细过程已放入附件文件中
1. sql-client.sh中建表
-- Source table over the Kafka topic 'message-json'; payloads are decoded as JSON.
-- The column COMMENT literals below were encoding-corrupted in this copy and are
-- restored here; the trailing watermark comment had also been split across two
-- lines, leaving a stray token outside the comment that broke the statement.
CREATE TABLE iservVisit (
    type       STRING COMMENT '时间类型',
    uuid       STRING COMMENT '用户uri',
    clientTime STRING COMMENT '10位时间戳',
    -- Computed column: the first 10 characters of clientTime ('0' when it is NULL)
    -- are cast to BIGINT, formatted by FROM_UNIXTIME and parsed into a timestamp.
    rowtime AS TO_TIMESTAMP(
        FROM_UNIXTIME(
            CAST(SUBSTRING(COALESCE(clientTime, '0'), 1, 10) AS BIGINT)
        )
    ),
    -- Watermark on the computed column: rowtime minus one minute.
    WATERMARK FOR rowtime AS rowtime - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
2. sql-client.sh中运行
-- For every 2-minute tumbling window on rowtime: pv = number of rows,
-- uv = number of distinct uuid values.
SELECT
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS wStart,
    TUMBLE_END(rowtime, INTERVAL '2' MINUTE)   AS wEnd,
    COUNT(1)                                   AS pv,
    COUNT(DISTINCT uuid)                       AS uv
FROM iservVisit
GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE)
3. kafka 生产者依次写入消息
kafka记录  clientTime消息时间  产生的watermark时间  说明
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}  2020-09-18 09:14:44  2020-09-18 09:13:44  属于窗口 [2020-09-18 09:14:00 - 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}  2020-09-18 09:14:23  2020-09-18 09:13:44  属于窗口 [2020-09-18 09:14:00 - 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}  2020-09-18 09:14:50  2020-09-18 09:13:50  属于窗口 [2020-09-18 09:14:00 - 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}  2020-09-18 09:15:09  2020-09-18 09:14:09  属于窗口 [2020-09-18 09:14:00 - 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}  2020-09-18 09:15:48  2020-09-18 09:14:48  属于窗口 [2020-09-18 09:14:00 - 2020-09-18 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}  2020-09-18 09:16:22  2020-09-18 09:15:22  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}  2020-09-18 09:16:21  2020-09-18 09:15:22  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
-- 触发计算
wStart  wEnd  pv  uv
2020-09-18T09:14  2020-09-18T09:16  5  3
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}  2020-09-18 09:17:03  2020-09-18 09:16:03  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
触发结束时间早于 2020-09-18 09:16:00 的窗口
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}  2020-09-18 09:17:02  2020-09-18 09:16:02  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}  2020-09-18 09:16:55  2020-09-18 09:16:02  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}  2020-09-18 09:17:37  2020-09-18 09:16:37  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}  2020-09-18 09:17:50  2020-09-18 09:16:50  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}  2020-09-18 09:17:31  2020-09-18 09:16:31  属于窗口 [2020-09-18 09:16:00 - 2020-09-18 09:18:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}  2020-09-18 09:18:23  2020-09-18 09:17:23  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}  2020-09-18 09:18:09  2020-09-18 09:17:09  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
-- 触发计算
wStart  wEnd  pv  uv
2020-09-18T09:16  2020-09-18T09:18  8  3
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}  2020-09-18 09:19:05  2020-09-18 09:18:05  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
触发结束时间早于 2020-09-18 09:18:00 的窗口
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}  2020-09-18 09:18:58  2020-09-18 09:16:58  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}  2020-09-18 09:19:11  2020-09-18 09:17:11  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}  2020-09-18 09:18:56  2020-09-18 09:16:56  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}  2020-09-18 09:19:30  2020-09-18 09:17:30  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}  2020-09-18 09:20:16  2020-09-18 09:18:16  属于窗口 [2020-09-18 09:20:00 - 2020-09-18 09:22:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}  2020-09-18 09:19:53  2020-09-18 09:17:53  属于窗口 [2020-09-18 09:18:00 - 2020-09-18 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}  2020-09-18 09:20:57  2020-09-18 09:18:57  属于窗口 [2020-09-18 09:20:00 - 2020-09-18 09:22:00)
-- 触发之前的所有窗口
wStart  wEnd  pv  uv
2020-09-18T09:18  2020-09-18T09:20  8  3
2020-09-18T09:20  2020-09-18T09:22  2  2
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"}  2027-01-20 04:54:17  2027-01-20 04:53:17  属于窗口 [2027-01-20 04:54:00 - 2027-01-20 04:56:00)
触发结束时间早于 2027-01-20 04:52:00 的窗口
3. sql-client.sh中显示的结果
wStart  wEnd  pv  uv
2020-09-18T09:14  2020-09-18T09:16  5  3
2020-09-18T09:16  2020-09-18T09:18  8  3
2020-09-18T09:18  2020-09-18T09:20  8  3
2020-09-18T09:20  2020-09-18T09:22  2  2
4. 再次运行
-- For every 2-minute tumbling window on rowtime: pv = number of rows,
-- uv = number of distinct uuid values.
SELECT
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS wStart,
    TUMBLE_END(rowtime, INTERVAL '2' MINUTE)   AS wEnd,
    COUNT(1)                                   AS pv,
    COUNT(DISTINCT uuid)                       AS uv
FROM iservVisit
GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE)
wStart  wEnd  pv  uv
2020-09-18T09:14  2020-09-18T09:16  2  2
2020-09-18T09:16  2020-09-18T09:18  2  2
2020-09-18T09:18  2020-09-18T09:20  8  3
2020-09-18T09:20  2020-09-18T09:22  2  2
5. 此时的明细数据
-- Ad-hoc inspection: return every column of every iservVisit row.
SELECT *
FROM iservVisit
type  uuid  clientTime  rowtime
iservVisit  a  1600391663  2020-09-18T09:14:23
iservVisit  b  1600391748  2020-09-18T09:15:48
iservVisit  b  1600391823  2020-09-18T09:17:03
iservVisit  a  1600391857  2020-09-18T09:17:37
iservVisit  c  1600391903  2020-09-18T09:18:23
iservVisit  b  1600391938  2020-09-18T09:18:58
iservVisit  b  1600391970  2020-09-18T09:19:30
iservVisit  a  1600392057  2020-09-18T09:20:57
iservVisit  c  1600391684  2020-09-18T09:14:44
iservVisit  c  1600391709  2020-09-18T09:15:09
iservVisit  b  1600391781  2020-09-18T09:16:21
iservVisit  a  1600391815  2020-09-18T09:16:55
iservVisit  b  1600391851  2020-09-18T09:17:31
iservVisit  a  1600391945  2020-09-18T09:19:05
iservVisit  c  1600391936  2020-09-18T09:18:56
iservVisit  c  1600391993  2020-09-18T09:19:53
iservVisit  a  1600391690  2020-09-18T09:14:50
iservVisit  c  1600391782  2020-09-18T09:16:22
iservVisit  b  1600391822  2020-09-18T09:17:02
iservVisit  a  1600391870  2020-09-18T09:17:50
iservVisit  a  1600391889  2020-09-18T09:18:09
iservVisit  b  1600391951  2020-09-18T09:19:11
iservVisit  c  1600392016  2020-09-18T09:20:16
iservVisit  a  1800392057  2027-01-20T04:54:17