???????????????? ????????????????????????kafka??????????????kafka????????????????????????????????????????????????????????????????????????????????????????????????????????kafka????????????????????????????????????????????????????????????????Debug???????????????????????????????????????? ??????????????????????AbstractFetcher.java ?? emitRecordsWithTimestamps ??????????????????????????????????????????????for?????????????????????????????????????????????????????????????????????? ?????? AbstractFetcher ???? emitRecordsWithTimestamps ?????????? protected void emitRecordsWithTimestamps( Queue<T> records, KafkaTopicPartitionState<T, KPH> partitionState, long offset, long kafkaEventTimestamp) { // emit the records, using the checkpoint lock to guarantee // atomicity of record emission and offset state update synchronized (checkpointLock) { T record; while ((record = records.poll()) != null) { long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp); sourceContext.collectWithTimestamp(record, timestamp);
// this might emit a watermark, so do it after emitting the record partitionState.onEvent(record, timestamp); } partitionState.setOffset(offset); } } ?????????????? ???? kafka??????????0??????5s???????? CREATE TABLE kafka_watermark ( `f1` INTEGER, `f2` TIMESTAMP(3), WATERMARK FOR f2 AS f2 - INTERVAL '0' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'test_watermark', 'properties.group.id' = 'groupId', 'properties.max.poll.records' = '10', 'properties.max.poll.interval.ms' = '1000', 'properties.fetch.max.bytes' = '52428800', 'properties.max.partition.fetch.bytes' = '1048576', 'scan.startup.mode' = 'earliest-offset', 'properties.enable.auto.commit' = 'false', 'format' = 'json', 'json.ignore-parse-errors' = 'false', 'json.timestamp-format.standard' = 'SQL', 'json.map-null-key.mode' = 'FAIL' ); ?6?7 SELECT f1, f2 FROM kafka_watermark GROUP BY TUMBLE(f2, INTERVAL '5' SECOND), f1,f2 ???????? ???????????????? {"f1":1,"f2":"2022-02-28 21:00:01.000"} ??????????1??????????????????0-5??????????1?? ???????????????? {"f1":4,"f2":"2022-02-28 21:00:04.000"} ??????????4??????????????????0-5??????????1??4?? ???????????????? {"f1":5,"f2":"2022-02-28 21:00:05.000"} ??????????5??>=5????????0-5????????????????????????1??4??????5-10??????????5?? ??????????????1??4 {"f1":2,"f2":"2022-02-28 21:00:02.000"} ????????????5??0????????0-5????????????????2??????????????????????5-10??????????5?? ?????????????????? {"f1":3,"f2":"2022-02-28 21:00:03.000"} ????????????5??0????????0-5????????????????3??????????????????????5-10??????????5?? ?????????????????? {"f1":7,"f2":"2022-02-28 21:00:07.000"} ??????????7??????????????????5-10??????????5??7?? ?????????????????? {"f1":6,"f2":"2022-02-28 21:00:06.000"} ????????????7??????????????????5-10??????????5??7??6?? ?????????????????? {"f1":10,"f2":"2022-02-28 21:00:10.000"} ??????????10??>=10????????5-10????????????????????????5??7??6??????10-15??????????10?? ????????????????5??7??6??????????????1??4??5??7??6 ?????????? ?????????????? ??1 ?????????????? {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":5,"f2":"2022-02-28 21:00:05.000"} {"f1":2,"f2":"2022-02-28 21:00:02.000"} {"f1":23,"f2":"2022-02-28 21:00:02.000"} {"f1":2,"f2":"2022-02-28 21:00:02.000"} {"f1":3,"f2":"2022-02-28 21:00:03.000"} ??????????????????3??????????????????0-5??????????1??4??2??23??3??????5-10??????????5?? ???????????????? ?????????? {"f1":6,"f2":"2022-02-28 21:00:06.000"} ??????6?????????? >=5????????0-5????????????????????????1??4??2??23??3??????5-10??????????6??5?? ??????????????1??4??2??23??3 {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":2,"f2":"2022-02-28 21:00:02.000"} {"f1":23,"f2":"2022-02-28 21:00:02.000"} {"f1":3,"f2":"2022-02-28 21:00:03.000"} ?????????? {"f1":12,"f2":"2022-02-28 21:00:12.000"} ??????12?????????? >=10????????5-10????????????????????????6??5??????10-15??????????12?? ????????????????6??5??????????????1??4??2??23??3??6??5 {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":2,"f2":"2022-02-28 21:00:02.000"} {"f1":23,"f2":"2022-02-28 21:00:02.000"} {"f1":3,"f2":"2022-02-28 21:00:03.000"} {"f1":6,"f2":"2022-02-28 21:00:06.000"} {"f1":5,"f2":"2022-02-28 21:00:05.000"} ?????????? ??2 ?????????????? {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":5,"f2":"2022-02-28 21:00:05.000"} {"f1":3,"f2":"2022-02-28 21:00:03.000"} {"f1":5,"f2":"2022-02-28 21:00:05.000"} {"f1":10,"f2":"2022-02-28 21:00:10.000"} {"f1":9,"f2":"2022-02-28 21:00:09.000"} {"f1":6,"f2":"2022-02-28 21:00:06.000"} ??????????????????6??>=5????????0-5????????????????????????1??4??3??????5-10??????????9??6??5??????10-15??????????10?? ??????????????1??4??3 {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":3,"f2":"2022-02-28 21:00:03.000"} ?????????? {"f1":7,"f2":"2022-02-28 21:00:07.000"} {"f1":8,"f2":"2022-02-28 21:00:08.000"} ??????????8????????????????????5-10??????????9??6??8??7??5??????10-15??????????10?? ??????????????1??4??3 ?????????? {"f1":11,"f2":"2022-02-28 21:00:11.000"} ??????11?????????? >=10????????5-10????????????????????????9??6??8??7??5??????10-15??????????10??11?? ????????????????9??6??8??7??5??????????????1??4??3??9??6??8??7??5 {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":3,"f2":"2022-02-28 21:00:03.000"} {"f1":9,"f2":"2022-02-28 21:00:09.000"} {"f1":6,"f2":"2022-02-28 21:00:06.000"} {"f1":8,"f2":"2022-02-28 21:00:08.000"} {"f1":7,"f2":"2022-02-28 21:00:07.000"} {"f1":5,"f2":"2022-02-28 21:00:05.000"} ??3 ?????????????? {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":5,"f2":"2022-02-28 21:00:05.000"} {"f1":7,"f2":"2022-02-28 21:00:07.000"} {"f1":10,"f2":"2022-02-28 21:00:10.000"} {"f1":1,"f2":"2022-02-28 21:00:01.000"} ??????????????????1??????????????????0-5??????????1??4??????5-10??????????7??5??????10-15??????????10?? ???????????????? ?????????? {"f1":10,"f2":"2022-02-28 21:00:10.000"} ??????10?????????? >=5 ?? >= 10????????0-5??5-10??????????????0-5????????1??4??5-10????7??5??????10-15??????????10?? ??????????????1??4??7??5 {"f1":1,"f2":"2022-02-28 21:00:01.000"} {"f1":4,"f2":"2022-02-28 21:00:04.000"} {"f1":7,"f2":"2022-02-28 21:00:07.000"} {"f1":5,"f2":"2022-02-28 21:00:05.000"}