hi: thanks for your attention and your reply
1> Below is my analysis.
1. First run: execute the SQL in sql-client.sh:
select
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
At this point, since the records are written one by one through the Kafka producer tool (kafka-console-producer.sh), and the Kafka connector keeps consuming them as they arrive, the order in which the data is received matches the order in which it was written by hand.
2. Second run: quit sql-client.sh and then execute the SQL again:
select
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
This time all the data is already in Kafka. When the Kafka connector consumes it, the topic's 3 partitions mean the order in which messages come back no longer matches the order in which they were written with the producer tool (kafka-console-producer.sh). A record with a later rowtime may therefore be consumed first, producing a large watermark, which causes some of the messages consumed afterwards to be discarded.
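The drop behaviour described above can be reproduced outside Flink with a small sketch (plain Python, not Flink code; the window size and watermark delay mirror the DDL in this thread): a record is discarded when the end of its 2-minute window is already at or behind the current watermark, and the watermark only ever moves forward.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=2)
DELAY = timedelta(minutes=1)  # WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE

def window_of(t):
    """Tumbling 2-minute window [start, end) containing rowtime t."""
    start = t.replace(second=0, microsecond=0) - timedelta(minutes=t.minute % 2)
    return start, start + WINDOW

def count_kept(rowtimes):
    """How many records survive, given a particular consumption order."""
    watermark = datetime.min
    kept = 0
    for t in rowtimes:
        _, w_end = window_of(t)
        if w_end > watermark:  # window has not fired yet -> record is counted
            kept += 1
        watermark = max(watermark, t - DELAY)  # watermark only moves forward
    return kept

base = datetime(2020, 9, 18, 9, 14, 0)
events = [base + timedelta(seconds=s) for s in (23, 108, 183, 263, 330, 417)]

print(count_kept(sorted(events)))                # in write order -> 6 kept
print(count_kept(sorted(events, reverse=True)))  # worst-case reorder -> 3 kept
```

With the records in write order nothing is lost; consumed newest-first, the first record pushes the watermark so far ahead that every earlier window is already closed.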
3. Enlarging the watermark delay in the table DDL restores the first run's result, but I am still evaluating this approach (i.e. whether it is always effective):
create table iservVisit (
type string comment 'event type',
uuid string comment 'user id',
clientTime string comment '10-digit unix timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp to TIMESTAMP
WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- watermark delay, raised from 1 minute to 5 minutes
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
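For reference, the computed column's conversion chain (coalesce, substring, cast to bigint, from_unixtime, to_timestamp) can be mimicked like this. A rough sketch, assuming the session time zone is UTC+8, which matches the timestamps in this thread:

```python
from datetime import datetime, timezone, timedelta
from typing import Optional

TZ = timezone(timedelta(hours=8))  # assumed session time zone (UTC+8)

def rowtime(client_time: Optional[str]) -> datetime:
    """Mimic: to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint)))"""
    seconds = int((client_time or "0")[:10])  # coalesce + substring(1, 10) + cast
    return datetime.fromtimestamp(seconds, TZ).replace(tzinfo=None)

print(rowtime("1600391684"))  # 2020-09-18 09:14:44
```

Note that a null clientTime collapses to epoch 0, i.e. 1970-01-01 08:00:00 in UTC+8, rather than failing.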
4. Preliminary conclusion: the question is how to guarantee, or by what means, that every partition is consumed at the same pace.
5. The attachment can be viewed with the Sublime sql/hql plugin, which makes it easier to read.
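On point 4, what matters is where the watermark is generated. The sketch below (hypothetical helper names, not the Flink API) contrasts a watermark computed after the partitions are merged into one stream, which is sensitive to relative consumption speed, with per-partition watermarks combined by taking the minimum, which is not; whether Flink's Kafka source does the latter depends on whether per-partition watermark generation is in effect:

```python
from itertools import zip_longest

DELAY = 60  # seconds; WATERMARK ... - INTERVAL '1' MINUTE

def round_robin(streams):
    """Interleave partitions one record at a time (one possible merge order)."""
    for group in zip_longest(*streams):
        yield from (t for t in group if t is not None)

def merged_then_watermarked(streams):
    """Watermark computed on the merged stream: a partition read far ahead drags it forward."""
    wm = float("-inf")
    for t in round_robin(streams):
        wm = max(wm, t - DELAY)
    return wm

def min_of_partition_watermarks(streams):
    """Per-partition watermarks combined with min: insensitive to consumption speed."""
    return min(max(s) - DELAY for s in streams)

partitions = [[100, 400], [160], [220, 700]]      # event times (seconds) per partition
print(merged_then_watermarked(partitions))        # 640: driven by the fastest partition
print(min_of_partition_watermarks(partitions))    # 100: held back by the slowest
```

In the min-combination model the slowest partition bounds the watermark, so uneven consumption speed cannot close windows early.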
On 2020-09-18 14:42:42, "[email protected]" <[email protected]> wrote:
>Let me take a first crack at this.
>Following the steps in the OP's document, I sent the data while running the SQL below to watch the results in real time:
>select
> tumble_start(rowtime, interval '2' MINUTE) as wStart,
> tumble_end(rowtime, interval '2' MINUTE) as wEnd,
> count(1) as pv,
> count(distinct uuid) as uv
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>The result I finally got is the following (different from the OP's):
>
> wStart            wEnd              pv  uv
> 2020-09-18T09:14  2020-09-18T09:16  2   2
> 2020-09-18T09:16  2020-09-18T09:18  8   3
> 2020-09-18T09:18  2020-09-18T09:20  8   3
> 2020-09-18T09:20  2020-09-18T09:22  2   2
>
>After all the data had been sent, I quit sql-client and ran the query above again; the final result (same as the OP's):
>wStart            wEnd              pv  uv
>2020-09-18T09:14  2020-09-18T09:16  2   2
>2020-09-18T09:16  2020-09-18T09:18  2   2
>2020-09-18T09:18  2020-09-18T09:20  8   3
>2020-09-18T09:20  2020-09-18T09:22  2   2
>
>
>
>
>From: anonnius
>Date: 2020-09-18 11:24
>To: user-zh
>Subject: In what order does Flink SQL consume Kafka? The second run of the SQL gives a different result from the first
>hi: [help wanted] I am using Flink SQL to consume Kafka data and compute pv/uv over windows; the first and second runs give different results, and I don't understand why.
>0> local environment on a Mac
>1> flink 1.11.1
>2> kafka 0.10.2.2, topic message-json, 3 partitions, replication factor 1
>3> using the sql-client.sh environment
>4> first created the iservVisit table in sql-cli:
>create table iservVisit (
> type string comment 'event type',
> uuid string comment 'user id',
> clientTime string comment '10-digit unix timestamp',
> rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp to TIMESTAMP
> WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- watermark on the computed column
>) with (
> 'connector' = 'kafka-0.10',
> 'topic' = 'message-json',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'consumer-rt',
> 'format' = 'json',
> 'json.ignore-parse-errors' = 'true',
> 'scan.startup.mode' = 'earliest-offset'
>)
>then ran this SQL in sql-cli:
>select
> tumble_start(rowtime, interval '2' MINUTE) as wStart,
> tumble_end(rowtime, interval '2' MINUTE) as wEnd,
> count(1) as pv,
> count(distinct uuid) as uv
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>5> sent the following json messages through the kafka producer, one after another:
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}
>{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"}
>6> first result (with the SQL in sql-cli left running):
> wStart            wEnd              pv  uv
>2020-09-18T09:14  2020-09-18T09:16  5   3
>2020-09-18T09:16  2020-09-18T09:18  8   3
>2020-09-18T09:18  2020-09-18T09:20  8   3
>2020-09-18T09:20  2020-09-18T09:22  2   2
>7> second result (quit the running SQL in sql-cli, then ran it again):
>wStart            wEnd              pv  uv
>2020-09-18T09:14  2020-09-18T09:16  2   2
>2020-09-18T09:16  2020-09-18T09:18  2   2
>2020-09-18T09:18  2020-09-18T09:20  8   3
>2020-09-18T09:20  2020-09-18T09:22  2   2
>8> the detailed walkthrough has been put in the attached file
>
1. First run: execute the SQL in sql-client.sh:
select
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
At this point, since the records are written one by one through the Kafka producer tool (kafka-console-producer.sh) and the Kafka connector keeps consuming them as they arrive, the order of consumption matches the order in which they were written by hand.
kafka record                                                     clientTime           watermark (hand-computed: max rowtime seen - 1 min)  note (all windows on 2020-09-18)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}  2020-09-18 09:14:44  2020-09-18 09:13:44  window [09:14:00 - 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}  2020-09-18 09:14:23  2020-09-18 09:13:44  window [09:14:00 - 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}  2020-09-18 09:14:50  2020-09-18 09:13:50  window [09:14:00 - 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}  2020-09-18 09:15:09  2020-09-18 09:14:09  window [09:14:00 - 09:16:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}  2020-09-18 09:15:48  2020-09-18 09:14:48  window [09:14:00 - 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}  2020-09-18 09:16:22  2020-09-18 09:15:22  window [09:16:00 - 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}  2020-09-18 09:16:21  2020-09-18 09:15:22  window [09:16:00 - 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}  2020-09-18 09:17:03  2020-09-18 09:16:03  window [09:16:00 - 09:18:00); watermark passes 09:16:00, fires the [09:14:00 - 09:16:00) window
-- computation fired:
wStart            wEnd              pv  uv
2020-09-18T09:14  2020-09-18T09:16  5   3
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}  2020-09-18 09:17:02  2020-09-18 09:16:03  window [09:16:00 - 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}  2020-09-18 09:16:55  2020-09-18 09:16:03  window [09:16:00 - 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}  2020-09-18 09:17:37  2020-09-18 09:16:37  window [09:16:00 - 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}  2020-09-18 09:17:50  2020-09-18 09:16:50  window [09:16:00 - 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}  2020-09-18 09:17:31  2020-09-18 09:16:50  window [09:16:00 - 09:18:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}  2020-09-18 09:18:23  2020-09-18 09:17:23  window [09:18:00 - 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}  2020-09-18 09:18:09  2020-09-18 09:17:23  window [09:18:00 - 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}  2020-09-18 09:19:05  2020-09-18 09:18:05  window [09:18:00 - 09:20:00); watermark passes 09:18:00, fires the [09:16:00 - 09:18:00) window
-- computation fired:
wStart            wEnd              pv  uv
2020-09-18T09:16  2020-09-18T09:18  8   3
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}  2020-09-18 09:18:58  2020-09-18 09:18:05  window [09:18:00 - 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}  2020-09-18 09:19:11  2020-09-18 09:18:11  window [09:18:00 - 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}  2020-09-18 09:18:56  2020-09-18 09:18:11  window [09:18:00 - 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}  2020-09-18 09:19:30  2020-09-18 09:18:30  window [09:18:00 - 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}  2020-09-18 09:20:16  2020-09-18 09:19:16  window [09:20:00 - 09:22:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}  2020-09-18 09:19:53  2020-09-18 09:19:16  window [09:18:00 - 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}  2020-09-18 09:20:57  2020-09-18 09:19:57  window [09:20:00 - 09:22:00)
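The window bounds used in the table above follow the usual tumbling-window assignment, start = timestamp - (timestamp mod size). A quick sanity check in epoch seconds (a sketch; note the UTC+8 offset is a multiple of 120 s, so epoch-aligned boundaries coincide with local-time ones):

```python
def tumble(ts: int, size: int = 120):
    """Bounds [start, end) of the tumbling window of `size` seconds containing ts."""
    start = ts - ts % size
    return start, start + size

# clientTime 1600391684 (= 2020-09-18 09:14:44) falls in the
# [09:14:00, 09:16:00) window, i.e. epoch [1600391640, 1600391760)
print(tumble(1600391684))  # (1600391640, 1600391760)
```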
2. Second run: quit sql-client.sh and then execute the SQL again:
select
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
By now all the data is already in Kafka. When the Kafka connector consumes it, the topic's 3 partitions mean the order in which messages come back no longer matches the order in which they were written with the producer tool (kafka-console-producer.sh): a record with a later rowtime may be consumed first, producing a large watermark, so some of the messages consumed afterwards are ignored.
The data as finally consumed (after all the data was in Kafka, running select * from iservVisit):
type        uuid  clientTime  rowtime              watermark (hand-computed)  window (hand-computed)
iservVisit  a     1600391663  2020-09-18 09:14:23  2020-09-18 09:13:23        [09:14:00 - 09:16:00) W1
iservVisit  b     1600391748  2020-09-18 09:15:48  2020-09-18 09:14:48        [09:14:00 - 09:16:00) W1
iservVisit  b     1600391823  2020-09-18 09:17:03  2020-09-18 09:16:03        [09:16:00 - 09:18:00) W2
-- fires window [09:14:00 - 09:16:00) W1
iservVisit  a     1600391857  2020-09-18 09:17:37  2020-09-18 09:16:37        [09:16:00 - 09:18:00) W2
iservVisit  c     1600391903  2020-09-18 09:18:23  2020-09-18 09:17:23        [09:18:00 - 09:20:00) W3
iservVisit  b     1600391938  2020-09-18 09:18:58  2020-09-18 09:17:58        [09:18:00 - 09:20:00) W3
iservVisit  b     1600391970  2020-09-18 09:19:30  2020-09-18 09:18:30        [09:18:00 - 09:20:00) W3
-- fires window [09:16:00 - 09:18:00) W2
iservVisit  a     1600392057  2020-09-18 09:20:57  2020-09-18 09:19:57        [09:20:00 - 09:22:00) W4
iservVisit  c     1600391684  2020-09-18 09:14:44  -                          [09:14:00 - 09:16:00)  ignored (window already fired)
iservVisit  c     1600391709  2020-09-18 09:15:09  -                          [09:14:00 - 09:16:00)  ignored (window already fired)
iservVisit  b     1600391781  2020-09-18 09:16:21  -                          [09:16:00 - 09:18:00)  ignored (window already fired)
iservVisit  a     1600391815  2020-09-18 09:16:55  -                          [09:16:00 - 09:18:00)  ignored (window already fired)
iservVisit  b     1600391851  2020-09-18 09:17:31  -                          [09:16:00 - 09:18:00)  ignored (window already fired)
iservVisit  a     1600391945  2020-09-18 09:19:05  -                          [09:18:00 - 09:20:00) W3
iservVisit  c     1600391936  2020-09-18 09:18:56  -                          [09:18:00 - 09:20:00) W3
iservVisit  c     1600391993  2020-09-18 09:19:53  -                          [09:18:00 - 09:20:00) W3
iservVisit  a     1600391690  2020-09-18 09:14:50  -                          [09:14:00 - 09:16:00)  ignored (window already fired)
iservVisit  c     1600391782  2020-09-18 09:16:22  -                          [09:16:00 - 09:18:00)  ignored (window already fired)
iservVisit  b     1600391822  2020-09-18 09:17:02  -                          [09:16:00 - 09:18:00)  ignored (window already fired)
iservVisit  a     1600391870  2020-09-18 09:17:50  -                          [09:16:00 - 09:18:00)  ignored (window already fired)
iservVisit  a     1600391889  2020-09-18 09:18:09  -                          [09:18:00 - 09:20:00) W3
iservVisit  b     1600391951  2020-09-18 09:19:11  -                          [09:18:00 - 09:20:00) W3
iservVisit  c     1600392016  2020-09-18 09:20:16  -                          [09:20:00 - 09:22:00) W4
iservVisit  a     1800392057  2027-01-20 04:54:17  2027-01-20 04:53:17        fires every window ending before 2027-01-20 04:53:17, i.e. W3 and W4
3. Enlarging the watermark delay in the table DDL restores the first run's result; I am still evaluating this approach (whether it is always effective):
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user id',
    clientTime string comment '10-digit unix timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp to TIMESTAMP
    WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- watermark delay, raised from 1 minute to 5 minutes
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
4. The question now is: how can I guarantee, or by what means, that every partition is consumed at the same pace?