Hi,

In this case, you should register a custom type info with a custom list 
serializer for java.util.List, rather than for class B itself.
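A minimal sketch of such a factory, assuming Flink 1.19/1.20 with flink-core on the 
classpath. The class name ListTypeInfoFactory is hypothetical, and so is the 
assumption that Flink passes the element type under List's type-variable name "E"; 
please verify that against your Flink version. ListTypeInfo creates Flink's own list 
serializer, so a List<B> field would no longer be treated as a generic type.

```java
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;

/**
 * Hypothetical type-info factory for java.util.List. Returning a ListTypeInfo
 * makes Flink use its list serializer instead of a generic (Kryo) type.
 */
public class ListTypeInfoFactory<E> extends TypeInfoFactory<List<E>> {

    @Override
    public TypeInformation<List<E>> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Assumption: the element type arrives keyed by List's type variable "E".
        @SuppressWarnings("unchecked")
        TypeInformation<E> elementType = (TypeInformation<E>) genericParameters.get("E");
        if (elementType == null) {
            throw new IllegalArgumentException(
                    "Could not determine the element type of " + t);
        }
        return new ListTypeInfo<>(elementType);
    }
}
```

It would then be registered with an entry such as 
"java.util.List: {type: typeinfo, class: com.example.ListTypeInfoFactory}", where 
com.example is a placeholder for the factory's real package.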

Best,
Zhanghao Chen
________________________________
From: Sachin Mittal <sjmit...@gmail.com>
Sent: Thursday, February 13, 2025 16:10
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: user <user@flink.apache.org>
Subject: Re: How to register pojo type information for third party pojo classes

Hi,
For now, I don't think we can upgrade to Flink 2.0.

I understand that I am registering my own POJO serializer because the default 
one is not able to serialize lists.
According to the documentation:
In a hierarchy of types, the closest factory will be chosen while traversing 
upwards. However, a built-in factory has the highest precedence. A factory also 
has higher precedence than Flink’s built-in types, therefore you should know 
what you are doing.

However, even though I have registered a custom POJO serializer for these types, 
Flink still seems to pick up the default one.

I suspect that it is not picking up this configuration:


configuration.set(
    PipelineOptions.SERIALIZATION_CONFIG,
    List.of(
        "A: {type: typeinfo, class: ATypeInfoFactory}",
        "B: {type: typeinfo, class: BTypeInfoFactory}"));


Can you please check whether this configuration is correct? Is there any way I can 
check via the logs whether the right serializers are registered for my custom POJOs?


Thanks

Sachin


On Thu, Feb 13, 2025 at 12:06 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
What you are doing is registering two third-party classes as POJO types, which is 
what Flink already does by default, even without any registration. For POJO 
types, Flink creates a POJO serializer.

A side note: Flink 2.0 introduces built-in serialization support for 
java.util.List, so with it you should not need any additional type registrations 
to run with generic types disabled in this case.

Best,
Zhanghao Chen
________________________________
From: Sachin Mittal <sjmit...@gmail.com>
Sent: Thursday, February 13, 2025 12:23
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: user <user@flink.apache.org>
Subject: Re: How to register pojo type information for third party pojo classes

Hi,
I have two classes (third-party POJOs):
--------------------------------------------------------------------

public class A {

  private List<B> bList;

  ...

}

public class B {

  ...

}

--------------------------------------------------------------------

I have defined my type info factories as:
--------------------------------------------------------------------

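import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class ATypeInfoFactory extends TypeInfoFactory<A> {
  @Override
  public TypeInformation<A> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields = new HashMap<>();
    // bList is declared as List<B>: describe it explicitly as a list of B POJOs
    fields.put("bList", Types.LIST(Types.POJO(B.class)));
    ...
    return Types.POJO(A.class, fields);
  }
}

public class BTypeInfoFactory extends TypeInfoFactory<B> {
  @Override
  public TypeInformation<B> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields = new HashMap<>();
    ...
    return Types.POJO(B.class, fields);
  }
}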

--------------------------------------------------------------------

So I am setting this as:
--------------------------------------------------------------------

configuration.set(
    PipelineOptions.SERIALIZATION_CONFIG,
    List.of(
        "A: {type: typeinfo, class: ATypeInfoFactory}",
        "B: {type: typeinfo, class: BTypeInfoFactory}"));

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(configuration);

I have also set:

env.getConfig().disableGenericTypes();

And I am using this in my state as:

ValueStateDescriptor<A> aStateDescriptor =
    new ValueStateDescriptor<>("a.state", A.class);

aState = getRuntimeContext().getState(aStateDescriptor);

Now, when I run my Flink job, I get this error:


java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
    at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)

So my question is: if I have defined a serializer for class A, why is Flink still 
creating a POJO serializer?
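As a hedged alternative while the registration question is open, the state 
descriptor can be given a TypeInformation directly instead of A.class, so Flink 
does not have to extract A's type information from the class when it creates the 
state serializer. The helper below (ExplicitStateDescriptors) is hypothetical; it 
asks a TypeInfoFactory, such as the ATypeInfoFactory above, for the type 
information and passes it to the ValueStateDescriptor(String, TypeInformation) 
constructor.

```java
import java.util.Collections;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/** Hypothetical helper: builds value-state descriptors from explicit type information. */
public final class ExplicitStateDescriptors {

    private ExplicitStateDescriptors() {}

    public static <T> ValueStateDescriptor<T> valueState(
            String name, Class<T> type, TypeInfoFactory<T> factory) {
        // Ask the factory directly; a non-generic class needs no generic parameters.
        TypeInformation<T> typeInfo = factory.createTypeInfo(type, Collections.emptyMap());
        // This constructor uses the given type information as-is.
        return new ValueStateDescriptor<>(name, typeInfo);
    }
}
```

In the function this would read, for example: 
aState = getRuntimeContext().getState(
    ExplicitStateDescriptors.valueState("a.state", A.class, new ATypeInfoFactory()));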

Thanks
Sachin


On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
Hi, you can use the option "pipeline.serialization-config" [1], available since 
Flink 1.19, to register type info for any custom type.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
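Because Flink has to load each class in these entries by name, the names need to 
be fully qualified. A small hypothetical helper (SerializationConfigEntries, not a 
Flink API) can build the entries from Class objects, so a renamed or moved class 
fails at compile time instead of at job start-up. The entry format simply mirrors 
the "{type: typeinfo, class: ...}" strings used in this thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that formats "pipeline.serialization-config" entries. */
public final class SerializationConfigEntries {

    private SerializationConfigEntries() {}

    /** Formats one "<type>: {type: typeinfo, class: <factory>}" entry. */
    public static String typeInfoEntry(Class<?> type, Class<?> factoryClass) {
        // Class.getName() returns the binary name (e.g. "com.example.Outer$Inner"),
        // which is the form Class.forName(...) accepts.
        return type.getName() + ": {type: typeinfo, class: " + factoryClass.getName() + "}";
    }

    /** Builds one entry per (type, factory) pair, in the map's iteration order. */
    public static List<String> typeInfoEntries(Map<Class<?>, Class<?>> factoriesByType) {
        List<String> entries = new ArrayList<>();
        for (Map.Entry<Class<?>, Class<?>> e : factoriesByType.entrySet()) {
            entries.add(typeInfoEntry(e.getKey(), e.getValue()));
        }
        return entries;
    }

    public static void main(String[] args) {
        Map<Class<?>, Class<?>> factories = new LinkedHashMap<>();
        // Stand-in classes; in the job these would be A.class -> ATypeInfoFactory.class, etc.
        factories.put(ArrayList.class, LinkedHashMap.class);
        typeInfoEntries(factories).forEach(System.out::println);
    }
}
```

In the job, the result could be passed as 
configuration.set(PipelineOptions.SERIALIZATION_CONFIG, SerializationConfigEntries.typeInfoEntries(factories)), 
with factories mapping A.class to ATypeInfoFactory.class and B.class to BTypeInfoFactory.class.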

Best,
Zhanghao Chen
________________________________
From: Sachin Mittal <sjmit...@gmail.com>
Sent: Wednesday, February 12, 2025 20:20
To: user <user@flink.apache.org>
Subject: How to register pojo type information for third party pojo classes

Hi,
I have a POJO class provided by some library, say A.class.

I can create a type info factory for it like this:


public class ATypeInfoFactory extends TypeInfoFactory<A> {
  @Override
  public TypeInformation<A> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields =
        new HashMap<>() {
          {
            ...
          }
        };
    return Types.POJO(A.class, fields);
  }
}

Now I want this type information to be used whenever an object of class A is 
serialized or deserialized in Flink state.

How can I register this with the StreamExecutionEnvironment?

Thanks
Sachin