Flink-1.13 注册 UDAF accumulator 类型识别失败,
在Flink-1.10的时候是可以的。
请问在新的版本要如何注册写UDAF??
错误信息:
Caused by: org.apache.flink.table.api.ValidationException: SQL
validation failed. An error occurred in the type inference logic of
function 'default_catalog.default_database.hlp_count'.
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:155)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:99)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:702)
... 4 more
Caused by: org.apache.flink.table.api.ValidationException: An error
occurred in the type inference logic of function
'default_catalog.default_database.hlp_count'.
at
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
at
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
at
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
at java.util.Optional.flatMap(Optional.java:241)
at
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
at
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:150)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Could not
extract a valid type inference for function class
'com.test.flink.udaf.HlpUDAF'. Please check for implementation
mistakes and/or provide a corresponding hint.
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
at
org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
at
org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
at
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
... 23 more
Caused by: org.apache.flink.table.api.ValidationException: Error in
extracting a signature to accumulator mapping.
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
at
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
at
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
... 26 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to
extract a type inference from method:
public void
com.test.flink.udaf.HlpUDAF.accumulate(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus,java.lang.String)
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
... 28 more
Caused by: org.apache.flink.table.api.ValidationException: Could not
extract a data type from 'class
com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' in
generic class 'org.apache.flink.table.functions.AggregateFunction' in
class com.test.flink.udaf.HlpUDAF. Please pass the required data type
manually or allow RAW types.
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
... 29 more
Caused by: org.apache.flink.table.api.ValidationException: Could not
extract a data type from 'class
com.clearspring.analytics.stream.cardinality.HyperLogLogPlus'.
Interpreting it as a structured type was also not successful.
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
... 36 more
Caused by: org.apache.flink.table.api.ValidationException: Field
'format' of class
'com.clearspring.analytics.stream.cardinality.HyperLogLogPlus' is
neither publicly accessible nor does it have a corresponding getter
method.
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
at
org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:521)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493)
at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359)
at
java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
at
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491)
at
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
... 37 more
代码:
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import org.apache.flink.table.functions.AggregateFunction;
public class HlpUDAF extends AggregateFunction<Long, HyperLogLogPlus> {
public void accumulate(HyperLogLogPlus acc, String id) {
acc.offer(id);
}
@Override
public Long getValue(HyperLogLogPlus accumulator) {
return accumulator.cardinality();
}
@Override
public HyperLogLogPlus createAccumulator() {
return new HyperLogLogPlus(14);
}
public void merge(HyperLogLogPlus accumulator,
Iterable<HyperLogLogPlus> iterable){
iterable.forEach( accToBeMerged -> {
try {
accumulator.addAll(accToBeMerged);
} catch (CardinalityMergeException e) {
throw new RuntimeException("HyperLogLog merge
CardinalityMergeException if other is not compatible.");
}
} );
}
public void retract(HyperLogLogPlus accumulator, String id) {
throw new RuntimeException("This function not require retract
method, but the retract method is called.");
}
}
--
**************************************
tivanli
**************************************