Hi there,

I’m using the Hudi sink to write data in bulk insert mode, and I’m running into an 
issue where Hudi is unhappy because (I think) Flink is using the Kryo serializer 
for RowData records instead of something that extends AbstractRowDataSerializer.

It’s this bit of (Hudi) code in SortOperator.java that fails:

    AbstractRowDataSerializer inputSerializer =
        (AbstractRowDataSerializer)
            getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

And I get:

Caused by: java.lang.ClassCastException: class org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in unnamed module of loader 'app')
        at org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        …

So I’m wondering whether the Flink Table code configures this serializer for me, and 
whether I need to do the same thing in my Java API-based workflow.
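
For reference, here’s a stripped-down sketch of what I’m guessing the fix looks like 
on my side (the class name, schema, and map step are made up just to show the idea). 
The piece I suspect I’m missing is the .returns(InternalTypeInfo.of(rowType)) call on 
the stream that feeds the Hudi sink, which I believe makes Flink use a 
RowDataSerializer (an AbstractRowDataSerializer) instead of falling back to Kryo — 
but please correct me if the Table planner sets this up some other way:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class RowDataTypeInfoSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Made-up two-field schema; in my real job this would be derived
            // from the Hudi table's schema.
            RowType rowType = RowType.of(
                new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                new String[] {"id", "name"});

            DataStream<RowData> rows = env
                .fromElements(1L, 2L, 3L)
                // Stand-in for my real conversion into RowData records.
                .map(id -> (RowData) GenericRowData.of(id, StringData.fromString("name-" + id)))
                // Without this, RowData is (I believe) treated as a generic type
                // and serialized with Kryo; with it, the operator's input serializer
                // should be the RowDataSerializer that SortOperator expects.
                .returns(InternalTypeInfo.of(rowType));

            rows.print();
            env.execute("RowData type info sketch");
        }
    }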

Thanks,

— Ken

PS - This is with Flink 1.15.1 and Hudi 0.12.0

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


