Hi!

这个自定义类型是作为 accumulator 还是被聚合的值?如果是 accumulator 则不应该报错,能否分享一下 udaf
的代码?如果是作为被聚合的值,目前自定义类型只支持 pojo,对 pojo 的要求详见 [1]。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos

陈卓宇 <[email protected]> 于2021年12月20日周一 17:00写道:

> 在自定义udaf函数实现中使用了一些flinksql不支持的数据类型
> 想请问如何进行自定义数据类型的实现
>
>
>
>
>
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. An error occurred in the type inference logic of
> function 'Average'.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> &nbsp;&nbsp; &nbsp;at com.analysys.avg2.main(avg2.java:70)
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function 'Average'.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> &nbsp;&nbsp; &nbsp;at java.util.Optional.flatMap(Optional.java:241)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
> &nbsp;&nbsp; &nbsp;at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
> &nbsp;&nbsp; &nbsp;at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
> &nbsp;&nbsp; &nbsp;at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
> &nbsp;&nbsp; &nbsp;at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
> &nbsp;&nbsp; &nbsp;at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
> &nbsp;&nbsp; &nbsp;at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
> &nbsp;&nbsp; &nbsp;... 5 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a valid type inference for function class
> 'com.analysys.avg2$Average'. Please check for implementation mistakes
> and/or provide a corresponding hint.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
> &nbsp;&nbsp; &nbsp;... 16 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to accumulator mapping.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
> &nbsp;&nbsp; &nbsp;... 19 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> extract a type inference from method:
> public void
> com.analysys.avg2$Average.accumulate(com.analysys.avg2$SumCount,java.lang.Integer)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
> &nbsp;&nbsp; &nbsp;... 21 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class com.analysys.avg2$SumCount' in generic
> class 'org.apache.flink.table.functions.AggregateFunction' in class
> com.analysys.avg2$Average. Please pass the required data type manually or
> allow RAW types.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
> &nbsp;&nbsp; &nbsp;... 22 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class com.analysys.avg2$SumCount'. Interpreting
> it as a structured type was also not successful.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
> &nbsp;&nbsp; &nbsp;... 29 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in field
> 'map' of class 'com.analysys.avg2$SumCount'.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredTypeFields(DataTypeExtractor.java:540)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:514)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
> &nbsp;&nbsp; &nbsp;... 30 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class
> org.roaringbitmap.longlong.Roaring64NavigableMap' in generic class
> 'org.apache.flink.table.functions.AggregateFunction' in class
> com.analysys.avg2$Average. Please pass the required data type manually or
> allow RAW types.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredTypeFields(DataTypeExtractor.java:537)
> &nbsp;&nbsp; &nbsp;... 32 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class
> org.roaringbitmap.longlong.Roaring64NavigableMap'. Interpreting it as a
> structured type was also not successful.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
> &nbsp;&nbsp; &nbsp;... 34 more
> Caused by: org.apache.flink.table.api.ValidationException: Field
> 'highToBitmap' of class 'org.roaringbitmap.longlong.Roaring64NavigableMap'
> is neither publicly accessible nor does it have a corresponding getter
> method.
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:522)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
> &nbsp;&nbsp; &nbsp;at
> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1351)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> &nbsp;&nbsp; &nbsp;at
> java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491)
> &nbsp;&nbsp; &nbsp;at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
> &nbsp;&nbsp; &nbsp;... 35 more
>
>
>
>
>
> 陈卓
>
>
> &nbsp;

回复