Hi Yuxia,

Thanks for getting back to me.
SortOperator is a class in Hudi that was copied from Flink. The code says:

/**
 * Operator for batch sort.
 *
 * <p>Copied from org.apache.flink.table.runtime.operators.sort.SortOperator to change the annotation.
 */
public class SortOperator extends TableStreamOperator<RowData>
        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {

The input is RowData, so I assume it's using RowTypeInfo.

Regards,

— Ken

> On Dec 6, 2022, at 8:55 PM, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
> Hi, what's the type of the input for the SortOperator? I mean, what's the
> TypeInformation? For example, PojoTypeInfo or RowTypeInfo?
>
> Best regards,
> Yuxia
>
> From: "Ken Krugler" <kkrugler_li...@transpac.com>
> To: "User" <user@flink.apache.org>
> Sent: Wednesday, December 7, 2022, 9:11:17 AM
> Subject: Registering serializer for RowData
>
> Hi there,
>
> I'm using the Hudi sink to write data, in bulk insert mode, and running into
> an issue where Hudi is unhappy because (I think) Flink is using the Kryo
> serializer for RowData records, instead of something that extends
> AbstractRowDataSerializer.
>
> It's this bit of (Hudi) code in SortOperator.java that fails:
>
> AbstractRowDataSerializer inputSerializer =
>         (AbstractRowDataSerializer)
>                 getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
> this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
>
> And I get:
>
> Caused by: java.lang.ClassCastException: class
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be
> cast to class
> org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer
> (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and
> org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in
> unnamed module of loader 'app')
>     at org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     …
>
> So I'm wondering whether the Flink Table code configures this serializer, and
> whether I need to do the same in my Java API-based workflow.
>
> Thanks,
>
> — Ken
>
> PS - This is with Flink 1.15.1 and Hudi 0.12.0.
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
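One plausible explanation for the ClassCastException, offered as an assumption rather than a confirmed diagnosis: RowData is an interface, so when a DataStream of RowData carries no explicit type information, Flink's reflective type extraction falls back to a generic type, which is serialized with Kryo. The sketch below compares the serializer built from TypeInformation.of(RowData.class) with the one built from an explicit InternalTypeInfo (a flink-table-runtime class, not part of the public DataStream API). The two-column schema and the class and method names are hypothetical, and the calls assume the Flink 1.15 API (createSerializer(ExecutionConfig)).

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class RowDataSerializerCheck {

    // Hypothetical schema (id BIGINT, name STRING); substitute the real Hudi table's row type.
    static RowType exampleRowType() {
        return RowType.of(
                new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                new String[] {"id", "name"});
    }

    // Reflective extraction: RowData is an interface, so Flink treats it as a
    // generic type, and a generic type's serializer is Kryo-based.
    static TypeSerializer<RowData> reflectiveSerializer() {
        return TypeInformation.of(RowData.class).createSerializer(new ExecutionConfig());
    }

    // Explicit internal type info built from the logical row type.
    static InternalTypeInfo<RowData> explicitTypeInfo() {
        return InternalTypeInfo.of(exampleRowType());
    }

    // Serializer created from the explicit type info (a RowDataSerializer for a RowType).
    static TypeSerializer<RowData> explicitSerializer() {
        return explicitTypeInfo().createSerializer(new ExecutionConfig());
    }

    public static void main(String[] args) {
        TypeSerializer<RowData> reflective = reflectiveSerializer();
        TypeSerializer<RowData> explicit = explicitSerializer();

        // The cast in SortOperator.open() can only succeed for an AbstractRowDataSerializer.
        System.out.println(reflective.getClass().getName()
                + " is AbstractRowDataSerializer: " + (reflective instanceof AbstractRowDataSerializer));
        System.out.println(explicit.getClass().getName()
                + " is AbstractRowDataSerializer: " + (explicit instanceof AbstractRowDataSerializer));
    }
}
```

If the RowData records come from a user function, attaching this type info to that operator, e.g. `stream.map(fn).returns(InternalTypeInfo.of(rowType))`, should mean the downstream operator (here Hudi's SortOperator) is configured with a RowDataSerializer instead of Kryo. This has not been verified against the Hudi 0.12.0 bulk-insert pipeline.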