Tested and verified, it works. thx!

@FunctionHint(
        accumulator = @DataTypeHint(value = "RAW", bridgedTo = HyperLogLogPlus.class),
        input = @DataTypeHint("STRING"),
        output = @DataTypeHint("BIGINT")
)
public void accumulate(HyperLogLogPlus acc, String id) {
    acc.offer(id);
}
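
For anyone hitting the same trace: the RAW hint is needed because otherwise the extractor tries to interpret HyperLogLogPlus as a structured type, and that interpretation requires every instance field to be public or to have a matching getter (the "Field 'format' ... neither publicly accessible nor does it have a corresponding getter" cause at the bottom of the trace). A simplified, non-Flink sketch of that readability rule, using a stand-in class instead of the real HyperLogLogPlus:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReadabilityCheck {

    // Stand-in for HyperLogLogPlus: 'format' is private and has no getter.
    static class HllLike {
        private int format;
    }

    // Returns true if every instance field is public or readable via a getter.
    // This mirrors, in simplified form, the check that fails in the stack trace;
    // it is NOT Flink's actual implementation.
    static boolean isStructuredReadable(Class<?> clazz) {
        for (Field f : clazz.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) continue;
            if (Modifier.isPublic(f.getModifiers())) continue;
            String name = f.getName();
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method m = clazz.getMethod(getter);
                if (!m.getReturnType().equals(f.getType())) return false;
            } catch (NoSuchMethodException e) {
                return false; // field is neither public nor reachable through a getter
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isStructuredReadable(HllLike.class)); // false
    }
}
```

Because the check fails, the accumulator cannot be mapped to a structured type, and declaring it as RAW (as in the @FunctionHint above) is the way out.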


Caizhi Weng <[email protected]> 于2021年7月15日周四 上午11:15写道:

> Hi!
>
> Automatic type inference has changed since Flink 1.11; you may need to add some annotations so that the types can be derived. See the documentation:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/udfs/#%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
> Tianwang Li <[email protected]> wrote on Thu, Jul 15, 2021 at 10:14 AM:
>
> > In Flink 1.13, registering a UDAF fails because the accumulator type cannot be recognized; the same code worked in Flink 1.10.
> >
> > How should a UDAF be written and registered in the new version?
> >
> > Error message:
> >
> > Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'default_catalog.default_database.hlp_count'.
> >    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:155)
> >    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> >    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
> >    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:99)
> >    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:702)
> >    ... 4 more
> > Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'default_catalog.default_database.hlp_count'.
> >    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
> >    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
> >    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> >    at java.util.Optional.flatMap(Optional.java:241)
> >    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
> >    at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
> >    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182)
> >    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
> >    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199)
> >    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
> >    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944)
> >    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
> >    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:150)
> >    ... 11 more
> > Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.test.flink.udaf.HlpUDAF'. Please check for implementation mistakes and/or provide a corresponding hint.
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> >    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
> >    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
> >    at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
> >    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
> >    ... 23 more
> > Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to accumulator mapping.
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> >    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
> >    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
> >    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
> >    ... 26 more
> > Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
> > public void com.test.flink.udaf.HlpUDAF.accumulate(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus,java.lang.String)
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> >    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
> >    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
> >    ... 28 more
> > Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' in generic class 'org.apache.flink.table.functions.AggregateFunction' in class com.test.flink.udaf.HlpUDAF. Please pass the required data type manually or allow RAW types.
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
> >    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
> >    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
> >    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
> >    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
> >    ... 29 more
> > Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.clearspring.analytics.stream.cardinality.HyperLogLogPlus'. Interpreting it as a structured type was also not successful.
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
> >    ... 36 more
> > Caused by: org.apache.flink.table.api.ValidationException: Field 'format' of class 'com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' is neither publicly accessible nor does it have a corresponding getter method.
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
> >    at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:521)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493)
> >    at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
> >    at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359)
> >    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
> >    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> >    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> >    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> >    at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> >    at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> >    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> >    at java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491)
> >    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
> >    ... 37 more
> >
> >
> >
> > Code:
> >
> > import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
> > import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
> > import org.apache.flink.table.functions.AggregateFunction;
> >
> > public class HlpUDAF extends AggregateFunction<Long, HyperLogLogPlus> {
> >
> >     public void accumulate(HyperLogLogPlus acc, String id) {
> >         acc.offer(id);
> >     }
> >
> >     @Override
> >     public Long getValue(HyperLogLogPlus accumulator) {
> >         return accumulator.cardinality();
> >     }
> >
> >     @Override
> >     public HyperLogLogPlus createAccumulator() {
> >         return new HyperLogLogPlus(14);
> >     }
> >
> >     public void merge(HyperLogLogPlus accumulator, Iterable<HyperLogLogPlus> iterable) {
> >         iterable.forEach(accToBeMerged -> {
> >             try {
> >                 accumulator.addAll(accToBeMerged);
> >             } catch (CardinalityMergeException e) {
> >                 // addAll throws if the other sketch is not compatible (e.g. different precision)
> >                 throw new RuntimeException("HyperLogLog merge failed: incompatible accumulator.", e);
> >             }
> >         });
> >     }
> >
> >     public void retract(HyperLogLogPlus accumulator, String id) {
> >         throw new RuntimeException("This function does not support retraction, but retract() was called.");
> >     }
> > }
> >
> >
> >
> >
> >
> > --
> > **************************************
> >  tivanli
> > **************************************
> >
>


-- 
**************************************
 tivanli
**************************************
