Hi!

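Since Flink 1.11, automatic type inference for functions has changed, so you may need to add some annotations before the types can be inferred. See the documentation for details: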
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/udfs/#%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
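
As a rough illustration of what such an annotation could look like for the HlpUDAF quoted below: the last "Caused by" entries in the trace show that Flink tried to interpret HyperLogLogPlus as a structured type and failed, and the message suggests passing the data type manually or allowing RAW types. The sketch below takes the RAW route with a class-level @FunctionHint. It is untested against your job and assumes that HyperLogLogPlus can be handled by Flink's generic (Kryo-based) serializer; retract is left out for brevity.

```java
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

// Assumption: declaring the accumulator as RAW makes Flink treat
// HyperLogLogPlus as an opaque object instead of mapping its fields
// to a structured type. Input and output types are still extracted
// by reflection from accumulate() and getValue().
@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = HyperLogLogPlus.class))
public class HlpUDAF extends AggregateFunction<Long, HyperLogLogPlus> {

    @Override
    public HyperLogLogPlus createAccumulator() {
        // Same precision as in the original code.
        return new HyperLogLogPlus(14);
    }

    public void accumulate(HyperLogLogPlus acc, String id) {
        acc.offer(id);
    }

    public void merge(HyperLogLogPlus acc, Iterable<HyperLogLogPlus> others) {
        for (HyperLogLogPlus other : others) {
            try {
                acc.addAll(other);
            } catch (CardinalityMergeException e) {
                // Keep the original exception as the cause for easier debugging.
                throw new RuntimeException("Cannot merge incompatible HyperLogLog sketches.", e);
            }
        }
    }

    @Override
    public Long getValue(HyperLogLogPlus acc) {
        return acc.cardinality();
    }
}
```

Note that, as far as I know, the annotation-based type inference applies to functions registered through the newer API, for example tEnv.createTemporarySystemFunction("hlp_count", HlpUDAF.class), where tEnv is your TableEnvironment.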

On Thu, Jul 15, 2021 at 10:14 AM, Tianwang Li <[email protected]> wrote:

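> In Flink 1.13, registering a UDAF fails because the accumulator type
> cannot be recognized; the same code worked in Flink 1.10.
>
> How should a UDAF be written and registered in the new version?
>
> Error message: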
>
> Caused by: org.apache.flink.table.api.ValidationException: SQL
> validation failed. An error occurred in the type inference logic of
> function 'default_catalog.default_database.hlp_count'.
>    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:155)
>    at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>    at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>    at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:99)
>    at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:702)
>    ... 4 more
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function
> 'default_catalog.default_database.hlp_count'.
>    at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>    at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>    at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>    at java.util.Optional.flatMap(Optional.java:241)
>    at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>    at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>    at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182)
>    at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
>    at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199)
>    at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
>    at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944)
>    at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
>    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:150)
>    ... 11 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a valid type inference for function class
> 'com.test.flink.udaf.HlpUDAF'. Please check for implementation
> mistakes and/or provide a corresponding hint.
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>    at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>    at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
>    at
> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
>    at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>    ... 23 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to accumulator mapping.
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>    at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
>    at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
>    at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>    ... 26 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> extract a type inference from method:
> public void
> com.test.flink.udaf.HlpUDAF.accumulate(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus,java.lang.String)
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>    at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>    at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
>    ... 28 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class
> com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' in
> generic class 'org.apache.flink.table.functions.AggregateFunction' in
> class com.test.flink.udaf.HlpUDAF. Please pass the required data type
> manually or allow RAW types.
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
>    at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
>    at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
>    at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
>    at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
>    ... 29 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class
> com.clearspring.analytics.stream.cardinality.HyperLogLogPlus'.
> Interpreting it as a structured type was also not successful.
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
>    ... 36 more
> Caused by: org.apache.flink.table.api.ValidationException: Field
> 'format' of class
> 'com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' is
> neither publicly accessible nor does it have a corresponding getter
> method.
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
>    at
> org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:521)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493)
>    at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>    at
> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359)
>    at
> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>    at
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>    at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>    at
> java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>    at
> java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>    at
> java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491)
>    at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
>    ... 37 more
>
>
>
> Code:
>
> import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
> import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
> import org.apache.flink.table.functions.AggregateFunction;
>
> public class HlpUDAF extends AggregateFunction<Long, HyperLogLogPlus> {
>
>     public void accumulate(HyperLogLogPlus acc, String id) {
>         acc.offer(id);
>     }
>
>     @Override
>     public Long getValue(HyperLogLogPlus accumulator) {
>         return accumulator.cardinality();
>     }
>
>     @Override
>     public HyperLogLogPlus createAccumulator() {
>         return new HyperLogLogPlus(14);
>     }
>
>     public void merge(HyperLogLogPlus accumulator, Iterable<HyperLogLogPlus> iterable) {
>
>         iterable.forEach(accToBeMerged -> {
>             try {
>                 accumulator.addAll(accToBeMerged);
>             } catch (CardinalityMergeException e) {
>                 // Keep the original exception as the cause.
>                 throw new RuntimeException("HyperLogLog merge failed: the other sketch is not compatible.", e);
>             }
>
>         } );
>     }
>
>     public void retract(HyperLogLogPlus accumulator, String id) {
>         throw new RuntimeException("This function does not require a retract method, but retract was called.");
>     }
>
> }
>
>
>
>
>
> --
> **************************************
>  tivanli
> **************************************
>
