??????1.10??????????????????,???????????????????????????????
val resTmpTab: Table = tabEnv.sqlQuery(
"""
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT
userkey) uv
FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """)
val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
.filter(line=>line._1==true).map(line=>line._2)
val res= tabEnv.fromDataStream(resTmpStream)
tabEnv.sqlUpdate(
s"""
INSERT INTO rt_totaluv
SELECT _1,MAX(_2)
FROM $res
GROUP BY _1
""")
------------------ ???????? ------------------
??????: "Jark Wu"<[email protected]>;
????????: 2020??6??17??(??????) ????1:55
??????: "user-zh"<[email protected]>;
????: Re: ??????FLINKSQL1.10????????????UV
?? Flink 1.11 ????????????????????
CREATE TABLE mysql (
time_str STRING,
uv BIGINT,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'myuv'
);
INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT
user_id)
FROM user_behavior;
On Wed, 17 Jun 2020 at 13:49, x <[email protected]> wrote:
>
??????????????????????"??????"????????????????????????????????????????????UV??
> sink??????????
> tm uv
> 2020/06/17 13:46:00 10000
> 2020/06/17 13:47:00 20000
> 2020/06/17 13:48:00 30000
>
>
> group by ??????????????????????
>
>
> ------------------&nbsp;????????&nbsp;------------------
> ??????:&nbsp;"Benchao Li"<[email protected]&gt;;
> ????????:&nbsp;2020??6??17??(??????) ????11:46
> ??????:&nbsp;"user-zh"<[email protected]&gt;;
>
> ????:&nbsp;Re: ??????FLINKSQL1.10????????????UV
>
>
>
> Hi??
> ??????????????????????????????
> 1. ??????????group by + mini batch
> 2. window???? + fast emit
>
> ????#1??group
by????????????????????????????????????????????????DATE_FORMAT(rowtm,
'yyyy-MM-dd')??
> ??????????????????????????????state retention??????????????????????[1]
????????mini batch????????????
> ??????[2] ????????
>
>
????#2????????????????????????tumble????????????????????????????????????????????????????
> fast
emit????????????????????experimental??feature????????????????????????????????????????????????????????????
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
>
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
>
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <[email protected]&gt; ??2020??6??17?????? ????11:14??????
>
> &gt;
??????????????????????0??????????????UV??????????????????????????????????????UV??????????????????????
> &gt; CREATE VIEW uv_per_10min AS
> &gt; SELECT&amp;nbsp;
> &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;, 'yyyy-MM-dd
> HH:mm:00'))&amp;nbsp;OVER w
> &gt; AS time_str,&amp;nbsp;
> &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
> &gt; FROM user_behavior
> &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING
AND
> &gt; CURRENT ROW);
> &gt;
> &gt;
> &gt; ??????????????????????????
> &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
??????????????????????????????????
> &gt; PS??1.10??????????DDL??????????CREATE VIEW??
> &gt; ????