Hi,
I have two classes (third-party POJOs):
--------------------------------------------------------------------

public class A {

  private List<B> bList;

  ...

}

public class B {

  ...

}

--------------------------------------------------------------------

I have defined my type info factories as:
--------------------------------------------------------------------

public class ATypeInfoFactory extends TypeInfoFactory<A> {
  @Override
  public TypeInformation<A> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields =
        new HashMap<>() {
          {
            put("bList", Types.LIST(Types.POJO(B.class)));
            ...
          }
        };
    return Types.POJO(A.class, fields);
  }
}

public class BTypeInfoFactory extends TypeInfoFactory<B> {
  @Override
  public TypeInformation<B> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {
    Map<String, TypeInformation<?>> fields =
        new HashMap<>() {
          {
            ...
          }
        };
    return Types.POJO(B.class, fields);
  }
}

--------------------------------------------------------------------
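
As a quick sanity check (just a sketch on my side, assuming the factory can be
invoked directly outside of Flink's type extraction), I can call it myself and
print the resulting type information:
--------------------------------------------------------------------

// Hypothetical check, not part of the job: A.class is passed as the Type and
// an empty map as genericParameters, since A declares no generic parameters.
TypeInformation<A> aInfo =
    new ATypeInfoFactory().createTypeInfo(A.class, java.util.Collections.emptyMap());
System.out.println(aInfo); // I expect bList to show up as a list of POJO<B>

--------------------------------------------------------------------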

I am registering these factories via the serialization config as:
--------------------------------------------------------------------

configuration.set(
    PipelineOptions.SERIALIZATION_CONFIG,
    List.of(
        "A: {type: typeinfo, class: ATypeInfoFactory}",
        "B: {type: typeinfo, class: BTypeInfoFactory}"));

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(configuration);

I have also set:

env.getConfig().disableGenericTypes();
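
(I believe the config-based equivalent of that call is the following, though
the option name is my assumption:)

configuration.set(PipelineOptions.GENERIC_TYPES, false);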

And I am using this in my state as:

ValueStateDescriptor<A> aStateDescriptor =
    new ValueStateDescriptor<>("a.state", A.class);

aState = getRuntimeContext().getState(aStateDescriptor);
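
(For the record, one variant I have not tried yet -- purely a sketch, assuming
it is fine to call the factory directly -- is to hand the descriptor the
factory-produced type information instead of A.class:)

// Sketch, not verified: build the TypeInformation through the factory and pass
// it to the descriptor, so the state serializer is derived from it rather than
// from automatic type extraction. The variable names are just placeholders.
TypeInformation<A> aTypeInfo =
    new ATypeInfoFactory().createTypeInfo(A.class, java.util.Collections.emptyMap());
ValueStateDescriptor<A> aDescriptorFromTypeInfo =
    new ValueStateDescriptor<>("a.state", aTypeInfo);
aState = getRuntimeContext().getState(aDescriptorFromTypeInfo);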


Now when I run my Flink job, I get this error:


java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
    at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)

So my question is: if I have defined a serializer for class A, why is Flink
still creating a POJO serializer?

Thanks
Sachin


On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <zhanghao.c...@outlook.com>
wrote:

> Hi, you may use the option "pipeline.serialization-config" [1] to register
> type info for any custom type, which is available since Flink 1.19.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Sent:* Wednesday, February 12, 2025 20:20
> *To:* user <user@flink.apache.org>
> *Subject:* How to register pojo type information for third party pojo
> classes
>
> Hi,
> I have a Pojo class provided by some library.
> Say A.class
>
> I can create a type info factory of the same like:
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>   @Override
>   public TypeInformation<A> createTypeInfo(
>       Type t, Map<String, TypeInformation<?>> genericParameters) {
>     Map<String, TypeInformation<?>> fields =
>         new HashMap<>() {
>           {
>             ...
>           }
>         };
>     return Types.POJO(A.class, fields);
>   }
> }
>
> Now I want to register this type information whenever an object of class A is
> serialized or deserialized in Flink state.
>
> How can I register this with the StreamExecutionEnvironment?
>
> Thanks
> Sachin
>
