??????1.10.1??????sink????????????????window??????count
distinct??????????????????????????????????window??????count
distinct??????????????????????????????????????????????window????????????????group DATE_FORMAT(rowtm,
'yyyy-MM-dd') ????sql??????????????????????????
// Daily UV query: groups `source` by the calendar day of `rowtm` and, per day,
// yields the latest minute-truncated timestamp string plus the distinct-userkey count.
val rt_totaluv_view: Table = tabEnv.sqlQuery(
"""
SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00'))
time_str,COUNT(DISTINCT userkey) uv
FROM source
GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
""")
tabEnv.createTemporaryView("rt_totaluv_view", rt_totaluv_view)

// Convert to a retract stream of (flag, (time_str, uv)); keep only the elements
// whose flag is true and strip the flag, leaving plain (String, Long) tuples.
val totaluvTmp = tabEnv
  .toRetractStream[(String, Long)](rt_totaluv_view)
  .filter(_._1)
  .map(_._2)

// Turn the filtered stream back into a Table; its tuple fields are addressed as _1 and _2.
val totaluvTabTmp = tabEnv.fromDataStream(totaluvTmp)

// Write one row per time_str (_1) carrying the largest uv (_2) seen for it.
tabEnv.sqlUpdate(
s"""
INSERT INTO mysql_totaluv
SELECT _1,MAX(_2)
FROM $totaluvTabTmp
GROUP BY _1
""")
------------------ ???????? ------------------
??????: "Benchao Li"<[email protected]>;
????????: 2020??7??3??(??????) ????9:47
??????: "user-zh"<[email protected]>;
????: Re: ??????FLINKSQL1.10????????????UV
??????????????????????????????????????????[1]??????window??????count
distinct??????????????
??????????1.11??????????
[1] https://issues.apache.org/jira/browse/FLINK-17942
x <[email protected]> 于2020年7月3日周五 下午4:34写道：
>
????????????????????????????????checkpoint??????????????????????????????????
>
>
????????tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??????????????????????key????????????????????????????
>
>
>
>
> ------------------&nbsp;????????&nbsp;------------------
> ??????:&nbsp;"Jark Wu"<[email protected]&gt;;
> ????????:&nbsp;2020??6??18??(??????) ????12:16
> ??????:&nbsp;"user-zh"<[email protected]&gt;;
>
> ????:&nbsp;Re: ??????FLINKSQL1.10????????????UV
>
>
>
> ??????????????????????????????
>
> On Thu, 18 Jun 2020 at 10:34, x <[email protected]&gt; wrote:
>
> &gt; ??????1.10??????????????????,???????????????????????????????
> &gt; val resTmpTab: Table = tabEnv.sqlQuery(
> &gt;&nbsp;&nbsp; """
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT
MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
> HH:mm:00'))
> &gt; time_str,COUNT(DISTINCT userkey) uv
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM
user_behavior&nbsp;&nbsp;&nbsp; GROUP BY
> DATE_FORMAT(ts, 'yyyy-MM-dd')&nbsp;&nbsp;&nbsp; """)
> &gt;
> &gt; val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> &gt;&nbsp;&nbsp;
> .filter(line=&amp;gt;line._1==true).map(line=&amp;gt;line._2)
> &gt;
> &gt; val res= tabEnv.fromDataStream(resTmpStream)
> &gt; tabEnv.sqlUpdate(
> &gt;&nbsp;&nbsp; s"""
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; INSERT INTO rt_totaluv
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT _1,MAX(_2)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM $res
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; GROUP BY _1
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; """)
> &gt;
> &gt;
> &gt;
------------------&amp;nbsp;????????&amp;nbsp;------------------
> &gt; ??????:&amp;nbsp;"Jark Wu"<[email protected]&amp;gt;;
> &gt; ????????:&amp;nbsp;2020??6??17??(??????) ????1:55
> &gt;
??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
> &gt;
> &gt; ????:&amp;nbsp;Re: ??????FLINKSQL1.10????????????UV
> &gt;
> &gt;
> &gt;
> &gt; ?? Flink 1.11 ????????????????????
> &gt;
> &gt; CREATE TABLE mysql (
> &gt; &amp;nbsp;&amp;nbsp; time_str STRING,
> &gt; &amp;nbsp;&amp;nbsp; uv BIGINT,
> &gt; &amp;nbsp;&amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED
> &gt; ) WITH (
> &gt; &amp;nbsp;&amp;nbsp; 'connector' = 'jdbc',
> &gt; &amp;nbsp;&amp;nbsp; 'url' =
'jdbc:mysql://localhost:3306/mydatabase',
> &gt; &amp;nbsp;&amp;nbsp; 'table-name' = 'myuv'
> &gt; );
> &gt;
> &gt; INSERT INTO mysql
> &gt; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')),
> COUNT(DISTINCT&amp;nbsp;
> &gt; user_id)
> &gt; FROM user_behavior;
> &gt;
> &gt; On Wed, 17 Jun 2020 at 13:49, x <[email protected]&amp;gt;
wrote:
> &gt;
> &gt; &amp;gt;
??????????????????????"??????"????????????????????????????????????????????UV??
> &gt; &amp;gt; sink??????????
> &gt; &amp;gt; tm uv
> &gt; &amp;gt; 2020/06/17 13:46:00 10000
> &gt; &amp;gt; 2020/06/17 13:47:00 20000
> &gt; &amp;gt; 2020/06/17 13:48:00 30000
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; group by ??????????????????????
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
>
------------------&amp;amp;nbsp;????????&amp;amp;nbsp;------------------
> &gt; &amp;gt; ??????:&amp;amp;nbsp;"Benchao
Li"<[email protected]
> &amp;amp;gt;;
> &gt; &amp;gt; ????????:&amp;amp;nbsp;2020??6??17??(??????)
????11:46
> &gt; &amp;gt;
??????:&amp;amp;nbsp;"user-zh"<[email protected]
> &amp;amp;gt;;
> &gt; &amp;gt;
> &gt; &amp;gt; ????:&amp;amp;nbsp;Re:
??????FLINKSQL1.10????????????UV
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; Hi??
> &gt; &amp;gt; ??????????????????????????????
> &gt; &amp;gt; 1. ??????????group by + mini batch
> &gt; &amp;gt; 2. window???? + fast emit
> &gt; &amp;gt;
> &gt; &amp;gt; ????#1??group
by????????????????????????????????????????????????DATE_FORMAT(rowtm,
> 'yyyy-MM-dd')??
> &gt; &amp;gt; ??????????????????????????????state
retention??????????????????????[1] ????????mini
> batch????????????
> &gt; &amp;gt; ??????[2] ????????
> &gt; &amp;gt;
> &gt; &amp;gt;
????#2????????????????????????tumble????????????????????????????????????????????????????
> &gt; &amp;gt; fast
>
emit????????????????????experimental??feature????????????????????????????????????????????????????????????
> &gt; &amp;gt; table.exec.emit.early-fire.enabled = true
> &gt; &amp;gt; table.exec.emit.early-fire.delay = 60 s
> &gt; &amp;gt;
> &gt; &amp;gt; [1]
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt;
>
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> &gt; &amp;gt; [2]
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt;
>
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> &gt; &amp;gt;
> &gt; &amp;gt; x <[email protected]&amp;amp;gt;
??2020??6??17?????? ????11:14??????
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
>
??????????????????????0??????????????UV??????????????????????????????????????UV??????????????????????
> &gt; &amp;gt; &amp;amp;gt; CREATE VIEW uv_per_10min AS
> &gt; &amp;gt; &amp;amp;gt; SELECT&amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;
> MAX(DATE_FORMAT(proctime&amp;amp;amp;nbsp;,
> &gt; 'yyyy-MM-dd
> &gt; &amp;gt; HH:mm:00'))&amp;amp;amp;nbsp;OVER w
> &gt; &amp;gt; &amp;amp;gt; AS time_str,&amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;
COUNT(DISTINCT user_id) OVER
> w AS uv
> &gt; &amp;gt; &amp;amp;gt; FROM user_behavior
> &gt; &amp;gt; &amp;amp;gt; WINDOW w AS (ORDER BY proctime ROWS
BETWEEN
> UNBOUNDED
> &gt; PRECEDING AND
> &gt; &amp;gt; &amp;amp;gt; CURRENT ROW);
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; ??????????????????????????
> &gt; &amp;gt; &amp;amp;gt; PARTITION BY DATE_FORMAT(rowtm,
'yyyy-MM-dd')
> &gt; ??????????????????????????????????
> &gt; &amp;gt; &amp;amp;gt;
PS??1.10??????????DDL??????????CREATE VIEW??
> &gt; &amp;gt; &amp;amp;gt; ????
--
Best,
Benchao Li