hi????????! ???????????? Flink SQL ???????????????? job ????state ????????????????
SQL??source ?? sink ???? kafka ??????????????????????????????5???????? server,reason ???????????????? role_id ???????? ????????????????????????????????????????????????state ????????????????????????????????source???????????????? state ?????????????????????? SQL ?????????????? ?????????????????? ?????? ---------------- CREATE TABLE source_kafka ( dtime string, wm as cast(dtime as TIMESTAMP(3)), server string, reason string, role_id string, WATERMARK FOR wm AS wm - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.zookeeper.connect' = 'xxx', 'connector.properties.group.id' = 'xxx', 'format.type' = 'json', ) ----------------- CREATE TABLE sink_kafka ( window_time string, server string, reason string, role_id_distinct_cnt BIGINT, log_cnt BIGINT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.zookeeper.connect' = 'xxx', 'format.type' = 'json' ) ----------------- INSERT INTO sink_kafka SELECT DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS window_time, server, reason, COUNT(DISTINCT role_id) AS role_id_distinct_cnt, COUNT(1) AS log_cnt FROM source_kafka GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason
