Hi Yuxia,

Thanks for getting back to me.

SortOperator is a class in Hudi that was copied from Flink. The code says:

/**
 * Operator for batch sort.
 *
 * <p>Copied from org.apache.flink.table.runtime.operators.sort.SortOperator to change the annotation.
 */
public class SortOperator extends TableStreamOperator<RowData>
    implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {

The input type is RowData, so I assume it’s using RowTypeInfo.
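
In case it helps narrow this down, here is a minimal sketch of how the serializer choice could be checked outside the job. It assumes Flink 1.15 with flink-core and flink-table-runtime on the classpath; the class name RowDataSerializerCheck and the two-field row type are made up for illustration, and I have not confirmed that this matches how my workflow ends up configured.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

/** Hypothetical helper: compares the serializers Flink creates for RowData. */
public class RowDataSerializerCheck {

    // Serializer derived when Flink only knows the class RowData (an interface),
    // i.e. when no explicit type information was supplied for the stream.
    static TypeSerializer<RowData> genericSerializer() {
        return TypeInformation.of(RowData.class).createSerializer(new ExecutionConfig());
    }

    // Serializer derived from the table runtime's type information for a known row type.
    static TypeSerializer<RowData> internalSerializer(RowType rowType) {
        return InternalTypeInfo.of(rowType).createSerializer(new ExecutionConfig());
    }

    // Reports the serializer class and whether the cast in SortOperator.open() would succeed.
    static String describe(TypeSerializer<RowData> serializer) {
        boolean castable = serializer instanceof AbstractRowDataSerializer;
        return serializer.getClass().getName() + " -> AbstractRowDataSerializer? " + castable;
    }

    public static void main(String[] args) {
        // Illustrative schema only: (INT, STRING).
        RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));

        System.out.println(describe(genericSerializer()));
        System.out.println(describe(internalSerializer(rowType)));
    }
}
```

If the first line reports a Kryo serializer and the second one something castable, then my guess would be that the stream feeding the Hudi sink needs its type declared explicitly, for example with returns(InternalTypeInfo.of(rowType)) on the upstream operator, rather than left to type extraction.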

Regards,

— Ken

> On Dec 6, 2022, at 8:55 PM, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> 
> Hi, what's the type of the input for the SortOperator? I mean what's the 
> TypeInformation? For example, PojoTypeInfo or RowTypeInfo?
> 
> Best regards,
> Yuxia
> 
> From: "Ken Krugler" <kkrugler_li...@transpac.com>
> To: "User" <user@flink.apache.org>
> Sent: Wednesday, December 7, 2022, 9:11:17 AM
> Subject: Registering serializer for RowData
> 
> Hi there,
> 
> I’m using the Hudi sink to write data, in bulk insert mode, and running into 
> an issue where Hudi is unhappy because (I think) Flink is using the Kryo 
> serializer for RowData records, instead of something that extends 
> AbstractRowDataSerializer.
> 
> It’s this bit of (Hudi) code in SortOperator.java that fails:
> 
>     AbstractRowDataSerializer inputSerializer =
>         (AbstractRowDataSerializer)
>             getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
>     this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
> 
> And I get:
> 
> Caused by: java.lang.ClassCastException: class org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in unnamed module of loader 'app')
>       at org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>       …
> 
> So I’m wondering if the Flink table code configures this serializer, and I 
> need to do the same in my Java API-based workflow.
> 
> Thanks,
> 
> — Ken
> 
> PS - This is with Flink 1.15.1 and Hudi 0.12.0
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


