Hi, So I have two classes (third party pojos): --------------------------------------------------------------------
public class A { private List<B> bList; ... } public class B { ... } -------------------------------------------------------------------- I have defined my type info factories as: -------------------------------------------------------------------- public class ATypeInfoFactory extends TypeInfoFactory<A> { @Override public TypeInformation<A> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) { Map<String, TypeInformation<?>> fields = new HashMap<>() { { put("bList", Types.LIST(Types.POJO(B.class))); ... } }; return Types.POJO(A.class, fields); } } public class BTypeInfoFactory extends TypeInfoFactory<B> { @Override public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) { Map<String, TypeInformation<?>> fields = new HashMap<>() { { ... } }; return Types.POJO(B.class, fields); } } -------------------------------------------------------------------- So I am setting this as: -------------------------------------------------------------------- configuration.set( PipelineOptions.SERIALIZATION_CONFIG, List.of( "A: {type: typeinfo, class: ATypeInfoFactory}", "B: {type: typeinfo, class: BTypeInfoFactory}")); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); also I have set: env.getConfig().disableGenericTypes(); And I am using this in my state as: ValueStateDescriptor<A> aStateDescriptor = new ValueStateDescriptor<>("a.state", A.class); aState = getRuntimeContext().getState(aStateDescriptor); So now when I run my flink job I get this error: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type. at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer( GenericTypeInfo.java:88) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer( PojoTypeInfo.java:355) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer( PojoTypeInfo.java:347) at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext .createSerializer(AbstractRuntimeUDFContext.java:101) at org.apache.flink.api.common.state.StateDescriptor .initializeSerializerUnlessSet(StateDescriptor.java:336) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext .getState(StreamingRuntimeContext.java:202) So my question is if I have defined a serializer for A class, why is it still creating a pojo serializer ? Thanks Sachin On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote: > Hi, you may use the option "pipeline.serialization-config" [1] to register > type info for any custom type, which is available since Flink 1.19. > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config > > Best, > Zhanghao Chen > ------------------------------ > *From:* Sachin Mittal <sjmit...@gmail.com> > *Sent:* Wednesday, February 12, 2025 20:20 > *To:* user <user@flink.apache.org> > *Subject:* How to register pojo type information for third party pojo > classes > > Hi, > I have a Pojo class provided by some library. > Say A.class > > I can create a type info factory of the same like: > > public class ATypeInfoFactory extends TypeInfoFactory<A> { > @Override > public TypeInformation<A> createTypeInfo( > Type t, Map<String, TypeInformation<?>> genericParameters) { > Map<String, TypeInformation<?>> fields = > new HashMap<>() { > { > ... > } > }; > return Types.POJO(A.class, fields); > } > } > > Now I want to register this type information whenever A class's object is > serialized or de-serialized in Flink state. > > How can I register this to the StreamExecutionEnvironment. > > Thanks > Sachin > > > >