Hi Kirill, as far as I know SpecificRecordBase should work in Flink, but I
don't know whether StateFun imposes any limitation.
The problem seems to be the typeClass passed to generateFieldsFromAvroSchema
from the PravegaDeserializationSchema.
Maybe pravega.LoadsSource does not correctly bind the Avro classes to
their schema (for example through a serializer factory)?
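
To illustrate what I suspect, here is a rough, Flink-free sketch. The trace
shows TypeInformation.of being called from the PravegaDeserializationSchema
constructor and then failing with "Expecting type to be a PojoTypeInfo". My
guess is that the base class itself does not pass a POJO-style analysis,
while a concrete generated class would. Everything in the sketch is
hypothetical: RecordBase stands in for SpecificRecordBase, Load stands in for
a generated record class, and usableAsPojo is a simplified check of my own,
not Flink's actual rules.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class ConcreteRecordCheck {

    // Hypothetical stand-in for org.apache.avro.specific.SpecificRecordBase:
    // a base class that declares no instance fields of its own.
    public abstract static class RecordBase {
    }

    // Hypothetical stand-in for an Avro-generated record class.
    public static class Load extends RecordBase {
        private String id;

        public Load() {
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }
    }

    // Simplified POJO-style check (an assumption, not Flink's real analysis):
    // the class needs a public no-arg constructor and at least one declared
    // instance field that is neither static nor transient.
    public static boolean usableAsPojo(Class<?> type) {
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            if (!Modifier.isStatic(modifiers) && !Modifier.isTransient(modifiers)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("RecordBase usable: " + usableAsPojo(RecordBase.class));
        System.out.println("Load usable: " + usableAsPojo(Load.class));
    }
}
```

If that guess is right, passing the concrete generated class instead of
SpecificRecordBase.class to the deserialization schema might be enough, but I
have not verified this against StateFun or the Pravega connector.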

Best,
Flavio

On Thu, Aug 5, 2021 at 2:06 AM Kirill Kosenko <
kirill.kose...@transportexchangegroup.com> wrote:

> Hello
>
> I read in this article
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> that it's possible to use SpecificRecordBase.class in the operators:
>
> Avro Specific
> Avro specific records will be automatically detected by checking that the
> given type’s type hierarchy contains the SpecificRecordBase class. You can
> either specify your concrete Avro type, or—if you want to be more generic
> and allow different types in your operator—use the SpecificRecordBase type
> (or a subtype) in your user functions, in
> ResultTypeQueryable#getProducedType(), or in
> SingleOutputStreamOperator#returns(). Since specific records use generated
> Java code, they are strongly typed and allow direct access to the fields
> via known getters and setters.
>
> If I specify SpecificRecordBase.class in the SourceFunction/SinkFunction I
> get an exception:
>
> Caused by: java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
>     at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:71) ~[?:?]
>     at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:55) ~[?:?]
>     at org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.createAvroTypeInfo(AvroKryoSerializerUtils.java:81) ~[?:?]
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1897) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1798) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:970) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:799) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:746) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:742) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>     at org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:210) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>     at io.pravega.connectors.flink.serialization.PravegaDeserializationSchema.<init>(PravegaDeserializationSchema.java:64) ~[?:?]
>     at org.apache.flink.statefun.pravega.LoadsSource.createLoadsSource(LoadsSource.java:41) ~[?:?]
>     at org.apache.flink.statefun.pravega.EmbeddedModule.configure(EmbeddedModule.java:55) ~[?:?]
>     at org.apache.flink.statefun.flink.core.spi.Modules.createStatefulFunctionsUniverse(Modules.java:70) ~[statefun-flink-core.jar:3.0.0]
>     at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses$ClassPathUniverseProvider.get(StatefulFunctionsUniverses.java:43) ~[statefun-flink-core.jar:3.0.0]
>     at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses.get(StatefulFunctionsUniverses.java:32) ~[statefun-flink-core.jar:3.0.0]
>     at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71) ~[statefun-flink-core.jar:3.0.0]
>     at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52) ~[statefun-flink-core.jar:3.0.0]
>
>
> Did I miss something?
>
> Is it possible to use SpecificRecordBase.class as a
> SourceFunction/SinkFunction type?
>
> Hope it was clear
>
> Thanks
>
> --
> Best regards,
> Kirill Kosenko
>
