Hi Sachin,

Sorry that I misunderstood your question before. It seems there is indeed a bug somewhere, so the registered type info factory does not take effect on the state descriptor side. I'll try debugging it in more detail. Meanwhile, you may use the ValueStateDescriptor(String name, TypeInformation<T> typeInfo) constructor to manually specify the type information on the state side.
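A minimal sketch of that workaround, assuming Flink is on the classpath. The nested classes A and B are hypothetical stand-ins for the third-party POJOs (the real field lists are elided in this thread), the `name` field on B is invented for illustration, and `ExplicitStateTypeInfo` is just a holder class for the example. The idea is to build the type information by hand and pass it to the descriptor, rather than passing A.class and relying on type extraction:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

final class ExplicitStateTypeInfo {

    // Hypothetical stand-in for the third-party class B; "name" is an assumed field.
    public static class B {
        private String name;
        public B() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Hypothetical stand-in for the third-party class A.
    public static class A {
        private List<B> bList;
        public A() {}
        public List<B> getBList() { return bList; }
        public void setBList(List<B> bList) { this.bList = bList; }
    }

    // Explicit POJO type info for B: every listed field must exist on the class.
    static TypeInformation<B> bTypeInfo() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("name", Types.STRING);
        return Types.POJO(B.class, fields);
    }

    // Explicit POJO type info for A: bList is declared as a list of B,
    // so it is not left to be analyzed as a generic java.util.List.
    static TypeInformation<A> aTypeInfo() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("bList", Types.LIST(bTypeInfo()));
        return Types.POJO(A.class, fields);
    }

    // The descriptor carries the type information itself instead of A.class.
    static ValueStateDescriptor<A> aStateDescriptor() {
        return new ValueStateDescriptor<>("a.state", aTypeInfo());
    }
}
```

In the rich function, the descriptor would then be passed to getRuntimeContext().getState(...) exactly as before; only the way the descriptor is constructed changes.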

Best,
Zhanghao Chen
________________________________
From: Sachin Mittal <sjmit...@gmail.com>
Sent: Thursday, February 13, 2025 12:23
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: user <user@flink.apache.org>
Subject: Re: How to register pojo type information for third party pojo classes

Hi,
So I have two classes (third party pojos):

--------------------------------------------------------------------
public class A {
    private List<B> bList;
    ...
}

public class B {
    ...
}
--------------------------------------------------------------------

I have defined my type info factories as:

--------------------------------------------------------------------
public class ATypeInfoFactory extends TypeInfoFactory<A> {
    @Override
    public TypeInformation<A> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields =
                new HashMap<>() {
                    {
                        put("bList", Types.LIST(Types.POJO(B.class)));
                        ...
                    }
                };
        return Types.POJO(A.class, fields);
    }
}

public class BTypeInfoFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields =
                new HashMap<>() {
                    {
                        ...
                    }
                };
        return Types.POJO(B.class, fields);
    }
}
--------------------------------------------------------------------

So I am setting this as:

--------------------------------------------------------------------
configuration.set(
        PipelineOptions.SERIALIZATION_CONFIG,
        List.of(
                "A: {type: typeinfo, class: ATypeInfoFactory}",
                "B: {type: typeinfo, class: BTypeInfoFactory}"));

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

also I have set:

env.getConfig().disableGenericTypes();

And I am using this in my state as:

ValueStateDescriptor<A> aStateDescriptor = new ValueStateDescriptor<>("a.state", A.class);
aState = getRuntimeContext().getState(aStateDescriptor);

So now when I run my flink job I get this error:

java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
    at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)

So my question is if I have defined a serializer for A class, why is it still creating a pojo serializer ?

Thanks
Sachin

On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:

Hi, you may use the option "pipeline.serialization-config" [1] to register type info for any custom type, which is available since Flink 1.19.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config

Best,
Zhanghao Chen
________________________________
From: Sachin Mittal <sjmit...@gmail.com>
Sent: Wednesday, February 12, 2025 20:20
To: user <user@flink.apache.org>
Subject: How to register pojo type information for third party pojo classes

Hi,
I have a Pojo class provided by some library.
Say A.class
I can create a type info factory of the same like:

public class ATypeInfoFactory extends TypeInfoFactory<A> {
    @Override
    public TypeInformation<A> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields =
                new HashMap<>() {
                    {
                        ...
                    }
                };
        return Types.POJO(A.class, fields);
    }
}

Now I want to register this type information whenever A class's object is serialized or de-serialized in Flink state.
How can I register this to the StreamExecutionEnvironment.

Thanks
Sachin