这个目前还不支持,但是可以基于TVF来实现,现在已经建了一个jira了:
https://issues.apache.org/jira/browse/FLINK-24002

Caizhi Weng <[email protected]> 于2021年9月22日周三 上午11:17写道:

> Hi!
>
> 据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。
>
> 不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为
> datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。
>
> EnvironmentSettings settings = EnvironmentSettings.newInstance().
> inStreamingMode().build();
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(
> StreamExecutionEnvironment.getExecutionEnvironment(), settings);
> tEnv.executeSql(
> "CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH
> ( 'connector' = 'datagen' )");
> Table table = tEnv.sqlQuery("SELECT key, val FROM T");
> DataStream<Row> dataStream = tEnv.toDataStream(table);
> DataStream<Tuple2<Integer, Integer>> summedStream =
> dataStream
> .keyBy(row -> (int) row.getField(0))
> .countWindow(100)
> .apply(
> (WindowFunction<
> Row,
> Tuple2<Integer, Integer>,
> Integer,
> GlobalWindow>)
> (key, window, input, out) -> {
> int sum = 0;
> for (Row row : input) {
> Integer field = (Integer) row.getField(1);
> if (field != null) {
> sum += field;
> }
> }
> out.collect(Tuple2.of(key, sum));
> })
> .returns(
> new TupleTypeInfo<>(
> BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
> Table summedTable = tEnv.fromDataStream(summedStream);
> tEnv.registerTable("S", summedTable);
> tEnv.executeSql("SELECT f0, f1 FROM S").print();
>
> casel.chen <[email protected]> 于2021年9月17日周五 下午6:05写道:
>
> > 今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time
> > window,问一下官方是否打算sql支持count window呢?
> > 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!
>

回复