问题我自己已经解决。
> 在 2020年12月17日,下午9:00,丁浩浩 <[email protected]> 写道:
>
> flink版本:1.11.1
> udaf函数代码来自于阿里云官网文档
> ----------------------------
> 以下是代码
> public class TestSql {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
> //env.setParallelism(3);
> tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);
>
> Properties configs = CommonUtils.getConfigs();
> //注册clazz源表
> FlinkUtils.registerMysqlTable2FlinkTable(
> tableEnv,configs.getProperty("url"),
> configs.getProperty("user.name"),
> configs.getProperty("password"),
> “test", "clazz_lesson");
>
> Table table = tableEnv.sqlQuery("select
> count_uadf(clazz_number),clazz_number from clazz_lesson group by
> clazz_number");
> //Table table = tableEnv.sqlQuery("select
> number,collect(extension_value) from clazz_extension group by number ");
> tableEnv.toRetractStream(table, Row.class).print();
> env.execute();
>
>
> }
> }
>
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
> //定义存放count UDAF状态的accumulator的数据的结构。
> public static class CountAccum {
> public long total;
> }
>
> @Override
> //初始化count UDAF的accumulator。
> public CountAccum createAccumulator() {
> CountAccum acc = new CountAccum();
> acc.total = 0;
> return acc;
> }
> @Override
> //getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
> public Long getValue(CountAccum accumulator) {
> return accumulator.total;
> }
>
>
> //accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
> public void accumulate(CountAccum accumulator, Long iValue) {
> accumulator.total++;
> }
> public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
> for (CountAccum other : its) {
> accumulator.total += other.total;
> }
> }
> }
>
> 以下是堆栈信息
>
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. From line 1, column 8 to line 1, column 31: No match
> found for function signature count_uadf(<NUMERIC>)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> at
> com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
> column 8 to line 1, column 31: No match found for function signature
> count_uadf(<NUMERIC>)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match
> found for function signature count_uadf(<NUMERIC>)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> ... 27 more