Hi,
I think for now we cannot upgrade to Flink 2.0.

I understand that I am registering my own POJO serializer because the default one is not able to serialize lists.

Based on the documentation:

  In a hierarchy of types, the closest factory will be chosen while traversing upwards. However, a built-in factory has the highest precedence. A factory also has higher precedence than Flink’s built-in types, therefore you should know what you are doing.
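To make sure I am reading the first sentence of that rule correctly, here is a toy model of it in plain Java. This is only my assumption of the behaviour, not Flink's actual code: FactoryLookup, register and closest are made-up names, factories are represented by plain strings, and the sketch only walks the superclass chain (it ignores interfaces and the built-in precedence part of the rule).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only: all names here are hypothetical and not part of Flink.
public class FactoryLookup {

    // Maps a type to the name of the factory registered for it.
    private final Map<Class<?>, String> factories = new HashMap<>();

    public void register(Class<?> type, String factoryName) {
        factories.put(type, factoryName);
    }

    // Walks from the given type up through its superclasses and returns
    // the first (closest) registered factory, if there is one.
    public Optional<String> closest(Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            String factory = factories.get(c);
            if (factory != null) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }

    // A small hierarchy to exercise the lookup.
    public static class Base {}
    public static class Mid extends Base {}
    public static class Leaf extends Mid {}

    public static void main(String[] args) {
        FactoryLookup lookup = new FactoryLookup();
        lookup.register(Base.class, "BaseFactory");
        lookup.register(Mid.class, "MidFactory");

        // Leaf has no factory of its own, so the one on Mid is the closest.
        System.out.println(lookup.closest(Leaf.class).orElse("none"));
        // Nothing is registered for String or Object.
        System.out.println(lookup.closest(String.class).orElse("none"));
    }
}
```

If Flink follows this reading, I would expect a factory registered for A itself to be found before anything registered further up the hierarchy.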

Now that I have registered a custom POJO serializer for these types, it still seems to be picking up the default one.

I suspect that it is not picking up this configuration:

configuration.set(
    PipelineOptions.SERIALIZATION_CONFIG,
    List.of(
        "A: {type: typeinfo, class: ATypeInfoFactory}",
        "B: {type: typeinfo, class: BTypeInfoFactory}"));

Can you please check if this configuration is correct?
Is there any way I can check via the logs whether the right serializers are registered for my custom POJOs?

Thanks
Sachin

On Thu, Feb 13, 2025 at 12:06 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:

> What you are doing is registering two third-party classes as POJO types,
> and this is actually the default behavior of Flink even without the
> registration. And for POJO types, Flink will create a POJO serializer for
> serialization.
>
> A side note: since Flink 2.0, built-in serialization support for
> java.util.List is introduced, and you should not need any additional type
> registrations to disable the generic types in this case.
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Thursday, February 13, 2025 12:23
> *To:* Zhanghao Chen <zhanghao.c...@outlook.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: How to register pojo type information for third party pojo
> classes
>
> Hi,
> So I have two classes (third party pojos):
> --------------------------------------------------------------------
>
> public class A {
>
>     private List<B> bList;
>
>     ...
>
> }
>
> public class B {
>
>     ...
>
> }
>
> --------------------------------------------------------------------
>
> I have defined my type info factories as:
> --------------------------------------------------------------------
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>     @Override
>     public TypeInformation<A> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields =
>                 new HashMap<>() {
>                     {
>                         put("bList", Types.LIST(Types.POJO(B.class)));
>                         ...
>                     }
>                 };
>         return Types.POJO(A.class, fields);
>     }
> }
>
> public class BTypeInfoFactory extends TypeInfoFactory<B> {
>     @Override
>     public TypeInformation<B> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields =
>                 new HashMap<>() {
>                     {
>                         ...
>                     }
>                 };
>         return Types.POJO(B.class, fields);
>     }
> }
>
> --------------------------------------------------------------------
>
> So I am setting this as:
> --------------------------------------------------------------------
>
> configuration.set(
>     PipelineOptions.SERIALIZATION_CONFIG,
>     List.of(
>         "A: {type: typeinfo, class: ATypeInfoFactory}",
>         "B: {type: typeinfo, class: BTypeInfoFactory}"));
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> also I have set:
>
> env.getConfig().disableGenericTypes();
>
> And I am using this in my state as:
>
> ValueStateDescriptor<A> aStateDescriptor =
>     new ValueStateDescriptor<>("a.state", A.class);
>
> aState = getRuntimeContext().getState(aStateDescriptor);
>
> So now when I run my flink job I get this error:
>
> java.lang.UnsupportedOperationException: Generic types have been disabled
> in the ExecutionConfig and type java.util.List is treated as a generic
> type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)
>
> So my question is if I have defined a serializer for A class, why is it
> still creating a pojo serializer ?
>
> Thanks
> Sachin
>
>
> On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>
> Hi, you may use the option "pipeline.serialization-config" [1] to register
> type info for any custom type, which is available since Flink 1.19.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Wednesday, February 12, 2025 20:20
> *To:* user <user@flink.apache.org>
> *Subject:* How to register pojo type information for third party pojo
> classes
>
> Hi,
> I have a Pojo class provided by some library.
> Say A.class
>
> I can create a type info factory of the same like:
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>     @Override
>     public TypeInformation<A> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields =
>                 new HashMap<>() {
>                     {
>                         ...
>                     }
>                 };
>         return Types.POJO(A.class, fields);
>     }
> }
>
> Now I want to register this type information whenever A class's object is
> serialized or de-serialized in Flink state.
>
> How can I register this to the StreamExecutionEnvironment.
>
> Thanks
> Sachin