????????????????????????????????????????????????????????????????????????????????????????




------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]>;
????????: 2020??10??19??(??????) ????8:03
??????: "867127831"<[email protected]>;
????: "user-zh"<[email protected]>;
????: ??????flink sql count distinct??????????



??????????????????????????????user_id????????????????????????????



| |
??????
|
|
[email protected]
|

?????? ???????????? ????

??2020??10??17?? 16:24??867127831 ??????
??flink sql????????????dau????????????????groupby????count distinct 
user_id??????????????table.optimizer.distinct-agg.split.enabled=true????
job??????????????mysql????????????????????????????????????????????????????????????????????????????????????????????????


        ????????
          ????????
2020-10-10 19:00:00   100
2020-10-10 19:00:02   98
2020-10-10 19:00:04   102
2020-10-10 19:00:06   108
2020-10-10 19:00:08   106
2020-10-10 19:00:10   110


sql??????
-- Sink table for the daily-active-user (dau) count, written through the
-- Flink JDBC connector. date_str is declared as a non-enforced primary key;
-- per the Flink JDBC connector documentation a declared primary key makes
-- the sink write in upsert mode, i.e. one row per date_str that is
-- overwritten on every update.
-- The url / table-name / username / password values are 'xxx' placeholders.
CREATE TABLE jdbc_sink (
    date_str VARCHAR,
    dau BIGINT,
    PRIMARY KEY (date_str) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx',
    'table-name' = 'xxx',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx'
);


-- Source table of user action events, declaring user_id and event_time.
-- event_time carries the watermark, which trails the observed event time
-- by 10 seconds.
-- The connector options are elided ('...') and must be filled in before
-- this statement can run.
CREATE TABLE action_log_source (
    user_id VARCHAR,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    ...
);


-- Computes dau: the number of distinct user_id values per calendar day,
-- where the day is event_time formatted as 'yyyy-MM-dd', and writes one
-- (date_str, dau) row per day into jdbc_sink.
-- The aggregation is a plain GROUP BY with no window, so the row for a day
-- is updated continuously as new events arrive.
-- NOTE(review): the sample output quoted in this thread shows dau moving
-- down as well as up (100 -> 98 -> 102). Presumably this comes from the
-- update/retract pairs produced when
-- table.optimizer.distinct-agg.split.enabled=true splits the distinct
-- aggregation into two levels, with intermediate values reaching the sink.
-- Confirm, and consider enabling mini-batch to reduce intermediate output.
INSERT INTO
    jdbc_sink
SELECT
    day_str AS date_str,
    COUNT(DISTINCT user_id) AS dau
FROM (
    SELECT
        user_id AS user_id,
        DATE_FORMAT(event_time, 'yyyy-MM-dd') AS day_str
    FROM action_log_source
)
GROUP BY day_str

回复