Hi,

For now I have it working by using:

ATypeInfoFactory factory = new ATypeInfoFactory();
ValueStateDescriptor<A> aStateDescriptor =
    new ValueStateDescriptor<>(
        "a.state", factory.createTypeInfo(A.class, new HashMap<>()));

Another problem I am facing occurs when I try to update the state with an object that has an array of primitive types.
Example:

public class A {
    private long[] seq;
    ...
}

Now say that in my object a, seq = null. When I try to update the state using:

ValueState<A> aState = getRuntimeContext().getState(aStateDescriptor);
A a = new A(); // seq == null
aState.update(a);

I get:

java.lang.ClassCastException: null

Any idea how I can make this work for null values of a primitive array?

Thanks
Sachin

On Thu, Feb 13, 2025 at 2:56 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:

> Hi Sachin,
>
> Sorry that I misunderstood your question before. It seems there is
> indeed a bug somewhere, so the registered type factory does not work on
> the state descriptor side. I'll try debugging it in more detail.
> Meanwhile, you may use the ValueStateDescriptor(String name,
> TypeInformation<T> typeInfo) API to manually specify the type information on
> the state side.
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Thursday, February 13, 2025 12:23
> *To:* Zhanghao Chen <zhanghao.c...@outlook.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: How to register pojo type information for third party pojo classes
>
> Hi,
> So I have two classes (third party pojos):
> --------------------------------------------------------------------
>
> public class A {
>     private List<B> bList;
>     ...
> }
>
> public class B {
>     ...
> }
>
> --------------------------------------------------------------------
>
> I have defined my type info factories as:
> --------------------------------------------------------------------
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>     @Override
>     public TypeInformation<A> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields =
>                 new HashMap<>() {
>                     {
>                         put("bList", Types.LIST(Types.POJO(B.class)));
>                         ...
>                     }
>                 };
>         return Types.POJO(A.class, fields);
>     }
> }
>
> public class BTypeInfoFactory extends TypeInfoFactory<B> {
>     @Override
>     public TypeInformation<B> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields =
>                 new HashMap<>() {
>                     {
>                         ...
>                     }
>                 };
>         return Types.POJO(B.class, fields);
>     }
> }
>
> --------------------------------------------------------------------
>
> So I am setting this as:
> --------------------------------------------------------------------
>
> configuration.set(
>     PipelineOptions.SERIALIZATION_CONFIG,
>     List.of(
>         "A: {type: typeinfo, class: ATypeInfoFactory}",
>         "B: {type: typeinfo, class: BTypeInfoFactory}"));
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> also I have set:
>
> env.getConfig().disableGenericTypes();
>
> And I am using this in my state as:
>
> ValueStateDescriptor<A> aStateDescriptor =
>     new ValueStateDescriptor<>("a.state", A.class);
>
> aState = getRuntimeContext().getState(aStateDescriptor);
>
> So now when I run my flink job I get this error:
>
> java.lang.UnsupportedOperationException: Generic types have been disabled
> in the ExecutionConfig and type java.util.List is treated as a generic
> type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)
>
> So my question is: if I have defined a serializer for class A, why is it
> still creating a pojo serializer?
>
> Thanks
> Sachin
>
>
> On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>
> Hi, you may use the option "pipeline.serialization-config" [1] to register
> type info for any custom type, which is available since Flink 1.19.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Wednesday, February 12, 2025 20:20
> *To:* user <user@flink.apache.org>
> *Subject:* How to register pojo type information for third party pojo classes
>
> Hi,
> I have a Pojo class provided by some library.
> Say A.class
>
> I can create a type info factory of the same like:
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>     @Override
>     public TypeInformation<A> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields =
>                 new HashMap<>() {
>                     {
>                         ...
>                     }
>                 };
>         return Types.POJO(A.class, fields);
>     }
> }
>
> Now I want to register this type information whenever A class's object is
> serialized or de-serialized in Flink state.
>
> How can I register this with the StreamExecutionEnvironment?
>
> Thanks
> Sachin
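
The ClassCastException for a null long[] raised at the top of this thread is left unanswered here. One possible workaround, sketched below in plain Java and not verified against Flink, is to normalize the object before calling aState.update(a) so that seq is never null. It assumes an empty array is an acceptable stand-in for null in your logic, and that A exposes accessors for seq. The names getSeq, setSeq, and normalize are hypothetical, since the real third-party class is not shown.

```java
public class NullSeqWorkaround {

    // Hypothetical stand-in for the third-party POJO from the thread.
    // The real class is not shown, so these accessors are assumptions.
    public static class A {
        private long[] seq;

        public long[] getSeq() {
            return seq;
        }

        public void setSeq(long[] seq) {
            this.seq = seq;
        }
    }

    // Replaces a null seq with an empty array, in place, and returns the same object.
    public static A normalize(A a) {
        if (a.getSeq() == null) {
            a.setSeq(new long[0]);
        }
        return a;
    }

    public static void main(String[] args) {
        A a = new A(); // seq == null, as in the thread
        normalize(a);
        System.out.println(a.getSeq().length); // prints 0
        // In the job, this would be followed by: aState.update(a);
    }
}
```

This only sidesteps the null value and does not explain the exception. If null and empty must stay distinguishable, this approach is not suitable.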