回复: 如果用flink sql持续查询过去30分钟登录网站的人数?

2019-12-18 文章 Yuan,Youjun
不好意思,之前没看到这个问题。
Darwin-amd64就是mac上的可执行文件格式。信任他,直接可以执行的。

-邮件原件-
发件人: 陈帅  
发送时间: Saturday, December 7, 2019 10:48 PM
收件人: user-zh@flink.apache.org
主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数?

请问这个平台生成可执行文件creek是如何实现的?对应的Dawin-amd64环境下载下来的文件是什么格式的?

Yuan,Youjun  于2019年12月7日周六 下午8:32写道:

> 是的,与Flink完全一样的SQL接口,为边缘计算设计的流式计算框架。
>
>
> -邮件原件-
> 发件人: 陈帅 
> 发送时间: Saturday, December 7, 2019 11:36 AM
> 收件人: user-zh@flink.apache.org
> 主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数?
>
> 你们这个平台还挺方便快速验证的,是扩展了Flink SQL吗?
> 虽然没有完全解决我的问题,但还是要谢谢你。
>
> Yuan,Youjun  于2019年12月5日周四 上午10:41写道:
>
> > 可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。
> > 假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样:
> > INSERT INTO mysink
> > SELECT
> >ts, userid,
> >COUNT(userid)
> >OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN 
> > INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc
> >
> > 以如下输入为例:
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> > 产出如下结果:
> > {"cnt":1,"ts":157554732,"userid":"user1"}
> > {"cnt":2,"ts":157554798,"userid":"user1"}
> > {"cnt":3,"ts":157554810,"userid":"user1"}
> > {"cnt":4,"ts":157554906,"userid":"user1"}
> > {"cnt":4,"ts":157554960,"userid":"user1"}
> > {"cnt":4,"ts":157554990,"userid":"user1"}
> >
> > 为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/
> > 的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果:
> > {
> > "sources": [{
> > "schema": {
> > "format": "CSV",
> > "fields": [{
> > "name": "ts",
> > "type": "SQL_TIMESTAMP"
> > },
> > {
> > "name": "userid",
> > "type": "STRING"
> > }]
> > },
> > "watermark": 0,
> > "name": "mysrc",
> > "eventTime": "ts",
> > "type": "COLLECTION",
> > "attr": {
> > "input":[
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> >   ]
> >   }
> > }],
> > "sink": {
> > "schema": {
> > "format": "JSON"
> > },
> > "name": "mysink",
> > "type": "STDOUT"
> > },
> > "name": "demojob",
> > "timeType": "EVENTTIME",
> > "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid) 
> > OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30'
> > MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> > }
> >
> >
> > 当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把tim
> > eT
> > ype从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。
> >
> > 袁尤军
> >
> > -邮件原件-
> > 发件人: 陈帅 
> > 发送时间: Wednesday, December 4, 2019 11:40 PM
> > 收件人: user-zh@flink.apache.org
> > 主题: 如果用flink sql持续查询过去30分钟登录网站的人数?
> >
> > 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
> > 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> > 12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4),
> > 12:41 (5), 12:46 (4), 13:16 (0)
> > 即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。
> >
> > 如果用sliding
> > window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
> > over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
> > api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.
> >
> > 所以想问一下:
> > 1. 针对这种case有没有标准做法?sql支持吗?
> > 2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?
> >
> > 谢谢!
> > 陈帅
> >
>


Re: 如果用flink sql持续查询过去30分钟登录网站的人数?

2019-12-07 文章 陈帅
请问这个平台生成可执行文件creek是如何实现的?对应的Dawin-amd64环境下载下来的文件是什么格式的?

Yuan,Youjun  于2019年12月7日周六 下午8:32写道:

> 是的,与Flink完全一样的SQL接口,为边缘计算设计的流式计算框架。
>
>
> -邮件原件-
> 发件人: 陈帅 
> 发送时间: Saturday, December 7, 2019 11:36 AM
> 收件人: user-zh@flink.apache.org
> 主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数?
>
> 你们这个平台还挺方便快速验证的,是扩展了Flink SQL吗?
> 虽然没有完全解决我的问题,但还是要谢谢你。
>
> Yuan,Youjun  于2019年12月5日周四 上午10:41写道:
>
> > 可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。
> > 假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样:
> > INSERT INTO mysink
> > SELECT
> >ts, userid,
> >COUNT(userid)
> >OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN
> > INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc
> >
> > 以如下输入为例:
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> > 产出如下结果:
> > {"cnt":1,"ts":157554732,"userid":"user1"}
> > {"cnt":2,"ts":157554798,"userid":"user1"}
> > {"cnt":3,"ts":157554810,"userid":"user1"}
> > {"cnt":4,"ts":157554906,"userid":"user1"}
> > {"cnt":4,"ts":157554960,"userid":"user1"}
> > {"cnt":4,"ts":157554990,"userid":"user1"}
> >
> > 为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/
> > 的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果:
> > {
> > "sources": [{
> > "schema": {
> > "format": "CSV",
> > "fields": [{
> > "name": "ts",
> > "type": "SQL_TIMESTAMP"
> > },
> > {
> > "name": "userid",
> > "type": "STRING"
> > }]
> > },
> > "watermark": 0,
> > "name": "mysrc",
> > "eventTime": "ts",
> > "type": "COLLECTION",
> > "attr": {
> > "input":[
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> >     "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> >   ]
> >   }
> > }],
> > "sink": {
> > "schema": {
> > "format": "JSON"
> > },
> > "name": "mysink",
> > "type": "STDOUT"
> > },
> > "name": "demojob",
> > "timeType": "EVENTTIME",
> > "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid)
> > OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30'
> > MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> > }
> >
> >
> > 当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把timeT
> > ype从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。
> >
> > 袁尤军
> >
> > -邮件原件-
> > 发件人: 陈帅 
> > 发送时间: Wednesday, December 4, 2019 11:40 PM
> > 收件人: user-zh@flink.apache.org
> > 主题: 如果用flink sql持续查询过去30分钟登录网站的人数?
> >
> > 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
> > 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> > 12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4),
> > 12:41 (5), 12:46 (4), 13:16 (0)
> > 即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。
> >
> > 如果用sliding
> > window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
> > over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
> > api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.
> >
> > 所以想问一下:
> > 1. 针对这种case有没有标准做法?sql支持吗?
> > 2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?
> >
> > 谢谢!
> > 陈帅
> >
>


回复: 如果用flink sql持续查询过去30分钟登录网站的人数?

2019-12-07 文章 Yuan,Youjun
是的,与Flink完全一样的SQL接口,为边缘计算设计的流式计算框架。


-邮件原件-
发件人: 陈帅  
发送时间: Saturday, December 7, 2019 11:36 AM
收件人: user-zh@flink.apache.org
主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数?

你们这个平台还挺方便快速验证的,是扩展了Flink SQL吗?
虽然没有完全解决我的问题,但还是要谢谢你。

Yuan,Youjun  于2019年12月5日周四 上午10:41写道:

> 可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。
> 假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样:
> INSERT INTO mysink
> SELECT
>ts, userid,
>COUNT(userid)
>OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN 
> INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc
>
> 以如下输入为例:
> "2019-12-05 12:02:00,user1",
> "2019-12-05 12:13:00,user1",
> "2019-12-05 12:15:00,user1",
> "2019-12-05 12:31:00,user1",
> "2019-12-05 12:40:00,user1",
> "2019-12-05 12:45:00,user1"
> 产出如下结果:
> {"cnt":1,"ts":157554732,"userid":"user1"}
> {"cnt":2,"ts":157554798,"userid":"user1"}
> {"cnt":3,"ts":157554810,"userid":"user1"}
> {"cnt":4,"ts":157554906,"userid":"user1"}
> {"cnt":4,"ts":157554960,"userid":"user1"}
> {"cnt":4,"ts":157554990,"userid":"user1"}
>
> 为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/
> 的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果:
> {
> "sources": [{
> "schema": {
> "format": "CSV",
> "fields": [{
> "name": "ts",
> "type": "SQL_TIMESTAMP"
> },
> {
> "name": "userid",
> "type": "STRING"
> }]
> },
> "watermark": 0,
> "name": "mysrc",
> "eventTime": "ts",
> "type": "COLLECTION",
> "attr": {
> "input":[
> "2019-12-05 12:02:00,user1",
> "2019-12-05 12:13:00,user1",
> "2019-12-05 12:15:00,user1",
> "2019-12-05 12:31:00,user1",
> "2019-12-05 12:40:00,user1",
>     "2019-12-05 12:45:00,user1"
>   ]
>   }
> }],
> "sink": {
> "schema": {
> "format": "JSON"
> },
> "name": "mysink",
> "type": "STDOUT"
> },
> "name": "demojob",
> "timeType": "EVENTTIME",
> "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid)  
> OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' 
> MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> }
>
>
> 当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把timeT
> ype从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。
>
> 袁尤军
>
> -邮件原件-
> 发件人: 陈帅 
> 发送时间: Wednesday, December 4, 2019 11:40 PM
> 收件人: user-zh@flink.apache.org
> 主题: 如果用flink sql持续查询过去30分钟登录网站的人数?
>
> 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
> 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> 12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4), 
> 12:41 (5), 12:46 (4), 13:16 (0)
> 即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。
>
> 如果用sliding 
> window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
> over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
> api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.
>
> 所以想问一下:
> 1. 针对这种case有没有标准做法?sql支持吗?
> 2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?
>
> 谢谢!
> 陈帅
>


Re: 如果用flink sql持续查询过去30分钟登录网站的人数?

2019-12-06 文章 陈帅
你们这个平台还挺方便快速验证的,是扩展了Flink SQL吗?
虽然没有完全解决我的问题,但还是要谢谢你。

Yuan,Youjun  于2019年12月5日周四 上午10:41写道:

> 可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。
> 假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样:
> INSERT INTO mysink
> SELECT
>ts, userid,
>COUNT(userid)
>OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL
> '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt
> FROM mysrc
>
> 以如下输入为例:
> "2019-12-05 12:02:00,user1",
> "2019-12-05 12:13:00,user1",
> "2019-12-05 12:15:00,user1",
> "2019-12-05 12:31:00,user1",
> "2019-12-05 12:40:00,user1",
> "2019-12-05 12:45:00,user1"
> 产出如下结果:
> {"cnt":1,"ts":157554732,"userid":"user1"}
> {"cnt":2,"ts":157554798,"userid":"user1"}
> {"cnt":3,"ts":157554810,"userid":"user1"}
> {"cnt":4,"ts":157554906,"userid":"user1"}
> {"cnt":4,"ts":157554960,"userid":"user1"}
> {"cnt":4,"ts":157554990,"userid":"user1"}
>
> 为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/
> 的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果:
> {
> "sources": [{
> "schema": {
> "format": "CSV",
> "fields": [{
> "name": "ts",
> "type": "SQL_TIMESTAMP"
> },
> {
> "name": "userid",
> "type": "STRING"
> }]
> },
> "watermark": 0,
> "name": "mysrc",
> "eventTime": "ts",
> "type": "COLLECTION",
> "attr": {
> "input":[
> "2019-12-05 12:02:00,user1",
> "2019-12-05 12:13:00,user1",
> "2019-12-05 12:15:00,user1",
> "2019-12-05 12:31:00,user1",
> "2019-12-05 12:40:00,user1",
>             "2019-12-05 12:45:00,user1"
>   ]
>   }
> }],
> "sink": {
> "schema": {
> "format": "JSON"
> },
> "name": "mysink",
> "type": "STDOUT"
> },
> "name": "demojob",
> "timeType": "EVENTTIME",
> "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid)  OVER
> (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' MINUTE
> PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> }
>
>
> 当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把timeType从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。
>
> 袁尤军
>
> -邮件原件-
> 发件人: 陈帅 
> 发送时间: Wednesday, December 4, 2019 11:40 PM
> 收件人: user-zh@flink.apache.org
> 主题: 如果用flink sql持续查询过去30分钟登录网站的人数?
>
> 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
> 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> 12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4), 12:41
> (5), 12:46 (4), 13:16 (0)
> 即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。
>
> 如果用sliding window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
> over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
> api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.
>
> 所以想问一下:
> 1. 针对这种case有没有标准做法?sql支持吗?
> 2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?
>
> 谢谢!
> 陈帅
>


回复: 如果用flink sql持续查询过去30分钟登录网站的人数?

2019-12-04 文章 Yuan,Youjun
可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。
假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样:
INSERT INTO mysink 
SELECT 
   ts, userid,  
   COUNT(userid)  
   OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' 
MINUTE PRECEDING AND CURRENT ROW) AS cnt 
FROM mysrc

以如下输入为例:
"2019-12-05 12:02:00,user1",
"2019-12-05 12:13:00,user1",
"2019-12-05 12:15:00,user1",
"2019-12-05 12:31:00,user1",
"2019-12-05 12:40:00,user1",
"2019-12-05 12:45:00,user1"
产出如下结果:
{"cnt":1,"ts":157554732,"userid":"user1"}
{"cnt":2,"ts":157554798,"userid":"user1"}
{"cnt":3,"ts":157554810,"userid":"user1"}
{"cnt":4,"ts":157554906,"userid":"user1"}
{"cnt":4,"ts":157554960,"userid":"user1"}
{"cnt":4,"ts":157554990,"userid":"user1"}

为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/ 
的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果:
{
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "userid",
"type": "STRING"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input":[
"2019-12-05 12:02:00,user1",
"2019-12-05 12:13:00,user1",
"2019-12-05 12:15:00,user1",
"2019-12-05 12:31:00,user1",
"2019-12-05 12:40:00,user1",
"2019-12-05 12:45:00,user1"
  ]
  }
}],
"sink": {
        "schema": {
    "format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid)  OVER 
(PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' MINUTE 
PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
}

当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把timeType从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。

袁尤军

-邮件原件-
发件人: 陈帅  
发送时间: Wednesday, December 4, 2019 11:40 PM
收件人: user-zh@flink.apache.org
主题: 如果用flink sql持续查询过去30分钟登录网站的人数?

例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4), 12:41 (5), 
12:46 (4), 13:16 (0)
即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。

如果用sliding window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.

所以想问一下:
1. 针对这种case有没有标准做法?sql支持吗?
2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?

谢谢!
陈帅


如果用flink sql持续查询过去30分钟登录网站的人数?

2019-12-04 文章 陈帅
例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4), 12:41
(5), 12:46 (4), 13:16 (0)
即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。

如果用sliding window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.

所以想问一下:
1. 针对这种case有没有标准做法?sql支持吗?
2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?

谢谢!
陈帅