是的,我觉得这样子是能绕过的。 On Thu, 18 Jun 2020 at 10:34, x <[email protected]> wrote:
> 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > val resTmpTab: Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """) > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > .filter(line=>line._1==true).map(line=>line._2) > > val res= tabEnv.fromDataStream(resTmpStream) > tabEnv.sqlUpdate( > s""" > INSERT INTO rt_totaluv > SELECT _1,MAX(_2) > FROM $res > GROUP BY _1 > """) > > > ------------------ 原始邮件 ------------------ > 发件人: "Jark Wu"<[email protected]>; > 发送时间: 2020年6月17日(星期三) 中午1:55 > 收件人: "user-zh"<[email protected]>; > > 主题: Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 在 Flink 1.11 中,你可以尝试这样: > > CREATE TABLE mysql ( > time_str STRING, > uv BIGINT, > PRIMARY KEY (ts) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/mydatabase', > 'table-name' = 'myuv' > ); > > INSERT INTO mysql > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT > user_id) > FROM user_behavior; > > On Wed, 17 Jun 2020 at 13:49, x <[email protected]> wrote: > > > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > > sink表这个样式 > > tm uv > > 2020/06/17 13:46:00 10000 > > 2020/06/17 13:47:00 20000 > > 2020/06/17 13:48:00 30000 > > > > > > group by 日期的话,分钟如何获取 > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Benchao Li"<[email protected]&gt;; > > 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46 > > 收件人:&nbsp;"user-zh"<[email protected]&gt;; > > > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > > > > > Hi, > > 我感觉这种场景可以有两种方式, > > 1. 可以直接用group by + mini batch > > 2. window聚合 + fast emit > > > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 > > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > > 用参数[2] 来打开。 > > > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > > table.exec.emit.early-fire.enabled = true > > table.exec.emit.early-fire.delay = 60 s > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > [2] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > > > x <[email protected]&gt; 于2020年6月17日周三 上午11:14写道: > > > > &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > > &gt; CREATE VIEW uv_per_10min AS > > &gt; SELECT&amp;nbsp; > > &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;, > 'yyyy-MM-dd > > HH:mm:00'))&amp;nbsp;OVER w > > &gt; AS time_str,&amp;nbsp; > > &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv > > &gt; FROM user_behavior > > &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED > PRECEDING AND > > &gt; CURRENT ROW); > > &gt; > > &gt; > > &gt; 想请教一下,应该如何处理? > > &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > 这样可以吗,另外状态应该如何清理? > > &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > > &gt; 多谢
