大家好:
最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式
create stream input table raw_log (
country STRING,
domain STRING,
flux LONG,
request LONG,
rowtime AS ROWTIME(request, "2 SECOND")
) USING kafka (
kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
startingOffsets = earliest, subscribe = "input"
) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
topic = "output"
) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log
as
select
TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
country,
domain,
sum(flux) as flux
from
raw_log
group by
TUMBLE(rowtime, INTERVAL '2' SECOND),
country,
domain; insert into top_n_result
SELECT
*
FROM
(
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY wStart
ORDER BY
flux desc
) AS row_num
FROM
window_log
)
WHERE
row_num <= 10;
就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的
在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了
一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大
后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在
AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现?
能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?