yarn per????????????rocksDB??????????localdir--v1.10.1

2021-01-19 文章 x
flinkLocal DB files directory XXX 
does not exist and cannot be created

??????????????????allowedLateness??????????????????????????????????????????????????????

2020-11-29 文章 x
??
.keyBy.window(TumblingEventTimeWindows.of(Time.minutes(1))).allowedLateness(Time.hours(1)).aggregate(new
 AggregateFunction,new ProcessWindowFunctionBloomFilter)

??????kafka broker ????????????????????????????

2020-10-26 文章 x
kafka0.10.1.1??flink1.10.1
??
2020-10-2421:52:44,053INFOorg.apache.flink.kafka.shaded.org.apache.kafka.clients.FetchSessionHandler-[ConsumerclientId=consumer-12,groupId=onlineTag]Errorsendingfetchrequest(sessionId=INVALID,epoch=INITIAL)tonode10:org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException.
2020-10-2421:53:13,254INFOorg.apache.flink.kafka.shaded.org.apache.kafka.clients.FetchSessionHandler-[ConsumerclientId=consumer-20,groupId=onlineTag]Errorsendingfetchrequest(sessionId=INVALID,epoch=INITIAL)tonode11:org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException.

?????? ????????????????????????????????????????

2020-10-19 文章 x
??KeyedProcessFunctionProcessWindowFunction.




----
??: 
   "user-zh"



??????????????????????????????????????

2020-10-19 文章 x
1224




----
??: 
   "user-zh"



??????????????????????????????????????

2020-10-19 文章 x
1224




--
??: 
   "user-zh"

<584680...@qq.com;
:2020??10??15??(??) 3:47
??:"user-zh"

????????????????????????????????????????

2020-10-19 文章 x
??v1.10.1
AggregateFunction+ProcessWindowFunction??ProcessWindowFunction??+ProcessWindowFunction??clear??23??59??00??override
 def clear(ctx: Context): Unit = {
  val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
  if(dt.equals("23:59:00")){

state.clear()??keyBykey

?????? ????savepoint

2020-09-04 文章 x





----
??: 
   "x"  
  <35907...@qq.com;
:2020??9??3??(??) 12:22
??:"user-zh"https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

????~

x <35907...@qq.com ??2020??9??3?? 11:30??

 /flink/flink-1.10.1/bin/flink cancel -s
 hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
 f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
 Unrecognized option: -yid

?????? ????savepoint

2020-09-02 文章 x
??V1.10.1??


----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

????~

x <35907...@qq.com ??2020??9??3?? 11:30??

 /flink/flink-1.10.1/bin/flink cancel -s
 hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
 f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
 Unrecognized option: -yid

????savepoint

2020-09-02 文章 x
/flink/flink-1.10.1/bin/flink cancel -s 
hdfs://nameservice1/user/flink_1.10.1/flink-savepoints 
f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
Unrecognized option: -yid

?????? ProcessWindowFunction??????clear??????????????????-v1.10.1

2020-08-27 文章 x
??8??
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))




----
??: 
   "x"  
  <35907...@qq.com;
:2020??8??27??(??) 2:00
??:"user-zh@flink.apache.org"

?????? ProcessWindowFunction??????clear??????????????????-v1.10.1

2020-08-27 文章 x
10??1??.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))10


----
??: 
   "user-zh"



??????????????????UV??????????????MapState??BloomFilter,??checkpoint????????????????????

2020-08-26 文章 x
UV??MapStateBloomFilter??,checkpoint??bloomMapState

ProcessWindowFunction??????clear??????????????????-v1.10.1

2020-08-25 文章 x
ProcessWindowFunction??clearenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)??
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
private var state: MapState[String,Boolean] = _
override def open
override def process
override def clear(ctx: Context): Unit = {
state.clear()
}
}

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-07 文章 x
您说的这种方式,V1.10.1 不支持吧,我看参数只有一个String类型的
void sqlUpdate(String stmt);




--原始邮件--
发件人:"seeksst"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
> x <35907418@qq.com> 于2020年7月6日周一 上午11:15写道:
>
> > 版本是1.10.1,最后sink的时候确实是一个window里面做count
> > distinct操作。请问是只要计算过程中含有一个window里面做count
> > distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group DATE_FORMAT(rowtm,
> > '-MM-dd') 这个sql对应的状态很大。代码如下:
> > val rt_totaluv_view : Table = tabEnv.sqlQuery(
> >   """
> >     SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
> >     FROM source
> >     GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
> >     """)
> > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
> >
> > val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
> >   .filter( line => line._1 == true ).map( line => line._2 )
> >
> > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
> >
> > tabEnv.sqlUpdate(
> >   s"""
> >     INSERT INTO mysql_totaluv
> >     SELECT _1,MAX(_2)
> >     FROM $totaluvTabTmp
> >     GROUP BY _1
> >     """)
> >
--amp;amp;nbsp;原始邮件amp;amp;nbsp;-- gt; amp;gt; 
发件人:amp;amp;nbsp;"Benchao Li"libenchao@apache.orgamp;amp;gt;; gt; amp;gt; 
发送时间:amp;amp;nbsp;2020年7月3日(星期五) 晚上9:47 gt; amp;gt; 
收件人:amp;amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;amp;gt;; gt; amp;gt; gt; 
amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; gt; amp;gt; gt; 
amp;gt; gt; amp;gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 
gt; amp;gt; 这个已经在1.11中修复了。 gt; amp;gt; gt; amp;gt; [1] 
https://issues.apache.org/jira/browse/FLINK-17942 gt; amp;gt; gt; amp;gt; x 
35907418@qq.comamp;amp;gt; 于2020年7月3日周五 下午4:34写道: gt; amp;gt; gt; amp;gt; 
amp;amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, gt; amp;gt; amp;amp;gt; 
gt; amp;gt; amp;amp;gt; gt; amp;gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
 gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
--amp;amp;amp;nbsp;原始邮件amp;amp;amp;nbsp;-- gt; 
amp;gt; amp;amp;gt; 发件人:amp;amp;amp;nbsp;"Jark 
Wu"imjark@gmail.comamp;amp;amp;gt;; gt; amp;gt; amp;amp;gt; 
发送时间:amp;amp;amp;nbsp;2020年6月18日(星期四) 中午12:16 gt; amp;gt; amp;amp;gt; 
收件人:amp;amp;amp;nbsp;"user-zh"user-zh@flink.apache.org gt; amp;amp;amp;gt;; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 主题:amp;amp;amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 是的,我觉得这样子是能绕过的。 gt; amp;gt; 
amp;amp;gt; gt; amp;gt; amp;amp;gt; On Thu, 18 Jun 2020 at 10:34, x 
35907418@qq.comamp;amp;amp;gt; gt; wrote: gt; amp;gt; amp;amp;gt; gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery( gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp; """ gt; amp;gt; 
amp;amp;gt; gt; 
amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;
 SELECT gt; amp;gt; MAX(DATE_FORMAT(ts, '-MM-dd gt; amp;gt; amp;amp;gt; 
HH:mm:00')) gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; time_str,COUNT(DISTINCT 
userkey) uv gt; amp;gt; amp;amp;gt; gt; 
amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;
 FROM gt; amp;gt; 
user_behavioramp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp; GROUP BY gt; 
amp;gt; amp;amp;gt; DATE_FORMAT(ts, gt; 
'-MM-dd')amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp; """) gt; 
amp;gt; amp;amp;gt; amp;amp;amp;gt; gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; val 
gt; amp;gt; resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) gt; 
amp;gt; amp;amp;gt; amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp; gt; 
amp;gt; amp;amp;gt; gt; 
.filter(line=amp;amp;amp;amp;gt;line._1==true).map(line=amp;amp;amp;amp;gt;line._2)
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; gt; amp;gt; amp;amp;gt; 
amp;amp;amp;gt; val res= tabEnv.fromDataStream(resTmpStream) gt; amp;gt; 
amp;amp;g

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-06 文章 x
??blinkval setttings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()





----
??:"Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

 x <35907...@qq.com> 于2020年7月6日周一 上午11:15写道:

 gt; ??1.10.1??sinkwindow??count
 gt; distinct??window??count
 gt;
 
distinct??windowgroupamp;nbsp;DATE_FORMAT(rowtm,
 gt; '-MM-dd') sql??
 gt; val rt_totaluv_view : Table = tabEnv.sqlQuery(
 gt;nbsp;nbsp; """
 gt;nbsp;nbsp;nbsp;nbsp; SELECT 
MAX(DATE_FORMAT(rowtm, '-MM-dd
 HH:mm:00'))
 gt; time_str,COUNT(DISTINCT userkey) uv
 gt;nbsp;nbsp;nbsp;nbsp; FROM source
 gt;nbsp;nbsp;nbsp;nbsp; GROUP BY 
DATE_FORMAT(rowtm, '-MM-dd')
 gt;nbsp;nbsp;nbsp;nbsp; """)
 gt; tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
 gt;
 gt; val totaluvTmp =
 tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
 gt;nbsp;nbsp; .filter( line =amp;gt; line._1 == true 
).map( line
 =amp;gt; line._2 )
 gt;
 gt; val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
 gt;
 gt; tabEnv.sqlUpdate(
 gt;nbsp;nbsp; s"""
 gt;nbsp;nbsp;nbsp;nbsp; INSERT INTO mysql_totaluv
 gt;nbsp;nbsp;nbsp;nbsp; SELECT _1,MAX(_2)
 gt;nbsp;nbsp;nbsp;nbsp; FROM $totaluvTabTmp
 gt;nbsp;nbsp;nbsp;nbsp; GROUP BY _1
 gt;nbsp;nbsp;nbsp;nbsp; """)
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:amp;nbsp;"Benchao 
Li"https://issues.apache.org/jira/browse/FLINK-17942
 gt;
 gt; x <35907...@qq.comamp;gt; ??2020??7??3?? 4:34??
 gt;
 gt; amp;gt; 
checkpoint??
 gt; amp;gt;
 gt; amp;gt;
 gt;
 
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key
 gt; amp;gt;
 gt; amp;gt;
 gt; amp;gt;
 gt; amp;gt;
 gt; amp;gt;
 
--amp;amp;nbsp;amp;amp;nbsp;--
 gt; amp;gt; ??:amp;amp;nbsp;"Jark 
Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; [2]
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; x 
<35907...@qq.com
 amp;amp;amp;amp;gt;
 gt; ??2020??6??17?? 11:14??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 gt; amp;gt; 
??0??UV??UV??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; CREATE
 VIEW uv_per_10min AS
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 SELECTamp;amp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 amp;amp;amp;amp;amp;nbsp;
 gt; amp;gt; 
MAX(DATE_FORMAT(proctimeamp;amp;amp;amp;amp;nbsp;,
 gt; amp;gt; amp;amp;gt; '-MM-dd
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 HH:mm:00'))amp;amp;amp;amp;amp;nbsp;OVER w
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; AS
 gt; time_str,amp;amp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 amp;amp;amp;amp;amp;nbsp;
 gt; COUNT(DISTINCT user_id) OVER
 gt; amp;gt; w AS uv
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; FROM
 user_behavior
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; WINDOW w
 AS (ORDER BY proctime
 gt; ROWS BETWEEN
 gt; amp;gt; UNBOUNDED
 gt; amp;gt; amp;amp;gt; PRECEDING AND
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; CURRENT
 ROW);
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 ??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; PARTITION
 BY
 gt; DATE_FORMAT(rowtm, '-MM-dd')
 gt; amp;gt; amp;amp;gt; ??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt;
 PS??1.10??DDL??CREATE
 gt; VIEW??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;gt; 
 gt;
 gt;
 gt;
 gt; --
 gt;
 gt; Best,
 gt; Benchao Li



 --

 Best,
 Benchao Li



-- 

Best,
Benchao Li

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-05 文章 x
sorry,group agg.
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??.


----
??:"Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <35907...@qq.com> 于2020年7月6日周一 上午11:15写道:

 版本是1.10.1,最后sink的时候确实是一个window里面做count
 distinct操作。请问是只要计算过程中含有一个window里面做count
 distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group DATE_FORMAT(rowtm,
 '-MM-dd') 这个sql对应的状态很大。代码如下:
 val rt_totaluv_view : Table = tabEnv.sqlQuery(
 """
 SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd 
HH:mm:00'))
 time_str,COUNT(DISTINCT userkey) uv
 FROM source
 GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
 """)
 tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)

 val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
 .filter( line => line._1 == true ).map( line => 
line._2 )

 val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )

 tabEnv.sqlUpdate(
 s"""
 INSERT INTO mysql_totaluv
 SELECT _1,MAX(_2)
 FROM $totaluvTabTmp
 GROUP BY _1
 """)
 --nbsp;nbsp;--
 ??:nbsp;"Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942

 x <35907...@qq.com> 于2020年7月3日周五 下午4:34写道:

 gt; 
checkpoint??
 gt;
 gt;
 
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key
 gt;
 gt;
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:amp;nbsp;"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 gt; amp;gt; amp;amp;gt; [2]
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt; x 
<35907...@qq.comamp;amp;amp;gt;
 ??2020??6??17?? 11:14??
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; 
??0??UV??UV??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; CREATE VIEW 
uv_per_10min AS
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
SELECTamp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;nbsp;
 gt; MAX(DATE_FORMAT(proctimeamp;amp;amp;amp;nbsp;,
 gt; amp;gt; '-MM-dd
 gt; amp;gt; amp;amp;gt; 
HH:mm:00'))amp;amp;amp;amp;nbsp;OVER w
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; AS
 time_str,amp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;nbsp;
 COUNT(DISTINCT user_id) OVER
 gt; w AS uv
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; FROM 
user_behavior
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; WINDOW w AS 
(ORDER BY proctime
 ROWS BETWEEN
 gt; UNBOUNDED
 gt; amp;gt; PRECEDING AND
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; CURRENT ROW);
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; PARTITION BY
 DATE_FORMAT(rowtm, '-MM-dd')
 gt; amp;gt; ??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
PS??1.10??DDL??CREATE
 VIEW??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 



 --

 Best,
 Benchao Li



-- 

Best,
Benchao Li

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-05 文章 x
版本是1.10.1,最后sink的时候确实是一个window里面做count 
distinct操作。请问是只要计算过程中含有一个window里面做count 
distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group DATE_FORMAT(rowtm,
 '-MM-dd') 这个sql对应的状态很大。代码如下:
val rt_totaluv_view : Table = tabEnv.sqlQuery(
  """
SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) 
time_str,COUNT(DISTINCT userkey) uv
FROM source
GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
""")
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)

val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
  .filter( line => line._1 == true ).map( line => line._2 )

val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )

tabEnv.sqlUpdate(
  s"""
INSERT INTO mysql_totaluv
SELECT _1,MAX(_2)
FROM $totaluvTabTmp
GROUP BY _1
""")
----
??:"Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942

x <35907...@qq.com> 于2020年7月3日周五 下午4:34写道:

 
 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,

 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。




 --nbsp;nbsp;--
 ??:nbsp;"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 gt; amp;gt; [2]
 gt; amp;gt;
 gt; amp;gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
 gt; amp;gt;
 gt; amp;gt; x <35907...@qq.comamp;amp;gt; 
??2020??6??17?? 11:14??
 gt; amp;gt;
 gt; amp;gt; amp;amp;gt;
 
??0??UV??UV??
 gt; amp;gt; amp;amp;gt; CREATE VIEW uv_per_10min AS
 gt; amp;gt; amp;amp;gt; SELECTamp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;nbsp;
 MAX(DATE_FORMAT(proctimeamp;amp;amp;nbsp;,
 gt; '-MM-dd
 gt; amp;gt; HH:mm:00'))amp;amp;amp;nbsp;OVER w
 gt; amp;gt; amp;amp;gt; AS time_str,amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;nbsp; 
COUNT(DISTINCT user_id) OVER
 w AS uv
 gt; amp;gt; amp;amp;gt; FROM user_behavior
 gt; amp;gt; amp;amp;gt; WINDOW w AS (ORDER BY proctime ROWS 
BETWEEN
 UNBOUNDED
 gt; PRECEDING AND
 gt; amp;gt; amp;amp;gt; CURRENT ROW);
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt; ??
 gt; amp;gt; amp;amp;gt; PARTITION BY DATE_FORMAT(rowtm, 
'-MM-dd')
 gt; ??
 gt; amp;gt; amp;amp;gt; 
PS??1.10??DDL??CREATE VIEW??
 gt; amp;gt; amp;amp;gt; 



-- 

Best,
Benchao Li

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-03 文章 x
您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。




----
??:"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 gt; [2]
 gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
 gt;
 gt; x <35907...@qq.comamp;gt; ??2020??6??17?? 11:14??
 gt;
 gt; amp;gt; 
??0??UV??UV??
 gt; amp;gt; CREATE VIEW uv_per_10min AS
 gt; amp;gt; SELECTamp;amp;nbsp;
 gt; amp;gt; amp;amp;nbsp; 
MAX(DATE_FORMAT(proctimeamp;amp;nbsp;,
 '-MM-dd
 gt; HH:mm:00'))amp;amp;nbsp;OVER w
 gt; amp;gt; AS time_str,amp;amp;nbsp;
 gt; amp;gt; amp;amp;nbsp; COUNT(DISTINCT user_id) OVER w AS 
uv
 gt; amp;gt; FROM user_behavior
 gt; amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED
 PRECEDING AND
 gt; amp;gt; CURRENT ROW);
 gt; amp;gt;
 gt; amp;gt;
 gt; amp;gt; ??
 gt; amp;gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd')
 ??
 gt; amp;gt; PS??1.10??DDL??CREATE VIEW??
 gt; amp;gt; 

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-06-17 文章 x
如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
val resTmpTab: Table = tabEnv.sqlQuery(
  """
SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT 
userkey) uv
FROM user_behavior GROUP BY DATE_FORMAT(ts, '-MM-dd') """)

val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
  .filter(line=>line._1==true).map(line=>line._2)

val res= tabEnv.fromDataStream(resTmpStream)
tabEnv.sqlUpdate(
  s"""
INSERT INTO rt_totaluv
SELECT _1,MAX(_2)
FROM $res
GROUP BY _1
""")


----
??:"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 [2]

 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

 x <35907...@qq.comgt; ??2020??6??17?? 11:14??

 gt; 
??0??UV??UV??
 gt; CREATE VIEW uv_per_10min AS
 gt; SELECTamp;nbsp;
 gt; amp;nbsp; MAX(DATE_FORMAT(proctimeamp;nbsp;, '-MM-dd
 HH:mm:00'))amp;nbsp;OVER w
 gt; AS time_str,amp;nbsp;
 gt; amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
 gt; FROM user_behavior
 gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING 
AND
 gt; CURRENT ROW);
 gt;
 gt;
 gt; ??
 gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 
??
 gt; PS??1.10??DDL??CREATE VIEW??
 gt; 

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-06-17 文章 x





----
??:"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 [2]

 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

 x <35907...@qq.comgt; ??2020??6??17?? 11:14??

 gt; 
??0??UV??UV??
 gt; CREATE VIEW uv_per_10min AS
 gt; SELECTamp;nbsp;
 gt; amp;nbsp; MAX(DATE_FORMAT(proctimeamp;nbsp;, '-MM-dd
 HH:mm:00'))amp;nbsp;OVER w
 gt; AS time_str,amp;nbsp;
 gt; amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
 gt; FROM user_behavior
 gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING 
AND
 gt; CURRENT ROW);
 gt;
 gt;
 gt; ??
 gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 
??
 gt; PS??1.10??DDL??CREATE VIEW??
 gt; 

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 x
??"??"UV??
sink??
tm uv
2020/06/17 13:46:00 1
2020/06/17 13:47:00 2
2020/06/17 13:48:00 3


group by ??


----
??:"Benchao Li"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

x <35907...@qq.com ??2020??6??17?? 11:14??

 
??0??UV??UV??
 CREATE VIEW uv_per_10min AS
 SELECTnbsp;
 nbsp; MAX(DATE_FORMAT(proctimenbsp;, '-MM-dd 
HH:mm:00'))nbsp;OVER w
 AS time_str,nbsp;
 nbsp; COUNT(DISTINCT user_id) OVER w AS uv
 FROM user_behavior
 WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
 CURRENT ROW);


 ??
 PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 
??
 PS??1.10??DDL??CREATE VIEW??
 

求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 x
??0??UV??UV??
CREATE VIEW uv_per_10min AS
SELECT
 MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER w AS 
time_str,
 COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW);


??
PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') ??
PS??1.10??DDL??CREATE VIEW??