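Hi!

Automatic type inference has been reworked since Flink 1.11, so you may need to add some annotations before the types can be inferred. The last part of your stack trace points the same way: "Please pass the required data type manually or allow RAW types." For details, see the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/udfs/#%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc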
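As a rough illustration of what such an annotation could look like for the function quoted below, here is a minimal sketch. It assumes that treating the accumulator as an opaque RAW type is acceptable for your job, and that flink-table-common and stream-lib are on the classpath. The choice of a class-level @FunctionHint with a RAW @DataTypeHint is my suggestion, not something confirmed against your setup, so please treat it as a starting point rather than a verified fix.

```java
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

// Assumption: declare the accumulator as a RAW type bridged to HyperLogLogPlus,
// so the planner no longer tries to interpret it as a structured type
// (which failed in the stack trace because of the non-public field 'format').
@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = HyperLogLogPlus.class))
public class HlpUDAF extends AggregateFunction<Long, HyperLogLogPlus> {

    @Override
    public HyperLogLogPlus createAccumulator() {
        // Same precision as in the original code.
        return new HyperLogLogPlus(14);
    }

    // Called once per input row; adds the id to the sketch.
    public void accumulate(HyperLogLogPlus acc, String id) {
        acc.offer(id);
    }

    @Override
    public Long getValue(HyperLogLogPlus accumulator) {
        // Estimated number of distinct ids seen so far.
        return accumulator.cardinality();
    }

    // Merges partial accumulators into the first argument.
    public void merge(HyperLogLogPlus accumulator, Iterable<HyperLogLogPlus> others) {
        for (HyperLogLogPlus other : others) {
            try {
                accumulator.addAll(other);
            } catch (CardinalityMergeException e) {
                // Keep the original exception as the cause for easier debugging.
                throw new RuntimeException("Incompatible HyperLogLogPlus accumulators in merge.", e);
            }
        }
    }
}
```

If you register the function with tableEnv.createTemporarySystemFunction("hlp_count", HlpUDAF.class), the hint should be picked up by the new type inference. A RAW accumulator is serialized generically, so it may be worth checking state size and serialization cost for your workload.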

On Thu, Jul 15, 2021 at 10:14 AM, Tianwang Li <[email protected]> wrote:

> In Flink 1.13, registering a UDAF fails because the accumulator type
> cannot be recognized. The same code worked in Flink 1.10.
>
> How should a UDAF be written and registered in the new version?
>
> Error message:
>
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'default_catalog.default_database.hlp_count'.
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:155)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:99)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:702)
>     ... 4 more
> Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'default_catalog.default_database.hlp_count'.
>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>     at java.util.Optional.flatMap(Optional.java:241)
>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>     at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>     at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182)
>     at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
>     at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199)
>     at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944)
>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:150)
>     ... 11 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.test.flink.udaf.HlpUDAF'. Please check for implementation mistakes and/or provide a corresponding hint.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
>     at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>     ... 23 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to accumulator mapping.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>     ... 26 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
> public void com.test.flink.udaf.HlpUDAF.accumulate(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus,java.lang.String)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
>     ... 28 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' in generic class 'org.apache.flink.table.functions.AggregateFunction' in class com.test.flink.udaf.HlpUDAF. Please pass the required data type manually or allow RAW types.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
>     ... 29 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.clearspring.analytics.stream.cardinality.HyperLogLogPlus'. Interpreting it as a structured type was also not successful.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
>     ... 36 more
> Caused by: org.apache.flink.table.api.ValidationException: Field 'format' of class 'com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' is neither publicly accessible nor does it have a corresponding getter method.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:521)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493)
>     at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>     at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359)
>     at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>     at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>     at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
>     ... 37 more
>
> Code:
>
> import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
> import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
> import org.apache.flink.table.functions.AggregateFunction;
>
> public class HlpUDAF extends AggregateFunction<Long, HyperLogLogPlus> {
>
>     public void accumulate(HyperLogLogPlus acc, String id) {
>         acc.offer(id);
>     }
>
>     @Override
>     public Long getValue(HyperLogLogPlus accumulator) {
>         return accumulator.cardinality();
>     }
>
>     @Override
>     public HyperLogLogPlus createAccumulator() {
>         return new HyperLogLogPlus(14);
>     }
>
>     public void merge(HyperLogLogPlus accumulator, Iterable<HyperLogLogPlus> iterable) {
>         iterable.forEach(accToBeMerged -> {
>             try {
>                 accumulator.addAll(accToBeMerged);
>             } catch (CardinalityMergeException e) {
>                 throw new RuntimeException("HyperLogLog merge CardinalityMergeException if other is not compatible.");
>             }
>         });
>     }
>
>     public void retract(HyperLogLogPlus accumulator, String id) {
>         throw new RuntimeException("This function not require retract method, but the retract method is called.");
>     }
>
> }
>
> --
> **************************************
>  tivanli
> **************************************
