你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 这个已经在1.11中修复了。
[1] https://issues.apache.org/jira/browse/FLINK-17942 x <[email protected]> 于2020年7月3日周五 下午4:34写道: > 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, > > 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Jark Wu"<[email protected]>; > 发送时间: 2020年6月18日(星期四) 中午12:16 > 收件人: "user-zh"<[email protected]>; > > 主题: Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 是的,我觉得这样子是能绕过的。 > > On Thu, 18 Jun 2020 at 10:34, x <[email protected]> wrote: > > > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > > val resTmpTab: Table = tabEnv.sqlQuery( > > """ > > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd > HH:mm:00')) > > time_str,COUNT(DISTINCT userkey) uv > > FROM user_behavior GROUP BY > DATE_FORMAT(ts, 'yyyy-MM-dd') """) > > > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > > > .filter(line=&gt;line._1==true).map(line=&gt;line._2) > > > > val res= tabEnv.fromDataStream(resTmpStream) > > tabEnv.sqlUpdate( > > s""" > > INSERT INTO rt_totaluv > > SELECT _1,MAX(_2) > > FROM $res > > GROUP BY _1 > > """) > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Jark Wu"<[email protected]&gt;; > > 发送时间:&nbsp;2020年6月17日(星期三) 中午1:55 > > 收件人:&nbsp;"user-zh"<[email protected]&gt;; > > > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > > > > > 在 Flink 1.11 中,你可以尝试这样: > > > > CREATE TABLE mysql ( > > &nbsp;&nbsp; time_str STRING, > > &nbsp;&nbsp; uv BIGINT, > > &nbsp;&nbsp; PRIMARY KEY (ts) NOT ENFORCED > > ) WITH ( > > &nbsp;&nbsp; 'connector' = 'jdbc', > > &nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase', > > &nbsp;&nbsp; 'table-name' = 'myuv' > > ); > > > > INSERT INTO mysql > > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), > COUNT(DISTINCT&nbsp; > > user_id) > > FROM user_behavior; > > > > On Wed, 17 Jun 2020 at 13:49, x <[email protected]&gt; wrote: > > > > &gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > > &gt; sink表这个样式 > > &gt; tm uv > > &gt; 2020/06/17 13:46:00 10000 > > &gt; 2020/06/17 13:47:00 20000 > > &gt; 2020/06/17 13:48:00 30000 > > &gt; > > &gt; > > &gt; group by 日期的话,分钟如何获取 > > &gt; > > &gt; > > &gt; > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > > &gt; 发件人:&amp;nbsp;"Benchao Li"<[email protected] > &amp;gt;; > > &gt; 发送时间:&amp;nbsp;2020年6月17日(星期三) 中午11:46 > > &gt; 收件人:&amp;nbsp;"user-zh"<[email protected] > &amp;gt;; > > &gt; > > &gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > &gt; > > &gt; > > &gt; > > &gt; Hi, > > &gt; 我感觉这种场景可以有两种方式, > > &gt; 1. 可以直接用group by + mini batch > > &gt; 2. window聚合 + fast emit > > &gt; > > &gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, > 'yyyy-MM-dd')。 > > &gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini > batch的开启也需要 > > &gt; 用参数[2] 来打开。 > > &gt; > > &gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > > &gt; fast > emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > > &gt; table.exec.emit.early-fire.enabled = true > > &gt; table.exec.emit.early-fire.delay = 60 s > > &gt; > > &gt; [1] > > &gt; > > &gt; > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > &gt; [2] > > &gt; > > &gt; > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > &gt; > > &gt; x <[email protected]&amp;gt; 于2020年6月17日周三 上午11:14写道: > > &gt; > > &gt; &amp;gt; > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > > &gt; &amp;gt; CREATE VIEW uv_per_10min AS > > &gt; &amp;gt; SELECT&amp;amp;nbsp; > > &gt; &amp;gt; &amp;amp;nbsp; > MAX(DATE_FORMAT(proctime&amp;amp;nbsp;, > > 'yyyy-MM-dd > > &gt; HH:mm:00'))&amp;amp;nbsp;OVER w > > &gt; &amp;gt; AS time_str,&amp;amp;nbsp; > > &gt; &amp;gt; &amp;amp;nbsp; COUNT(DISTINCT user_id) OVER > w AS uv > > &gt; &amp;gt; FROM user_behavior > > &gt; &amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN > UNBOUNDED > > PRECEDING AND > > &gt; &amp;gt; CURRENT ROW); > > &gt; &amp;gt; > > &gt; &amp;gt; > > &gt; &amp;gt; 想请教一下,应该如何处理? > > &gt; &amp;gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > > 这样可以吗,另外状态应该如何清理? > > &gt; &amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > > &gt; &amp;gt; 多谢 -- Best, Benchao Li
