Hi,

??????????????




------------------ Original Message ------------------
From: "Benchao Li" <[email protected]>;
Sent: Tuesday, May 26, 2020, 8:26 PM
To: "user-zh" <[email protected]>;

Subject: Re: ?????????????? Flink SQL State ????????



Hi,

??????????bug??????????????????????????????????????????????????bug????????????????????????????????issue[1]
 ??????????????

[1] https://issues.apache.org/jira/browse/FLINK-17942

LakeShen <[email protected]> wrote on Tue, May 26, 2020 at 8:14 PM:

> Hi,
>
> ???????????????????????????????? server,reason ???????????????? group by
>
> Best,
> LakeShen
>
> Benchao Li <[email protected]> wrote on Tue, May 26, 2020 at 6:50 PM:
&gt;
> > Hi,
> >
> > ????????????????????????????????????????????????????????????
> > 1. ????watermark??????????????????????window????????????????????????watermark????????????????????????window??????????
> > 2. ????????????state????????????checkpoint??????????????????heap??????????
> >
> > ???? <[email protected]> wrote on Tue, May 26, 2020 at 6:07 PM:
&gt; &gt;
> > > hi????????!
> > >
> > > ???????????? Flink SQL ???????????????? job ????state ????????????????
> > >
> > > SQL??source ?? sink ???? kafka ??????????????????????????????5???????? server,reason ????????????????
> > > role_id ????????
> > >
> > > ????????????????????????????????????????????????state ????????????????????????????????source???????????????? state
> > > ?????????????????????? SQL ??????????????
> > >
> > > ??????????????????
> > >
> > > ??????
> > >
> > > ----------------
> > >
> > > CREATE TABLE source_kafka (
> > >   dtime string,
> > >   wm as cast(dtime as TIMESTAMP(3)),
> > >   server string,
> > >   reason string,
> > >   role_id string,
> > >   WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
> > > ) WITH (
> > >   'connector.type' = 'kafka',
> > >   'connector.version' = '0.11',
> > >   'connector.topic' = 'xxx',
> > >   'connector.properties.bootstrap.servers' = 'xxx',
> > >   'connector.properties.zookeeper.connect' = 'xxx',
> > >   'connector.properties.group.id' = 'xxx',
> > >   'format.type' = 'json'
> > > )
> > > -----------------
> > >
> > > CREATE TABLE sink_kafka (
> > >   window_time string,
> > >   server string,
> > >   reason string,
> > >   role_id_distinct_cnt BIGINT,
> > >   log_cnt BIGINT
> > > ) WITH (
> > >   'connector.type' = 'kafka',
> > >   'connector.version' = '0.11',
> > >   'connector.topic' = 'xxx',
> > >   'connector.properties.bootstrap.servers' = 'xxx',
> > >   'connector.properties.zookeeper.connect' = 'xxx',
> > >   'format.type' = 'json'
> > > )
> > > -----------------
> > >
> > > INSERT INTO sink_kafka
> > > SELECT
> > >   DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS window_time,
> > >   server,
> > >   reason,
> > >   COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
> > >   COUNT(1) AS log_cnt
> > > FROM source_kafka
> > > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE), server, reason
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li
