Hi,
I think for now we cannot upgrade to Flink 2.0.

I understand that I am registering my own POJO serializer because the
default one is not able to serialize lists.
Based on the documentation:
In a hierarchy of types, the closest factory will be chosen while
traversing upwards. However, a built-in factory has the highest precedence.
A factory also has higher precedence than Flink’s built-in types, therefore
you should know what you are doing.
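
As a rough illustration of the first sentence of that quote, the following is a toy model in plain Java, not Flink's actual lookup code. The class names (Base, Middle, Leaf) and the factory names are hypothetical, and the sketch only walks the superclass chain; it ignores interfaces and the precedence rules mentioned in the rest of the quote.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model (NOT Flink code) of "the closest registered factory wins". */
class ClosestFactoryLookup {

    // Hypothetical type hierarchy, used only for illustration.
    static class Base {}
    static class Middle extends Base {}
    static class Leaf extends Middle {}

    // Maps a class to the name of the factory registered for it.
    private final Map<Class<?>, String> registry = new HashMap<>();

    void register(Class<?> type, String factoryName) {
        registry.put(type, factoryName);
    }

    /** Walks from the type up its superclass chain and returns the first match. */
    Optional<String> findClosest(Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            String factory = registry.get(c);
            if (factory != null) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ClosestFactoryLookup lookup = new ClosestFactoryLookup();
        lookup.register(Base.class, "BaseFactory");
        lookup.register(Middle.class, "MiddleFactory");

        // Leaf has no factory of its own, so the nearest ancestor (Middle) is used.
        System.out.println(lookup.findClosest(Leaf.class));
        // Base is matched directly.
        System.out.println(lookup.findClosest(Base.class));
        // A type with no registered ancestor yields an empty result.
        System.out.println(lookup.findClosest(String.class));
    }
}
```

In this model a factory registered directly for a class always wins over one registered for an ancestor, which is the behaviour I would expect for A and B.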

Even though I have registered a custom POJO serializer for these types,
Flink still seems to be picking up the default one.

I suspect that it is not picking up this configuration:

configuration.set(
    PipelineOptions.SERIALIZATION_CONFIG,
    List.of(
        "A: {type: typeinfo, class: ATypeInfoFactory}",
        "B: {type: typeinfo, class: BTypeInfoFactory}"));


Can you please check if this configuration is correct? Is there any way
I can check via logs whether the right serializers are registered for my
custom POJOs?


Thanks

Sachin



On Thu, Feb 13, 2025 at 12:06 PM Zhanghao Chen <zhanghao.c...@outlook.com>
wrote:

> What you are doing is registering two third-party classes as POJO types,
> and this is actually the default behavior of Flink even without the
> registration. And for POJO types, Flink will create a POJO serializer for
> serialization.
>
> A side note: since Flink 2.0, built-in serialization support for
> java.util.List is introduced, and you should not need any additional type
> registrations to disable the generic types in this case.
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Thursday, February 13, 2025 12:23
> *To:* Zhanghao Chen <zhanghao.c...@outlook.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: How to register pojo type information for third party pojo
> classes
>
> Hi,
> So I have two classes (third party pojos):
> --------------------------------------------------------------------
>
> public class A {
>
>   private List<B> bList;
>
>   ...
>
> }
>
> public class B {
>
>   ...
>
> }
>
> --------------------------------------------------------------------
>
> I have defined my type info factories as:
> --------------------------------------------------------------------
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>   @Override
>   public TypeInformation<A> createTypeInfo(
>       Type t, Map<String, TypeInformation<?>> genericParameters) {
>     Map<String, TypeInformation<?>> fields =
>         new HashMap<>() {
>           {
>
>             put("bList", Types.LIST(Types.POJO(B.class)));
>             ...
>           }
>         };
>     return Types.POJO(A.class, fields);
>   }
> }
>
> public class BTypeInfoFactory extends TypeInfoFactory<B> {
>   @Override
>   public TypeInformation<B> createTypeInfo(
>       Type t, Map<String, TypeInformation<?>> genericParameters) {
>     Map<String, TypeInformation<?>> fields =
>         new HashMap<>() {
>           {
>             ...
>           }
>         };
>     return Types.POJO(B.class, fields);
>   }
> }
>
> --------------------------------------------------------------------
>
> So I am setting this as:
> --------------------------------------------------------------------
>
> configuration.set(
>     PipelineOptions.SERIALIZATION_CONFIG,
>     List.of(
>         "A: {type: typeinfo, class: ATypeInfoFactory}",
>         "B: {type: typeinfo, class: BTypeInfoFactory}"));
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> I have also set:
>
> env.getConfig().disableGenericTypes();
>
> And I am using this in my state as:
>
> ValueStateDescriptor<A> aStateDescriptor =
>     new ValueStateDescriptor<>("a.state", A.class);
>
> aState = getRuntimeContext().getState(aStateDescriptor);
>
>
> So now when I run my flink job I get this error:
>
>
> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)
>
> So my question is: if I have defined a serializer for class A, why is it
> still creating a POJO serializer?
>
> Thanks
> Sachin
>
>
> On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>
> Hi, you may use the option "pipeline.serialization-config" [1] to register
> type info for any custom type, which is available since Flink 1.19.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Wednesday, February 12, 2025 20:20
> *To:* user <user@flink.apache.org>
> *Subject:* How to register pojo type information for third party pojo
> classes
>
> Hi,
> I have a Pojo class provided by some library.
> Say A.class
>
> I can create a type info factory for it like this:
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>   @Override
>   public TypeInformation<A> createTypeInfo(
>       Type t, Map<String, TypeInformation<?>> genericParameters) {
>     Map<String, TypeInformation<?>> fields =
>         new HashMap<>() {
>           {
>             ...
>           }
>         };
>     return Types.POJO(A.class, fields);
>   }
> }
>
> Now I want this type information to be used whenever an object of class A
> is serialized or deserialized in Flink state.
>
> How can I register this with the StreamExecutionEnvironment?
>
> Thanks
> Sachin
>
>
>
>
