yarn per [...] RocksDB [...] localdir -- v1.10.1
Flink: Local DB files directory XXX does not exist and cannot be created
[...] allowedLateness [...]
.keyBy
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.hours(1))
.aggregate(new AggregateFunction, new ProcessWindowFunctionBloomFilter)
[...] kafka broker [...]
Kafka 0.10.1.1, Flink 1.10.1 [...]
2020-10-24 21:52:44,053 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-12, groupId=onlineTag] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 10: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException.
2020-10-24 21:53:13,254 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-20, groupId=onlineTag] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 11: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException.
?????? ????????????????????????????????????????
??KeyedProcessFunctionProcessWindowFunction. ---- ??: "user-zh"
??????????????????????????????????????
1224 ---- ??: "user-zh"
??????????????????????????????????????
1224 -- ??: "user-zh" <584680...@qq.com; :2020??10??15??(??) 3:47 ??:"user-zh"
????????????????????????????????????????
[...] v1.10.1, AggregateFunction + ProcessWindowFunction [...] ProcessWindowFunction [...] clear [...] 23:59:00 [...]
override def clear(ctx: Context): Unit = {
  val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
  if (dt.equals("23:59:00")) {
    state.clear()
  }
}
[...] keyBy key
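The check in the clear method above formats the window start with SimpleDateFormat, which uses the JVM default time zone. A minimal sketch of the same "does this window start at 23:59?" test with an explicit zone follows. It is plain Scala, not Flink code, and LastWindowOfDay and startsAt2359 are hypothetical names introduced only for illustration.

```scala
import java.time.{Instant, ZoneId}

// Hypothetical helper, not part of Flink: decides whether a window that
// starts at windowStartMillis (epoch milliseconds) begins at 23:59 local
// time in the given zone.
object LastWindowOfDay {
  def startsAt2359(windowStartMillis: Long, zone: ZoneId): Boolean = {
    val localTime = Instant.ofEpochMilli(windowStartMillis).atZone(zone).toLocalTime
    localTime.getHour == 23 && localTime.getMinute == 59
  }
}
```

Passing the zone explicitly makes the result independent of where the job runs; the same instant is 23:59 at UTC+8 but 15:59 at UTC.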
Re: [...] savepoint
-- Original message --
From: "x" <35907...@qq.com>
Sent: 3 Sep 2020, 12:22
To: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
[...]~

x <35907...@qq.com> wrote on 3 Sep 2020 at 11:30:
> /flink/flink-1.10.1/bin/flink cancel -s hdfs://nameservice1/user/flink_1.10.1/flink-savepoints f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
> Unrecognized option: -yid
Re: [...] savepoint
[...] V1.10.1 [...]

-- Original message --
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
[...]~

x <35907...@qq.com> wrote on 3 Sep 2020 at 11:30:
> /flink/flink-1.10.1/bin/flink cancel -s hdfs://nameservice1/user/flink_1.10.1/flink-savepoints f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
> Unrecognized option: -yid
[...] savepoint
/flink/flink-1.10.1/bin/flink cancel -s hdfs://nameservice1/user/flink_1.10.1/flink-savepoints f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
Unrecognized option: -yid
Re: ProcessWindowFunction [...] clear [...] -v1.10.1
[...] 8 [...]
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))

-- Original message --
From: "x" <35907...@qq.com>
Sent: 27 Aug 2020, 2:00
To: "user-zh@flink.apache.org"
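To see what the Time.hours(-8) offset changes, here is a small sketch of the window-start arithmetic. The formula is, to my understanding, the one Flink uses in TimeWindow.getWindowStartWithOffset; everything else (the WindowStart object, the utc8Millis helper, and the assumption that the desired day boundary is midnight at UTC+8) is hypothetical and only for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object WindowStart {
  val HourMs: Long = 60L * 60 * 1000
  val DayMs: Long = 24 * HourMs

  // Start of the tumbling window that contains `timestamp`, for a window of
  // `windowSize` milliseconds shifted by `offset` milliseconds.
  def windowStartWithOffset(timestamp: Long, offset: Long, windowSize: Long): Long =
    timestamp - (timestamp - offset + windowSize) % windowSize

  // Hypothetical helper: epoch milliseconds of a wall-clock time at UTC+8.
  def utc8Millis(year: Int, month: Int, day: Int, hour: Int, minute: Int): Long =
    LocalDateTime.of(year, month, day, hour, minute)
      .toInstant(ZoneOffset.ofHours(8))
      .toEpochMilli
}
```

For an event at 10:00 on 27 Aug 2020 (UTC+8), an offset of -8 hours puts the start of the one-day window at 00:00 local time, while an offset of 0 puts it at 08:00 local time, which is midnight UTC.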
Re: ProcessWindowFunction [...] clear [...] -v1.10.1
10 [...] 1 [...]
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
10 [...]

-- Original message --
From: "user-zh"
[...] UV [...] MapState [...] BloomFilter, [...] checkpoint [...]
UV [...] MapState, BloomFilter [...], checkpoint [...] bloomMapState
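For context on why a Bloom filter is attractive for UV counting: it keeps a fixed-size bit array instead of one entry per user, at the cost of occasionally treating a new user as already seen. The following is a minimal sketch in plain Scala, not Flink state code. SimpleBloomFilter, ApproxUvCounter, and the default sizes are hypothetical choices made for illustration, not the poster's implementation.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Hypothetical minimal Bloom filter over String keys.
final class SimpleBloomFilter(numBits: Int, numHashes: Int) {
  private val bits = new BitSet(numBits)

  // One bit position per hash function; the seed selects the hash function.
  private def positions(key: String): Seq[Int] =
    (0 until numHashes).map { seed =>
      val h = MurmurHash3.stringHash(key, seed)
      ((h % numBits) + numBits) % numBits // map to a non-negative index
    }

  // False means "definitely not added"; true means "probably added".
  def mightContain(key: String): Boolean = positions(key).forall(i => bits.get(i))

  def put(key: String): Unit = positions(key).foreach(i => bits.set(i))
}

// Approximate distinct counter: a key is counted only the first time the
// filter reports it as unseen, so the count can undercount but never
// counts the same key twice.
final class ApproxUvCounter(numBits: Int = 1 << 20, numHashes: Int = 3) {
  private val filter = new SimpleBloomFilter(numBits, numHashes)
  private var uv = 0L

  def add(userKey: String): Unit =
    if (!filter.mightContain(userKey)) {
      filter.put(userKey)
      uv += 1
    }

  def count: Long = uv
}
```

The memory footprint is fixed by numBits regardless of how many users arrive, which is the property that matters for checkpoint size; the trade-off is a false-positive rate that grows as the filter fills up.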
ProcessWindowFunction [...] clear [...] -v1.10.1
ProcessWindowFunction [...] clear [...]
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
[...]
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new ProcessWindowFunction[IN, OUT, KEY, TimeWindow] {
  private var state: MapState[String, Boolean] = _
  override def open [...]
  override def process [...]
  override def clear(ctx: Context): Unit = {
    state.clear()
  }
})
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
The approach you describe is not supported in V1.10.1, is it? As far as I can see, the method takes a single String parameter:
void sqlUpdate(String stmt);

-- Original message --
From: "seeksst" [...]
> [...] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> x <35907418@qq.com> wrote on Mon, 6 Jul 2020 at 11:15:
> > The version is 1.10.1, and the final sink step does perform a count distinct inside a window. Does a single count distinct inside a window anywhere in the job prevent all expired state from being cleaned up automatically? The state of my window step is actually small; the state of the SQL that groups by DATE_FORMAT(rowtm, 'yyyy-MM-dd') is large. The code is as follows:
> > val rt_totaluv_view: Table = tabEnv.sqlQuery(
> >   """
> >     SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
> >     FROM source
> >     GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> >   """)
> > tabEnv.createTemporaryView("rt_totaluv_view", rt_totaluv_view)
> >
> > val totaluvTmp = tabEnv.toRetractStream[(String, Long)](rt_totaluv_view)
> >   .filter(line => line._1 == true).map(line => line._2)
> >
> > val totaluvTabTmp = tabEnv.fromDataStream(totaluvTmp)
> >
> > tabEnv.sqlUpdate(
> >   s"""
> >     INSERT INTO mysql_totaluv
> >     SELECT _1, MAX(_2)
> >     FROM $totaluvTabTmp
> >     GROUP BY _1
> >   """)
> >
> > -- Original message --
> > From: "Benchao Li" <libenchao@apache.org>
> > Sent: Friday, 3 Jul 2020, 21:47
> > To: "user-zh" <user-zh@flink.apache.org>
> > Subject: Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
> >
> > Which version are you using? There used to be a similar issue [1]: doing a count distinct inside a window triggers it. It has been fixed in 1.11.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17942
> >
> > x <35907418@qq.com> wrote on Fri, 3 Jul 2020 at 16:34:
> > > Hello, after my program has run for a while, I see that the checkpoint files keep growing, so the state is apparently not expiring.
> > > I configured tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7)); in theory, the state for keys whose date is the previous day should expire on the following day.
> > >
> > > -- Original message --
> > > From: "Jark Wu" <imjark@gmail.com>
> > > Sent: Thursday, 18 Jun 2020, 12:16
> > > To: "user-zh" <user-zh@flink.apache.org>
> > > Subject: Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
> > >
> > > Yes, I think this works as a workaround.
> > >
> > > On Thu, 18 Jun 2020 at 10:34, x <35907418@qq.com> wrote:
> > > > On 1.10 I implemented it by converting the table to a stream and then back to a table. Does that look reasonable to you?
> > > > val resTmpTab: Table = tabEnv.sqlQuery(
> > > >   """
> > > >     SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
> > > >     FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')
> > > >   """)
> > > >
> > > > val resTmpStream = tabEnv.toRetractStream[(String, Long)](resTmpTab)
> > > >   .filter(line => line._1 == true).map(line => line._2)
> > > >
> > > > val res = tabEnv.fromDataStream(resTmpStream)
> > > > [...]
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
[...] blink:
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

-- Original message --
From: "Benchao Li" [...]
[...] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
Sorry, group agg. tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7)) [...].

-- Original message --
From: "Benchao Li" [...]
[...] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
The version is 1.10.1, and the final sink step does perform a count distinct inside a window. Does a single count distinct inside a window anywhere in the job prevent all expired state from being cleaned up automatically? The state of my window step is actually small; the state of the SQL that groups by DATE_FORMAT(rowtm, 'yyyy-MM-dd') is large. The code is as follows:
val rt_totaluv_view: Table = tabEnv.sqlQuery(
  """
    SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
    FROM source
    GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
  """)
tabEnv.createTemporaryView("rt_totaluv_view", rt_totaluv_view)

val totaluvTmp = tabEnv.toRetractStream[(String, Long)](rt_totaluv_view)
  .filter(line => line._1 == true).map(line => line._2)

val totaluvTabTmp = tabEnv.fromDataStream(totaluvTmp)

tabEnv.sqlUpdate(
  s"""
    INSERT INTO mysql_totaluv
    SELECT _1, MAX(_2)
    FROM $totaluvTabTmp
    GROUP BY _1
  """)

-- Original message --
From: "Benchao Li" [...]
[...] https://issues.apache.org/jira/browse/FLINK-17942
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
Hello, after my program has run for a while, I see that the checkpoint files keep growing, so the state is apparently not expiring.
I configured tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7)); in theory, the state for keys whose date is the previous day should expire on the following day.

-- Original message --
From: "Jark Wu" [...]
[...] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
On 1.10 I implemented it by converting the table to a stream and then back to a table. Does that look reasonable to you?
val resTmpTab: Table = tabEnv.sqlQuery(
  """
    SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
    FROM user_behavior
    GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')
  """)

val resTmpStream = tabEnv.toRetractStream[(String, Long)](resTmpTab)
  .filter(line => line._1 == true).map(line => line._2)

val res = tabEnv.fromDataStream(resTmpStream)

tabEnv.sqlUpdate(
  s"""
    INSERT INTO rt_totaluv
    SELECT _1, MAX(_2)
    FROM $res
    GROUP BY _1
  """)

-- Original message --
From: "Jark Wu" [...]
[...] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
-- Original message --
From: "Jark Wu" [...]
[...] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
[...] UV [...] sink [...]
tm                   uv
2020/06/17 13:46:00  1
2020/06/17 13:47:00  2
2020/06/17 13:48:00  3
group by [...]

-- Original message --
From: "Benchao Li" [...]
[...] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
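The tm/uv rows in this message describe a running distinct-user count that is reported once per minute. As a rough illustration of that semantics only, here is a plain Scala sketch over an in-memory list; it is not Flink code, and Event, CumulativeUv, and cumulativeUv are hypothetical names. Resetting the count when the calendar day changes is an assumption taken from the thread's grouping by day.

```scala
import java.time.{LocalDate, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical input record: an event time and a user key.
final case class Event(ts: LocalDateTime, userKey: String)

object CumulativeUv {
  private val minuteFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:'00'")

  // One row per minute that has at least one event, in time order:
  // (minute label, number of distinct users seen so far on that day).
  def cumulativeUv(events: Seq[Event]): Seq[(String, Int)] = {
    val byMinute = events
      .groupBy(e => e.ts.truncatedTo(ChronoUnit.MINUTES))
      .toSeq
      .sortBy { case (minute, _) => minute.toEpochSecond(ZoneOffset.UTC) }

    var currentDay: Option[LocalDate] = None
    var seen = Set.empty[String]

    byMinute.map { case (minute, minuteEvents) =>
      val day = minute.toLocalDate
      if (!currentDay.contains(day)) {
        // A new calendar day starts: forget the users of the previous day.
        currentDay = Some(day)
        seen = Set.empty
      }
      seen = seen ++ minuteEvents.map(_.userKey)
      (minute.format(minuteFormat), seen.size)
    }
  }
}
```

With one new user in each of the minutes 13:46, 13:47, and 13:48 on 2020/06/17, this yields the counts 1, 2, and 3 for those minutes, and the count starts again from the first user seen on the next day.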
Help: real-time cumulative UV statistics with Flink SQL 1.10
[...] 0 [...] UV [...] UV [...]
CREATE VIEW uv_per_10min AS
SELECT
  MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

[...]
PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') [...]
PS: [...] 1.10 [...] DDL [...] CREATE VIEW [...]