文档上还没有更新topN怎么使用,我尝试用row_number() over() 跑了一下,但是报错,请问topN可以是RetractStream吗?




val monthstats = bsTableEnv.sqlQuery(
  """
    |select
    |id,province,amount,
    |row_number() over(partition by id,province order by amount ) as rn
    |from mytable where type=1
    |group by
    |id,province,amount
  """.stripMargin
)
monthstats.toRetractStream[Row].print()
Exception in thread "main" org.apache.flink.table.api.TableException: 
Retraction on Over window aggregation is not supported yet. Note: Over window 
aggregation should not follow a non-windowed GroupBy aggregation.
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:178)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)

回复