What you are doing is registering two third-party classes as POJO types, and this is actually the default behavior of Flink even without the registration. And for POJO types, Flink will create a POJO serializer for serialization.
A side note: since Flink 2.0, built-in serialization support for java.util.List is introduced, and you should not need any additional type registrations to disable the generic types in this case. Best, Zhanghao Chen ________________________________ From: Sachin Mittal <sjmit...@gmail.com> Sent: Thursday, February 13, 2025 12:23 To: Zhanghao Chen <zhanghao.c...@outlook.com> Cc: user <user@flink.apache.org> Subject: Re: How to register pojo type information for third party pojo classes Hi, So I have two classes (third party pojos): -------------------------------------------------------------------- public class A { private List<B> bList; ... } public class B { ... } -------------------------------------------------------------------- I have defined my type info factories as: -------------------------------------------------------------------- public class ATypeInfoFactory extends TypeInfoFactory<A> { @Override public TypeInformation<A> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) { Map<String, TypeInformation<?>> fields = new HashMap<>() { { put("bList", Types.LIST(Types.POJO(B.class))); ... } }; return Types.POJO(A.class, fields); } } public class BTypeInfoFactory extends TypeInfoFactory<B> { @Override public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) { Map<String, TypeInformation<?>> fields = new HashMap<>() { { ... } }; return Types.POJO(B.class, fields); } } -------------------------------------------------------------------- So I am setting this as: -------------------------------------------------------------------- configuration.set( PipelineOptions.SERIALIZATION_CONFIG, List.of( "A: {type: typeinfo, class: ATypeInfoFactory}", "B: {type: typeinfo, class: BTypeInfoFactory}")); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); also I have set: env.getConfig().disableGenericTypes(); And I am using this in my state as: ValueStateDescriptor<A> aStateDescriptor = new ValueStateDescriptor<>("a.state", A.class); aState = getRuntimeContext().getState(aStateDescriptor); So now when I run my flink job I get this error: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type. at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347) at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101) at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202) So my question is if I have defined a serializer for A class, why is it still creating a pojo serializer ? Thanks Sachin On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com>> wrote: Hi, you may use the option "pipeline.serialization-config" [1] to register type info for any custom type, which is available since Flink 1.19. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config Best, Zhanghao Chen ________________________________ From: Sachin Mittal <sjmit...@gmail.com<mailto:sjmit...@gmail.com>> Sent: Wednesday, February 12, 2025 20:20 To: user <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: How to register pojo type information for third party pojo classes Hi, I have a Pojo class provided by some library. Say A.class I can create a type info factory of the same like: public class ATypeInfoFactory extends TypeInfoFactory<A> { @Override public TypeInformation<A> createTypeInfo( Type t, Map<String, TypeInformation<?>> genericParameters) { Map<String, TypeInformation<?>> fields = new HashMap<>() { { ... } }; return Types.POJO(A.class, fields); } } Now I want to register this type information whenever A class's object is serialized or de-serialized in Flink state. How can I register this to the StreamExecutionEnvironment. Thanks Sachin