Hi! 这个自定义类型是作为 accumulator 还是被聚合的值?如果是 accumulator 则不应该报错,能否分享一下 udaf 的代码?如果是作为被聚合的值,目前自定义类型只支持 pojo,对 pojo 的要求详见 [1]。
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos 陈卓宇 <[email protected]> 于2021年12月20日周一 17:00写道: > 在自定义udaf函数实现中使用了一些flinksql不支持的数据类型 > 想请问如何进行自定义数据类型的实现 > > > > > > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. An error occurred in the type inference logic of > function 'Average'. > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736) > at com.analysys.avg2.main(avg2.java:70) > Caused by: org.apache.flink.table.api.ValidationException: An error > occurred in the type inference logic of function 'Average'. > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) > at java.util.Optional.flatMap(Optional.java:241) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98) > at > org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159) > ... 5 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a valid type inference for function class > 'com.analysys.avg2$Average'. Please check for implementation mistakes > and/or provide a corresponding hint. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98) > at > org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160) > ... 16 more > Caused by: org.apache.flink.table.api.ValidationException: Error in > extracting a signature to accumulator mapping. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148) > ... 19 more > Caused by: org.apache.flink.table.api.ValidationException: Unable to > extract a type inference from method: > public void > com.analysys.avg2$Average.accumulate(com.analysys.avg2$SumCount,java.lang.Integer) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124) > ... 21 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a data type from 'class com.analysys.avg2$SumCount' in generic > class 'org.apache.flink.table.functions.AggregateFunction' in class > com.analysys.avg2$Average. Please pass the required data type manually or > allow RAW types. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169) > ... 22 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a data type from 'class com.analysys.avg2$SumCount'. Interpreting > it as a structured type was also not successful. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233) > ... 29 more > Caused by: org.apache.flink.table.api.ValidationException: Error in field > 'map' of class 'com.analysys.avg2$SumCount'. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredTypeFields(DataTypeExtractor.java:540) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:514) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289) > ... 30 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a data type from 'class > org.roaringbitmap.longlong.Roaring64NavigableMap' in generic class > 'org.apache.flink.table.functions.AggregateFunction' in class > com.analysys.avg2$Average. Please pass the required data type manually or > allow RAW types. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredTypeFields(DataTypeExtractor.java:537) > ... 32 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a data type from 'class > org.roaringbitmap.longlong.Roaring64NavigableMap'. Interpreting it as a > structured type was also not successful. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233) > ... 34 more > Caused by: org.apache.flink.table.api.ValidationException: Field > 'highToBitmap' of class 'org.roaringbitmap.longlong.Roaring64NavigableMap' > is neither publicly accessible nor does it have a corresponding getter > method. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357) > at > org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:522) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493) > at > java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) > at > java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1351) > at > java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) > at > java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) > at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230) > at > java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196) > at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289) > ... 35 more > > > > > > 陈卓 > > >
