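Reserving a spot here first.
Following the steps the OP posted, I ran the SQL below while sending the data, so I could watch the query result update in real time: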
select
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
The result I got (different from the OP's):
wStart             wEnd               pv  uv
2020-09-18T09:14   2020-09-18T09:16   2   2
2020-09-18T09:16   2020-09-18T09:18   8   3
2020-09-18T09:18   2020-09-18T09:20   8   3
2020-09-18T09:20   2020-09-18T09:22   2   2
After all the data had been sent, I quit sql-client and ran the same query again. This time the result matched the OP's second run:
wStart             wEnd               pv  uv
2020-09-18T09:14   2020-09-18T09:16   2   2
2020-09-18T09:16   2020-09-18T09:18   2   2
2020-09-18T09:18   2020-09-18T09:20   8   3
2020-09-18T09:20   2020-09-18T09:22   2   2
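For reference, here is a rough Python sketch of what the query should return if no record were dropped, using the messages from the original post quoted below. It mimics the rowtime computed column from the OP's DDL (first 10 digits of clientTime as epoch seconds) and tumble_start/tumble_end for a 2-minute window. It assumes the session time zone is UTC+8, because with that offset the clientTime values fall into the 09:14 to 09:22 windows shown in the tables; the helper names are made up for illustration and are not Flink functions.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# Assumption: timestamps are rendered in UTC+8, matching the window labels in the tables.
LOCAL_TZ = timezone(timedelta(hours=8))
WINDOW = timedelta(minutes=2)  # interval '2' MINUTE

# (uuid, clientTime) in the order the OP produced them (step 5 of the quoted post)
MESSAGES = [
    ("c", "1600391684"), ("a", "1600391663"), ("a", "1600391690"), ("c", "1600391709"),
    ("b", "1600391748"), ("c", "1600391782"), ("b", "1600391781"), ("b", "1600391823"),
    ("b", "1600391822"), ("a", "1600391815"), ("a", "1600391857"), ("a", "1600391870"),
    ("b", "1600391851"), ("c", "1600391903"), ("a", "1600391889"), ("a", "1600391945"),
    ("b", "1600391938"), ("b", "1600391951"), ("c", "1600391936"), ("b", "1600391970"),
    ("c", "1600392016"), ("c", "1600391993"), ("a", "1600392057"), ("a", "1800392057"),
]


def to_rowtime(client_time):
    """Mimics to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint)))."""
    digits = (client_time if client_time is not None else "0")[:10]  # coalesce + substring(..., 1, 10)
    return datetime.fromtimestamp(int(digits), tz=LOCAL_TZ).replace(tzinfo=None)


def tumble_start(rowtime, size=WINDOW):
    """Round rowtime down to the start of its tumbling window."""
    midnight = rowtime.replace(hour=0, minute=0, second=0, microsecond=0)
    return rowtime - (rowtime - midnight) % size


def complete_pv_uv(messages):
    """{(wStart, wEnd): (pv, uv)} counting every message, i.e. without any late-data dropping."""
    groups = defaultdict(list)
    for uuid, client_time in messages:
        start = tumble_start(to_rowtime(client_time))
        groups[(start, start + WINDOW)].append(uuid)
    return {w: (len(u), len(set(u))) for w, u in sorted(groups.items())}


for (w_start, w_end), (pv, uv) in complete_pv_uv(MESSAGES).items():
    print(w_start.isoformat(timespec="minutes"), w_end.isoformat(timespec="minutes"), pv, uv)
```

Under these assumptions the complete answer is 5/3, 8/3, 8/3, 2/2, which is the OP's first run. Every other run in this thread reports equal or smaller numbers, which points to records being dropped rather than miscounted. The last message (clientTime 1800392057) lands in a window years in the future that no later record ever closes in a streaming query, so it does not appear in any of the results.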
From: anonnius
Sent: 2020-09-18 11:24
To: user-zh
Subject: In what order does Flink SQL consume Kafka? The second run of the SQL gives a different result from the first
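Hi, [Help wanted] I am using Flink SQL to consume Kafka data and compute PV/UV over windows. The first and second runs give different results, and I don't understand why.
0> Local environment on a Mac
1> Flink 1.11.1
2> Kafka 0.10.2.2, topic message-json, 3 partitions, replication factor 1
3> Using the sql-client.sh environment
4> First created the iservVisit table in sql-cli: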
create table iservVisit (
type string comment 'event type',
uuid string comment 'user uri',
clientTime string comment '10-digit timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: converts the 10-digit timestamp to TIMESTAMP
WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- watermark definition: rowtime minus 1 minute
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
Then ran the following query in sql-cli:
select
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
5> Sent the following JSON messages, in this order, through a Kafka producer:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"}
6> First result (the query kept running in sql-cli the whole time):
wStart             wEnd               pv  uv
2020-09-18T09:14   2020-09-18T09:16   5   3
2020-09-18T09:16   2020-09-18T09:18   8   3
2020-09-18T09:18   2020-09-18T09:20   8   3
2020-09-18T09:20   2020-09-18T09:22   2   2
7> Second result (quit the query in sql-cli with [Quit], then ran it again):
wStart             wEnd               pv  uv
2020-09-18T09:14   2020-09-18T09:16   2   2
2020-09-18T09:16   2020-09-18T09:18   2   2
2020-09-18T09:18   2020-09-18T09:20   8   3
2020-09-18T09:20   2020-09-18T09:22   2   2
8> The detailed steps are in the attached file.
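A rough Python model of one plausible cause, late-data dropping, is sketched below. It is an illustration under simplifying assumptions, not a description of Flink internals: the watermark is rowtime - 1 minute over everything read so far, a window fires once the watermark reaches its end, and a record whose window has already fired is discarded. With 'scan.startup.mode' = 'earliest-offset' the 3 partitions are re-read from the beginning and need not be consumed in the order the messages were produced. The partition assignment here (message i goes to partition i % 3) and the read order (one partition after another) are hypothetical, and real Flink emits watermarks periodically rather than after every record, so this does not reproduce the exact numbers above; it only shows that read order alone can change the output.

```python
from collections import defaultdict

WINDOW = 120  # interval '2' MINUTE, in seconds
DELAY = 60    # WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE

# (uuid, clientTime as epoch seconds) in the order they were produced in step 5
MESSAGES = [
    ("c", 1600391684), ("a", 1600391663), ("a", 1600391690), ("c", 1600391709),
    ("b", 1600391748), ("c", 1600391782), ("b", 1600391781), ("b", 1600391823),
    ("b", 1600391822), ("a", 1600391815), ("a", 1600391857), ("a", 1600391870),
    ("b", 1600391851), ("c", 1600391903), ("a", 1600391889), ("a", 1600391945),
    ("b", 1600391938), ("b", 1600391951), ("c", 1600391936), ("b", 1600391970),
    ("c", 1600392016), ("c", 1600391993), ("a", 1600392057), ("a", 1800392057),
]


def simulate(records, window=WINDOW, delay=DELAY):
    """Simplified event-time tumbling window with a bounded-delay watermark.

    Returns ({window_start: (pv, uv)} for windows that fired, number of dropped records).
    """
    open_windows = defaultdict(list)
    fired = {}
    watermark = float("-inf")
    dropped = 0
    for uuid, ts in records:
        start = ts - ts % window
        if start + window <= watermark:
            dropped += 1  # its window has already fired, so the late record is discarded
        else:
            open_windows[start].append(uuid)
        # the watermark tracks every record read, late or not
        watermark = max(watermark, ts - delay)
        for s in sorted(open_windows):
            if s + window <= watermark:  # watermark reached the window end: emit it
                uuids = open_windows.pop(s)
                fired[s] = (len(uuids), len(set(uuids)))
    return fired, dropped


def round_robin_partitions(messages, n=3):
    """Hypothetical assignment: message i lands in partition i % n."""
    return [messages[p::n] for p in range(n)]


in_order = simulate(MESSAGES)
partitions = round_robin_partitions(MESSAGES)
one_partition_at_a_time = simulate([m for part in partitions for m in part])
print(in_order)
print(one_partition_at_a_time)
```

Under these assumptions the in-order run reproduces step 6 (5/3, 8/3, 8/3, 2/2 with nothing dropped), while reading the partitions one after another drops 8 records and yields 2/1, 3/2, 8/3, 2/2. As in the reruns reported in this thread, the early windows lose records while the 09:18 and 09:20 windows stay complete.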