????????????????????????????????????????????????????????????????????????????????????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??10??19??(??????) ????8:03
??????: "867127831"<[email protected]>;
????: "user-zh"<[email protected]>;
????: ??????flink sql count distinct??????????
??????????????????????????????user_id????????????????????????????
| |
??????
|
|
[email protected]
|
?????? ???????????? ????
??2020??10??17?? 16:24??867127831 ??????
??flink sql????????????dau????????????????groupby????count distinct
user_id??????????????table.optimizer.distinct-agg.split.enabled=true????
job??????????????mysql????????????????????????????????????????????????????????????????????????????????????????????????
&nbsp; &nbsp; &nbsp; &nbsp; ????????&nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ????????
2020-10-10 19:00:00   100
2020-10-10 19:00:02   98
2020-10-10 19:00:04   102
2020-10-10 19:00:06   108
2020-10-10 19:00:08   106
2020-10-10 19:00:10   110
sql??????
-- JDBC (MySQL) sink: one row per calendar day holding that day's DAU count.
-- date_str is declared as the (non-enforced) primary key; per the Flink JDBC
-- connector documentation a declared primary key makes the sink write in
-- upsert mode, so a day's row is overwritten as its count changes.
CREATE TABLE jdbc_sink (
    -- Holds a 'yyyy-MM-dd' string. STRING is used because a VARCHAR with no
    -- length is VARCHAR(1) in Flink SQL, which is too short for this value.
    date_str STRING,
    dau BIGINT,
    PRIMARY KEY (date_str) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx',
    'table-name' = 'xxx',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx'
);
-- Source table of user action events, with event_time as the event-time
-- attribute.
CREATE TABLE action_log_source (
    -- STRING rather than a bare VARCHAR, which Flink SQL treats as VARCHAR(1).
    user_id STRING,
    event_time TIMESTAMP(3),
    -- Watermark trails event_time by 10 seconds to allow for late events.
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    -- Connector options were elided in the original message.
    ...
);
-- Daily active users: count of distinct user_id per calendar day of
-- event_time, written to jdbc_sink keyed by the day string.
-- NOTE(review): this is a non-windowed GROUP BY, so the result for a day is
-- re-emitted whenever it changes. With
-- table.optimizer.distinct-agg.split.enabled=true the distinct aggregate is
-- presumably planned as two levels, and intermediate updates reaching the
-- sink may not be monotonic -- confirm against the optimizer documentation.
INSERT INTO jdbc_sink
SELECT
    day_str AS date_str,
    COUNT(DISTINCT user_id) AS dau
FROM (
    -- Reduce each event to its user and the calendar day it occurred on.
    SELECT
        user_id AS user_id,
        DATE_FORMAT(event_time, 'yyyy-MM-dd') AS day_str
    FROM action_log_source
)
GROUP BY day_str;