Background:
Source: Kafka
Flink version: 1.10
Avro version: 1.10.0

Code:

bsTableEnv.connect(new Kafka()
                .version("universal")
                .topic(params.get("read-topic"))
                .startFromEarliest()
                .properties(this.properties)
        )
        .withFormat(
                new Avro().recordClass(User.class) // User is a class defined by someone else
        )
        .withSchema(schema)

// Inheritance of User:

import org.apache.avro.specific.SpecificRecordBase;

public class User extends SpecificRecordBase implements ***

Error log:
Caused by: org.apache.flink.table.api.ValidationException: Could not get class 'com.***.User' for key 'format.record-class'.
    at org.apache.flink.table.descriptors.DescriptorProperties.lambda$getOptionalClass$4(DescriptorProperties.java:389)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.table.descriptors.DescriptorProperties.getOptionalClass(DescriptorProperties.java:378)
    at org.apache.flink.table.descriptors.DescriptorProperties.getClass(DescriptorProperties.java:398)
    at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:62)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:281)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:161)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
    ... 38 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'com.***.User' does not extend from the required class 'org.apache.avro.specific.SpecificRecord' for key 'format.record-class'.
    at org.apache.flink.table.descriptors.DescriptorProperties.lambda$getOptionalClass$4(DescriptorProperties.java:385)
    ... 46 more

What causes this ValidationException? Could it be that the version of SpecificRecord that User inherits from differs from the Avro version I am using locally?
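One way to narrow this down is to check, inside the same JVM, whether the required class is actually assignable from the record class and which jar and class loader each one came from. The following is only a minimal diagnostic sketch, not part of Flink or Avro: the class name ClassOriginCheck and its methods are hypothetical, and the demo in main uses JDK classes so that it runs without any extra dependencies.

```java
import java.security.CodeSource;

// Hypothetical helper (not a Flink or Avro API): reports whether one class is
// assignable to another and where each class was loaded from.
public class ClassOriginCheck {

    /** Returns the jar or directory a class was loaded from, or a placeholder if unknown. */
    public static String origin(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap loader usually have no code source.
        return src == null ? "<bootstrap/unknown>" : String.valueOf(src.getLocation());
    }

    /** Prints load details for both classes and returns the assignability result. */
    public static boolean report(Class<?> required, Class<?> candidate) {
        boolean assignable = required.isAssignableFrom(candidate);
        System.out.println("required : " + required.getName()
                + " | loader=" + required.getClassLoader()
                + " | from=" + origin(required));
        System.out.println("candidate: " + candidate.getName()
                + " | loader=" + candidate.getClassLoader()
                + " | from=" + origin(candidate));
        System.out.println("assignable: " + assignable);
        return assignable;
    }

    public static void main(String[] args) {
        // Demo with JDK classes only. In the real job one would instead pass
        // org.apache.avro.specific.SpecificRecord.class and User.class
        // (assumption: both are on the job's classpath).
        report(java.util.List.class, java.util.ArrayList.class);
    }
}
```

If the check returns false for SpecificRecord and User, or the two classes report different class loaders or different Avro jars, that would point to a classpath or version conflict rather than a problem in the User class itself.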


