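In Flink 1.13, registering a UDAF fails because the accumulator type cannot be extracted.
The same UDAF worked in Flink 1.10.

How should a UDAF be written and registered in the new version?

Error message: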

Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'default_catalog.default_database.hlp_count'.
   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:155)
   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:99)
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:702)
   ... 4 more
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'default_catalog.default_database.hlp_count'.
   at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
   at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
   at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
   at java.util.Optional.flatMap(Optional.java:241)
   at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
   at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
   at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182)
   at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
   at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199)
   at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944)
   at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:150)
   ... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.test.flink.udaf.HlpUDAF'. Please check for implementation mistakes and/or provide a corresponding hint.
   at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
   at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
   at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
   at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
   at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
   ... 23 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to accumulator mapping.
   at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
   at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
   at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
   at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
   ... 26 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
public void com.test.flink.udaf.HlpUDAF.accumulate(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus,java.lang.String)
   at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
   at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
   at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
   ... 28 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' in generic class 'org.apache.flink.table.functions.AggregateFunction' in class com.test.flink.udaf.HlpUDAF. Please pass the required data type manually or allow RAW types.
   at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
   at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
   at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
   at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
   at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
   ... 29 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.clearspring.analytics.stream.cardinality.HyperLogLogPlus'. Interpreting it as a structured type was also not successful.
   at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
   ... 36 more
Caused by: org.apache.flink.table.api.ValidationException: Field 'format' of class 'com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' is neither publicly accessible nor does it have a corresponding getter method.
   at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
   at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
   at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:521)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493)
   at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
   at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359)
   at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
   at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
   at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
   at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   at java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491)
   at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
   ... 37 more
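
The innermost cause says that a field of the accumulator class is neither public nor reachable through a getter, so the class cannot be read as a structured type. To make that rule concrete, here is a minimal sketch that only mimics the readability check with plain Java reflection. It is not Flink's implementation, and the names OpaqueSketch, BytesAccumulator and ReadabilityCheck are hypothetical stand-ins. OpaqueSketch plays the role of a third-party class such as HyperLogLogPlus. BytesAccumulator illustrates, as an assumption, a wrapper that would hold the sketch in serialized form in a public field.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Optional;

// Hypothetical stand-in for a third-party class: its state lives in a
// private field for which no getter exists.
class OpaqueSketch {
    private final int format = 0;

    long cardinality() {
        return 0L;
    }
}

// Hypothetical wrapper accumulator: every field is public, so a
// field-by-field readability check has nothing to reject.
class BytesAccumulator {
    public byte[] sketchBytes = new byte[0];
}

class ReadabilityCheck {

    // Returns the name of the first instance field that is neither public
    // nor covered by a public zero-argument getter, if there is one.
    static Optional<String> firstUnreadableField(Class<?> clazz) {
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // only ordinary instance fields are considered here
            }
            if (Modifier.isPublic(mods) || hasGetter(clazz, field)) {
                continue;
            }
            return Optional.of(field.getName());
        }
        return Optional.empty();
    }

    // Simplified getter lookup: getX(), isX() or x(), public, no parameters.
    private static boolean hasGetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String capitalized = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        for (Method method : clazz.getMethods()) {
            if (method.getParameterCount() != 0 || method.getReturnType() == void.class) {
                continue;
            }
            String methodName = method.getName();
            if (methodName.equals("get" + capitalized)
                    || methodName.equals("is" + capitalized)
                    || methodName.equals(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(firstUnreadableField(OpaqueSketch.class));     // Optional[format]
        System.out.println(firstUnreadableField(BytesAccumulator.class)); // Optional.empty
    }
}
```

Under this simplified rule the stand-in class is rejected at its 'format' field, as in the trace above, while the wrapper passes. Whether a bytes-backed wrapper, a manually declared data type, or allowing RAW types (the two options named in the error message) fits best in Flink 1.13 is left open here.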



Code:

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import org.apache.flink.table.functions.AggregateFunction;

public class HlpUDAF extends AggregateFunction<Long, HyperLogLogPlus> {

    public void accumulate(HyperLogLogPlus acc, String id) {
        acc.offer(id);
    }

    @Override
    public Long getValue(HyperLogLogPlus accumulator) {
        return accumulator.cardinality();
    }

    @Override
    public HyperLogLogPlus createAccumulator() {
        return new HyperLogLogPlus(14);
    }

    public void merge(HyperLogLogPlus accumulator, Iterable<HyperLogLogPlus> iterable) {
        iterable.forEach(accToBeMerged -> {
            try {
                accumulator.addAll(accToBeMerged);
            } catch (CardinalityMergeException e) {
                // Keep the original exception as the cause so the reason is not lost.
                throw new RuntimeException(
                        "HyperLogLog merge failed: the other sketch is not compatible.", e);
            }
        });
    }

    public void retract(HyperLogLogPlus accumulator, String id) {
        throw new RuntimeException(
                "This function does not require a retract method, but retract was called.");
    }

}





-- 
**************************************
 tivanli
**************************************