大家好:

最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式

create stream input table raw_log (
  country STRING,
  domain STRING,
  flux LONG,
  request LONG,
  rowtime AS ROWTIME(request, "2 SECOND")
) USING kafka (
  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
  startingOffsets = earliest, subscribe = "input"
) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
  topic = "output"
) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log 
as
select
  TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
  country,
  domain,
  sum(flux) as flux
from
  raw_log
group by
  TUMBLE(rowtime, INTERVAL '2' SECOND),
  country,
  domain; insert into top_n_result
SELECT
  *
FROM
  (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY wStart
        ORDER BY
          flux desc
      ) AS row_num
    FROM
      window_log
  )
WHERE
  row_num <= 10;

        就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 
在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了

一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 
后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在

AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 
能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?

回复