hi????????!

???????????? Flink SQL ???????????????? job ????state ????????????????

SQL??source ?? sink ???? kafka ??????????????????????????????5???????? 
server,reason ???????????????? role_id ????????

????????????????????????????????????????????????state 
????????????????????????????????source???????????????? state 
?????????????????????? SQL ??????????????

??????????????????

??????

----------------

-- Kafka source table: raw per-event log records.
-- `wm` is a computed event-time column derived from the string field `dtime`;
-- the watermark allows events to arrive up to 5 seconds out of order.
CREATE TABLE source_kafka (
  dtime string,                        -- event timestamp as a string (format assumed parseable by CAST — TODO confirm)
  wm as cast(dtime as TIMESTAMP(3)),   -- computed event-time attribute
  server string,
  reason string,
  role_id string,
  WATERMARK FOR wm AS wm - INTERVAL '5' SECOND  -- 5s out-of-order tolerance
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'format.type' = 'json'  -- fixed: trailing comma before ')' is a syntax error in the WITH clause
)
-----------------

-- Kafka sink table: one row per (window, server, reason) group,
-- carrying the distinct-role count and total log count for that window.
CREATE TABLE sink_kafka (
  window_time          STRING,  -- formatted window start time
  server               STRING,
  reason               STRING,
  role_id_distinct_cnt BIGINT,  -- COUNT(DISTINCT role_id) per group
  log_cnt              BIGINT   -- total rows per group
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'format.type' = 'json'
)
-----------------

-- 5-minute event-time tumbling window aggregation per (server, reason):
-- emits the window start time, the distinct role_id count, and the row count.
INSERT INTO sink_kafka
SELECT
    DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS window_time,
    server,
    reason,
    COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
    COUNT(1) AS log_cnt
FROM source_kafka
GROUP BY
    TUMBLE(wm, INTERVAL '5' MINUTE),
    server,
    reason

回复