The function code is as follows:
```
import java.util.Objects;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Top2RetractTableAggregateFunction extends
        TableAggregateFunction<Tuple2<Long, Integer>, Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
    private static final Logger LOG = LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);

    // Intermediate result structure for the Top2 aggregation
    public static class Top2RetractAccumulator {
        public long beforeFirst = 0;
        public long beforeSecond = 0;
        public long afterFirst = 0;
        public long afterSecond = 0;
    }

    // Create and initialize the Top2RetractAccumulator
    @Override
    public Top2RetractAccumulator createAccumulator() {
        LOG.info("[INFO] createAccumulator ...........................");
        Top2RetractAccumulator acc = new Top2RetractAccumulator();
        acc.beforeFirst = Integer.MIN_VALUE;
        acc.beforeSecond = Integer.MIN_VALUE;
        acc.afterFirst = Integer.MIN_VALUE;
        acc.afterSecond = Integer.MIN_VALUE;
        return acc;
    }

    // Receive an input element and accumulate it into the accumulator
    public void accumulate(Top2RetractAccumulator acc, Long value) {
        LOG.info("[INFO] accumulate ...........................");
        if (value > acc.afterFirst) {
            acc.afterSecond = acc.afterFirst;
            acc.afterFirst = value;
        } else if (value > acc.afterSecond) {
            acc.afterSecond = value;
        }
    }

    // Emit results with retraction
    public void emitUpdateWithRetract(Top2RetractAccumulator acc, RetractableCollector<Tuple2<Long, Integer>> out) {
        LOG.info("[INFO] emitUpdateWithRetract ...........................");
        if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
            // Retract the old record
            if (acc.beforeFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.beforeFirst, 1));
            }
            // Emit the new record
            out.collect(Tuple2.of(acc.afterFirst, 1));
            acc.beforeFirst = acc.afterFirst;
        }
        if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
            // Retract the old record
            if (acc.beforeSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.beforeSecond, 2));
            }
            // Emit the new record
            out.collect(Tuple2.of(acc.afterSecond, 2));
            acc.beforeSecond = acc.afterSecond;
        }
    }
}
```
The complete calling code:
```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// Execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useOldPlanner() // The Blink planner throws an exception; the old planner works
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

DataStream<Row> sourceStream = env.fromElements(
        Row.of("李雷", "语文", 78),
        Row.of("韩梅梅", "语文", 50),
        Row.of("李雷", "语文", 99),
        Row.of("韩梅梅", "语文", 80),
        Row.of("李雷", "英语", 90),
        Row.of("韩梅梅", "英语", 40),
        Row.of("李雷", "英语", 98),
        Row.of("韩梅梅", "英语", 88)
);

// Register a temporary view
tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"), $("score"));
// Register a temporary system function
tEnv.createTemporarySystemFunction("Top2", new Top2RetractTableAggregateFunction());
// Call the function
tEnv.from("stu_score")
        .groupBy($("course"))
        .flatAggregate(call("Top2", $("score")).as("score", "rank"))
        .select($("course"), $("score"), $("rank"))
        .execute()
        .print();
```
Flink version: 1.13.5
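
For reference, the retract stream I expect for one group can be checked without either planner. The following is only a minimal plain-Java sketch with no Flink dependency: the class `Top2RetractSketch`, the `Acc` class, and the `run` method are hypothetical names, strings stand in for `collect`/`retract` calls, and it assumes the runtime calls `emitUpdateWithRetract` once after every `accumulate`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the Top2 retract logic; all names here are hypothetical.
class Top2RetractSketch {

    // Mirrors the fields of Top2RetractAccumulator, with the same sentinel value.
    static class Acc {
        long beforeFirst = Integer.MIN_VALUE;
        long beforeSecond = Integer.MIN_VALUE;
        long afterFirst = Integer.MIN_VALUE;
        long afterSecond = Integer.MIN_VALUE;
    }

    // Same comparison logic as accumulate() in the function above.
    static void accumulate(Acc acc, long value) {
        if (value > acc.afterFirst) {
            acc.afterSecond = acc.afterFirst;
            acc.afterFirst = value;
        } else if (value > acc.afterSecond) {
            acc.afterSecond = value;
        }
    }

    // Same logic as emitUpdateWithRetract(); "-" marks a retract, "+" a collect.
    static void emitUpdateWithRetract(Acc acc, List<String> out) {
        if (acc.afterFirst != acc.beforeFirst) {
            if (acc.beforeFirst != Integer.MIN_VALUE) {
                out.add("-(" + acc.beforeFirst + ",1)");
            }
            out.add("+(" + acc.afterFirst + ",1)");
            acc.beforeFirst = acc.afterFirst;
        }
        if (acc.afterSecond != acc.beforeSecond) {
            if (acc.beforeSecond != Integer.MIN_VALUE) {
                out.add("-(" + acc.beforeSecond + ",2)");
            }
            out.add("+(" + acc.afterSecond + ",2)");
            acc.beforeSecond = acc.afterSecond;
        }
    }

    // Feeds the scores of one group through accumulate + emit, one row at a time.
    static List<String> run(long... scores) {
        Acc acc = new Acc();
        List<String> out = new ArrayList<>();
        for (long score : scores) {
            accumulate(acc, score);
            // Assumption: the runtime emits once per input row.
            emitUpdateWithRetract(acc, out);
        }
        return out;
    }

    public static void main(String[] args) {
        // Scores of the "语文" group, in input order.
        run(78, 50, 99, 80).forEach(System.out::println);
    }
}
```

With the four "语文" scores, the sketch emits +(78,1), +(50,2), -(78,1), +(99,1), -(50,2), +(78,2), -(78,2), +(80,2), so the final top 2 for that course are 99 and 80.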
On 2022-05-23 09:55:40, "Xuyang" <xyzhong...@163.com> wrote:
>Hi, could you post the actual code of your TableAggregateFunction (TAF)? Please also include your Flink version.
>
>
>
>
>--
>
> Best!
> Xuyang
>
>
>
>
>
>On 2022-05-22 22:35:46, "赢峰" <si_ji_f...@163.com> wrote:
>>
>>
>>In my custom table aggregate function (TableAggregateFunction), I emit
>>results with emitUpdateWithRetract. I call it the way the documentation shows:
>>```
>>tEnv.from("stu_score")
>> .groupBy($("course"))
>> .flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
>> .select($("course"), $("f0"), $("f1"))
>>```
>>With the default Blink planner, the following exception is thrown:
>>```
>>Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>Could not find an implementation method 'emitValue' in class
>>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction'
>>for function 'Top2' that matches the following signature:
>>void
>>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
>> org.apache.flink.util.Collector)
>>```
>>With the old planner, however, the results are output normally:
>>```
>>StreamExecutionEnvironment env =
>>StreamExecutionEnvironment.getExecutionEnvironment();
>>env.setParallelism(1);
>>EnvironmentSettings settings = EnvironmentSettings
>> .newInstance()
>> .useOldPlanner()
>> .inStreamingMode()
>> .build();
>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>```
>>What am I doing wrong here?
>>
>>
>>
>>
>>