Hi,
For now I have it working by building the type information manually:

ATypeInfoFactory factory = new ATypeInfoFactory();
ValueStateDescriptor<A> aStateDescriptor =
    new ValueStateDescriptor<>("a.state", factory.createTypeInfo(A.class, new HashMap<>()));


Another problem I am facing occurs when I try to update the state with
an object that contains an array of primitives.

Example:

public class A {

  private long[] seq;

  ...

}

Now suppose that seq is null in my object a.

When I try to update the state using:

ValueState<A> aState = getRuntimeContext().getState(aStateDescriptor);

A a = new A(); // seq == null

aState.update(a);


I get:

java.lang.ClassCastException: null

Any idea how I can make this work for null primitive arrays?
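
A possible stopgap, sketched below, is to replace a null seq with an empty array before calling aState.update(...). This assumes an empty array is an acceptable stand-in for null in the application; it does not explain or fix the ClassCastException itself. The class A here is only a stand-in for the third-party POJO, and NullSafeSeq and normalize are hypothetical names that are not part of Flink.

```java
import java.util.Arrays;

// Stand-in for the third-party POJO (assumption: it exposes a getter and setter for seq).
class A {
  private long[] seq; // null by default

  public long[] getSeq() {
    return seq;
  }

  public void setSeq(long[] seq) {
    this.seq = seq;
  }
}

// Hypothetical helper, not part of Flink.
final class NullSafeSeq {
  private NullSafeSeq() {}

  /** Sets seq to an empty array if it is null; returns the same instance. */
  static A normalize(A a) {
    if (a.getSeq() == null) {
      a.setSeq(new long[0]);
    }
    return a;
  }

  public static void main(String[] args) {
    A a = normalize(new A());
    System.out.println(Arrays.toString(a.getSeq())); // prints []
  }
}
```

With this helper the update would become aState.update(NullSafeSeq.normalize(a)), and code reading the state would treat an empty seq as "not set". If null and empty need to be told apart, this approach does not fit.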

Thanks
Sachin


On Thu, Feb 13, 2025 at 2:56 PM Zhanghao Chen <zhanghao.c...@outlook.com>
wrote:

> Hi Sachin,
>
> Sorry that I misunderstood your question before. It seems there is
> indeed a bug somewhere, so the registered type factory does not work on
> the state descriptor side. I'll try debugging it in more detail.
> Meanwhile, you may use the ValueStateDescriptor(String name,
> TypeInformation<T> typeInfo) API to manually specify the type information on
> the state side.
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Thursday, February 13, 2025 12:23
> *To:* Zhanghao Chen <zhanghao.c...@outlook.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: How to register pojo type information for third party pojo
> classes
>
> Hi,
> So I have two classes (third party pojos):
> --------------------------------------------------------------------
>
> public class A {
>
>   private List<B> bList;
>
>   ...
>
> }
>
> public class B {
>
>   ...
>
> }
>
> --------------------------------------------------------------------
>
> I have defined my type info factories as:
> --------------------------------------------------------------------
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>   @Override
>   public TypeInformation<A> createTypeInfo(
>       Type t, Map<String, TypeInformation<?>> genericParameters) {
>     Map<String, TypeInformation<?>> fields =
>         new HashMap<>() {
>           {
>
>             put("bList", Types.LIST(Types.POJO(B.class)));
>             ...
>           }
>         };
>     return Types.POJO(A.class, fields);
>   }
> }
>
> public class BTypeInfoFactory extends TypeInfoFactory<B> {
>   @Override
>   public TypeInformation<B> createTypeInfo(
>       Type t, Map<String, TypeInformation<?>> genericParameters) {
>     Map<String, TypeInformation<?>> fields =
>         new HashMap<>() {
>           {
>             ...
>           }
>         };
>     return Types.POJO(B.class, fields);
>   }
> }
>
> --------------------------------------------------------------------
>
> So I am setting this as:
> --------------------------------------------------------------------
>
> configuration.set(
>     PipelineOptions.SERIALIZATION_CONFIG,
>     List.of(
>         "A: {type: typeinfo, class: ATypeInfoFactory}",
>         "B: {type: typeinfo, class: BTypeInfoFactory}"));
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> I have also set:
>
> env.getConfig().disableGenericTypes();
>
> And I am using this in my state as:
>
> ValueStateDescriptor<A> aStateDescriptor =
>     new ValueStateDescriptor<>("a.state", A.class);
>
> aState = getRuntimeContext().getState(aStateDescriptor);
>
>
> So now when I run my flink job I get this error:
>
>
> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
> at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
> at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
> at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
> at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
> at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
> at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)
>
> So my question is: if I have defined a serializer for class A, why is it
> still creating a POJO serializer?
>
> Thanks
> Sachin
>
>
> On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>
> Hi, you may use the option "pipeline.serialization-config" [1] to register
> type info for any custom type, which is available since Flink 1.19.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Wednesday, February 12, 2025 20:20
> *To:* user <user@flink.apache.org>
> *Subject:* How to register pojo type information for third party pojo
> classes
>
> Hi,
> I have a POJO class provided by a library, say A.class.
>
> I can create a type info factory for it like this:
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>   @Override
>   public TypeInformation<A> createTypeInfo(
>       Type t, Map<String, TypeInformation<?>> genericParameters) {
>     Map<String, TypeInformation<?>> fields =
>         new HashMap<>() {
>           {
>             ...
>           }
>         };
>     return Types.POJO(A.class, fields);
>   }
> }
>
> Now I want this type information to be used whenever an object of class A
> is serialized or deserialized in Flink state.
>
> How can I register it with the StreamExecutionEnvironment?
>
> Thanks
> Sachin
>
>
>
>
