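Hi, fengqi.

It looks like a select statement cannot directly use a proctime or event-time function that does not come from a window aggregation. I'm not sure yet whether this is the expected behavior; if it's convenient, you could file a bug on the community JIRA [1] to find out.

As a quick workaround, you can try the code below: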

    DataStream<Persion> flintstones = env.fromCollection(list);
    // Table select = table.select($("name"), $("age"), $("addtime").proctime());
    Table table =
            tEnv.fromDataStream(
                    flintstones,
                    Schema.newBuilder()
                            .column("name", "string")
                            .column("age", "int")
                            .columnByExpression("addtime", "proctime()")
                            .build());
    Table select1 =
            // select.xxx
            table.window(Tumble.over(lit(10).second()).on($("addtime")).as("w"))
                    .groupBy($("name"), $("w"))
                    .select($("name"), $("age").sum());
    select1.execute().print();

[1] https://issues.apache.org/jira/projects/FLINK/issues

--
Best!
Xuyang

At 2024-03-08 09:28:10, "ha.fen...@aisino.com" <ha.fen...@aisino.com> wrote:
>public static void main(String[] args) {
>    StreamExecutionEnvironment env =
>            StreamExecutionEnvironment.getExecutionEnvironment();
>    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>    List<Persion> list = new ArrayList<>();
>    list.add(new Persion("Fred",35));
>    list.add(new Persion("wilma",35));
>    list.add(new Persion("Pebbles",2));
>    DataStream<Persion> flintstones = env.fromCollection(list);
>    Table table = tEnv.fromDataStream(flintstones);
>    Table select = table.select($("name"), $("age"),
>            $("addtime").proctime());
>    Table select1 = select.window(
>            Tumble.over(lit(10).second())
>                    .on($("addtime"))
>                    .as("w"))
>            .groupBy($("name"), $("w"))
>            .select($("name"), $("age").sum());
>    select1.execute().print();
>
>    }
>
>    public static class Persion{
>        public String name;
>        public Integer age;
>        public Persion(){}
>        public Persion(String name,Integer age){
>            this.name = name;
>            this.age = age;
>        }
>        public String toString(){
>            return this.name.toString()+":age "+this.age.toString();
>        }
>    }
>
>It fails with: Exception in thread "main" org.apache.flink.table.api.ValidationException:
>Window properties can only be used on windowed tables
>What did I get wrong?