Hello,

I read in this article https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html that it is possible to use SpecificRecordBase.class in operators:

Avro Specific

Avro specific records will be automatically detected by checking that the given type’s type hierarchy contains the SpecificRecordBase class. You can either specify your concrete Avro type, or—if you want to be more generic and allow different types in your operator—use the SpecificRecordBase type (or a subtype) in your user functions, in ResultTypeQueryable#getProducedType(), or in SingleOutputStreamOperator#returns(). Since specific records use generated Java code, they are strongly typed and allow direct access to the fields via known getters and setters.

If I specify SpecificRecordBase.class in the SourceFunction/SinkFunction I get an exception:

Caused by: java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:71) ~[?:?]
    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:55) ~[?:?]
    at org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.createAvroTypeInfo(AvroKryoSerializerUtils.java:81) ~[?:?]
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1897) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1798) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:970) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:799) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:746) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:742) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:210) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at io.pravega.connectors.flink.serialization.PravegaDeserializationSchema.<init>(PravegaDeserializationSchema.java:64) ~[?:?]
    at org.apache.flink.statefun.pravega.LoadsSource.createLoadsSource(LoadsSource.java:41) ~[?:?]
    at org.apache.flink.statefun.pravega.EmbeddedModule.configure(EmbeddedModule.java:55) ~[?:?]
    at org.apache.flink.statefun.flink.core.spi.Modules.createStatefulFunctionsUniverse(Modules.java:70) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses$ClassPathUniverseProvider.get(StatefulFunctionsUniverses.java:43) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses.get(StatefulFunctionsUniverses.java:32) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52) ~[statefun-flink-core.jar:3.0.0]

Did I miss something?
Is it possible to use SpecificRecordBase.class as a SourceFunction/SinkFunction type?

I hope this is clear.

Thanks

--
Best regards,
Kirill Kosenko