Hello

I read in this article
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
that it is possible to use SpecificRecordBase.class as the type in operators:

Avro Specific
Avro specific records will be automatically detected by checking that the given 
type’s type hierarchy contains the SpecificRecordBase class. You can either 
specify your concrete Avro type, or—if you want to be more generic and allow 
different types in your operator—use the SpecificRecordBase type (or a subtype) 
in your user functions, in ResultTypeQueryable#getProducedType(), or in 
SingleOutputStreamOperator#returns(). Since specific records use generated Java 
code, they are strongly typed and allow direct access to the fields via known 
getters and setters.

However, when I specify SpecificRecordBase.class as the type for a
SourceFunction/SinkFunction, I get an exception:


Caused by: java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:71) ~[?:?]
    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:55) ~[?:?]
    at org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.createAvroTypeInfo(AvroKryoSerializerUtils.java:81) ~[?:?]
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1897) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1798) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:970) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:799) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:746) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:742) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:210) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at io.pravega.connectors.flink.serialization.PravegaDeserializationSchema.<init>(PravegaDeserializationSchema.java:64) ~[?:?]
    at org.apache.flink.statefun.pravega.LoadsSource.createLoadsSource(LoadsSource.java:41) ~[?:?]
    at org.apache.flink.statefun.pravega.EmbeddedModule.configure(EmbeddedModule.java:55) ~[?:?]
    at org.apache.flink.statefun.flink.core.spi.Modules.createStatefulFunctionsUniverse(Modules.java:70) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses$ClassPathUniverseProvider.get(StatefulFunctionsUniverses.java:43) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses.get(StatefulFunctionsUniverses.java:32) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71) ~[statefun-flink-core.jar:3.0.0]
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52) ~[statefun-flink-core.jar:3.0.0]

Did I miss something?

Is it possible to use SpecificRecordBase.class as the type of a
SourceFunction/SinkFunction?

I hope the question is clear.

Thanks

--
Best regards,
Kirill Kosenko
