函数代码如下:<br/>```<br/>public class Top2RetractTableAggregateFunction extends 
TableAggregateFunction&lt;Tuple2&lt;Long, Integer&gt;, 
Top2RetractTableAggregateFunction.Top2RetractAccumulator&gt; {<br/>    private 
static final Logger LOG = 
LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);<br/>    // 
Top2 聚合中间结果数据结构<br/>    public static class Top2RetractAccumulator {<br/>       
 public long beforeFirst = 0;<br/>        public long beforeSecond = 0;<br/>    
    public long afterFirst = 0;<br/>        public long afterSecond = 0;<br/>   
 }<br/><br/>    // 创建 Top2Accumulator 累加器并做初始化<br/>    @Override<br/>    public 
Top2RetractAccumulator createAccumulator() {<br/>        LOG.info("[INFO] 
createAccumulator ...........................");<br/>        
Top2RetractAccumulator acc = new Top2RetractAccumulator();<br/>        
acc.beforeFirst = Integer.MIN_VALUE;<br/>        acc.beforeSecond = 
Integer.MIN_VALUE;<br/>        acc.afterFirst = Integer.MIN_VALUE;<br/>        
acc.afterSecond = Integer.MIN_VALUE;<br/>        return acc;<br/>    
}<br/><br/>    // 接收输入元素并累加到 Accumulator 数据结构<br/>    public void 
accumulate(Top2RetractAccumulator acc, Long value) {<br/>        
LOG.info("[INFO] accumulate ...........................");<br/>        if 
(value &gt; acc.afterFirst) {<br/>            acc.afterSecond = 
acc.afterFirst;<br/>            acc.afterFirst = value;<br/>        } else if 
(value &gt; acc.afterSecond) {<br/>            acc.afterSecond = value;<br/>    
    }<br/>    }<br/><br/>    // 带撤回的输出<br/>    public void 
emitUpdateWithRetract(Top2RetractAccumulator acc, 
RetractableCollector&lt;Tuple2&lt;Long, Integer&gt;&gt; out) {<br/>        
LOG.info("[INFO] emitUpdateWithRetract ...........................");<br/>      
  if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {<br/>            // 
撤回旧记录<br/>            if (acc.beforeFirst != Integer.MIN_VALUE) {<br/>          
      out.retract(Tuple2.of(acc.beforeFirst, 1));<br/>            }<br/>        
    // 输出新记录<br/>            out.collect(Tuple2.of(acc.afterFirst, 1));<br/>    
        acc.beforeFirst = acc.afterFirst;<br/>        }<br/>        if 
(!Objects.equals(acc.afterSecond, acc.beforeSecond)) {<br/>            // 
撤回旧记录<br/>            if (acc.beforeSecond != Integer.MIN_VALUE) {<br/>         
       out.retract(Tuple2.of(acc.beforeSecond, 2));<br/>            }<br/>      
      // 输出新记录<br/>            out.collect(Tuple2.of(acc.afterSecond, 2));<br/> 
           acc.beforeSecond = acc.afterSecond;<br/>        }<br/>    
}<br/>}<br/>```<br/>完整调用代码:<br/>```<br/>// 执行环境<br/>StreamExecutionEnvironment 
env = 
StreamExecutionEnvironment.getExecutionEnvironment();<br/>env.setParallelism(1);<br/>EnvironmentSettings
 settings = EnvironmentSettings<br/>        .newInstance()<br/>        
.useOldPlanner() // Blink Planner 异常 Old Planner 可以<br/>        
.inStreamingMode()<br/>        .build();<br/>StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, settings);<br/><br/>DataStream&lt;Row&gt; 
sourceStream = env.fromElements(<br/>        Row.of("李雷", "语文", 78),<br/>       
 Row.of("韩梅梅", "语文", 50),<br/>        Row.of("李雷", "语文", 99),<br/>        
Row.of("韩梅梅", "语文", 80),<br/>        Row.of("李雷", "英语", 90),<br/>        
Row.of("韩梅梅", "英语", 40),<br/>        Row.of("李雷", "英语", 98),<br/>        
Row.of("韩梅梅", "英语", 88)<br/>);<br/><br/>// 
注册虚拟表<br/>tEnv.createTemporaryView("stu_score", sourceStream, $("name"), 
$("course"), $("score"));<br/>// 
注册临时i系统函数<br/>tEnv.createTemporarySystemFunction("Top2", new 
Top2RetractTableAggregateFunction());<br/>// 
调用函数<br/>tEnv.from("stu_score")<br/>        .groupBy($("course"))<br/>        
.flatAggregate(call("Top2", $("score")).as("score", "rank"))<br/>        
.select($("course"), $("score"), $("rank"))<br/>        .execute()<br/>        
.print();<br/>```<br/>Flink 版本:1.13.5
在 2022-05-23 09:55:40,"Xuyang" <xyzhong...@163.com> 写道:
>Hi, 可以将你的taf具体代码发出来吗?还有你的版本,也贴一下。
>
>
>
>
>--
>
>    Best!
>    Xuyang
>
>
>
>
>
>在 2022-05-22 22:35:46,"赢峰" <si_ji_f...@163.com> 写道:
>>
>>
>>在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract 
>>输出数据。在调用的时候参考文档的使用方式:
>>```
>>tEnv.from("stu_score")
>>    .groupBy($("course"))
>>    .flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
>>    .select($("course"), $("f0"), $("f1"))
>>```
>>使用默认 blink Planner,会抛出如下异常:
>>```
>>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>Could not find an implementation method 'emitValue' in class 
>>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' 
>>for function 'Top2' that matches the following signature:
>>void 
>>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
>> org.apache.flink.util.Collector)
>>```
>>但是使用 Old Planner,则会正常输出:
>>```
>>StreamExecutionEnvironment env = 
>>StreamExecutionEnvironment.getExecutionEnvironment();
>>env.setParallelism(1);
>>EnvironmentSettings settings = EnvironmentSettings
>>        .newInstance()
>>        .useOldPlanner()
>>        .inStreamingMode()
>>        .build();
>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>```
>>这是什么地方使用有问题?
>>
>>
>>
>>
>> 

回复