Hi Sachin,

Sorry that I misunderstood your question before. There does seem to be a bug 
somewhere that keeps the registered type info factory from taking effect on 
the state descriptor side. I'll try debugging it in more detail. Meanwhile, you 
can use the ValueStateDescriptor(String name, TypeInformation<T> typeInfo) 
constructor to specify the type information manually on the state side.
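
A minimal sketch of that workaround, in case it helps. The nested classes A and 
B below are hypothetical stand-ins for the third-party POJOs (their real fields 
are elided in this thread), the field "name" on B is assumed purely for 
illustration, and the class name StateDescriptorWorkaround is made up. Only 
Types.POJO, Types.LIST and the ValueStateDescriptor constructor are Flink APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class StateDescriptorWorkaround {

  // Hypothetical stand-in for the third-party class B.
  public static class B {
    public String name; // assumed field, for illustration only

    public B() {}
  }

  // Hypothetical stand-in for the third-party class A.
  public static class A {
    private List<B> bList;

    public A() {}

    public List<B> getBList() {
      return bList;
    }

    public void setBList(List<B> bList) {
      this.bList = bList;
    }
  }

  // Explicit POJO type info for B, listing its fields by hand.
  static TypeInformation<B> bTypeInfo() {
    Map<String, TypeInformation<?>> fields = new HashMap<>();
    fields.put("name", Types.STRING);
    return Types.POJO(B.class, fields);
  }

  // Explicit POJO type info for A; the list field is declared as a list of B
  // so that it is not left to be extracted as a generic type.
  static TypeInformation<A> aTypeInfo() {
    Map<String, TypeInformation<?>> fields = new HashMap<>();
    fields.put("bList", Types.LIST(bTypeInfo()));
    return Types.POJO(A.class, fields);
  }

  // Pass the TypeInformation to the descriptor instead of A.class.
  public static ValueStateDescriptor<A> createDescriptor() {
    return new ValueStateDescriptor<>("a.state", aTypeInfo());
  }
}
```

Inside the rich function this would be used as 
getRuntimeContext().getState(StateDescriptorWorkaround.createDescriptor()). 
Since the descriptor already carries the type information, it should not need 
to be extracted again from A.class.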

Best,
Zhanghao Chen
________________________________
From: Sachin Mittal <sjmit...@gmail.com>
Sent: Thursday, February 13, 2025 12:23
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: user <user@flink.apache.org>
Subject: Re: How to register pojo type information for third party pojo classes

Hi,
I have two classes (third-party POJOs):
--------------------------------------------------------------------

public class A {

  private List<B> bList;

  ...

}

public class B {

  ...

}

--------------------------------------------------------------------

I have defined my type info factories as:
--------------------------------------------------------------------

public class ATypeInfoFactory extends TypeInfoFactory<A> {
  @Override
  public TypeInformation<A> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields =
        new HashMap<>() {
          {

            put("bList", Types.LIST(Types.POJO(B.class)));
            ...
          }
        };
    return Types.POJO(A.class, fields);
  }
}

public class BTypeInfoFactory extends TypeInfoFactory<B> {
  @Override
  public TypeInformation<B> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields =
        new HashMap<>() {
          {
            ...
          }
        };
    return Types.POJO(B.class, fields);
  }
}

--------------------------------------------------------------------

I register these factories as:
--------------------------------------------------------------------

configuration.set(
    PipelineOptions.SERIALIZATION_CONFIG,
    List.of(
        "A: {type: typeinfo, class: ATypeInfoFactory}",
        "B: {type: typeinfo, class: BTypeInfoFactory}"));

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(configuration);

I have also set:

env.getConfig().disableGenericTypes();

And I am using this in my state as:

ValueStateDescriptor<A> aStateDescriptor =
    new ValueStateDescriptor<>("a.state", A.class);

aState = getRuntimeContext().getState(aStateDescriptor);

Now when I run my Flink job, I get this error:


java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
    at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)

So my question is: if I have defined a serializer for class A, why is it still 
creating a POJO serializer?

Thanks
Sachin


On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
Hi, you may use the option "pipeline.serialization-config" [1], available since 
Flink 1.19, to register type info for any custom type.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config

Best,
Zhanghao Chen
________________________________
From: Sachin Mittal <sjmit...@gmail.com>
Sent: Wednesday, February 12, 2025 20:20
To: user <user@flink.apache.org>
Subject: How to register pojo type information for third party pojo classes

Hi,
I have a POJO class provided by some library, say A.

I can create a type info factory for it like this:


public class ATypeInfoFactory extends TypeInfoFactory<A> {
  @Override
  public TypeInformation<A> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields =
        new HashMap<>() {
          {
            ...
          }
        };
    return Types.POJO(A.class, fields);
  }
}

Now I want this type information to be used whenever an object of class A is 
serialized or deserialized in Flink state.

How can I register this with the StreamExecutionEnvironment?

Thanks
Sachin


