Hi, what is the input type of the SortOperator? Specifically, which TypeInformation does its input stream carry, for example PojoTypeInfo or RowTypeInfo?
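The reason the TypeInformation matters: the serializer an operator reads from its config is derived from the type information of its input, and a type that Flink treats as generic falls back to Kryo. Below is a minimal, self-contained sketch of that failure mode. Every class in it (`Serializer`, `AbstractRowSerializer`, `RowSerializer`, `GenericFallbackSerializer`) is a hypothetical stand-in written for illustration, not a Flink or Hudi class; only the shape of the cast mirrors the SortOperator code quoted in this thread.

```java
public class SerializerCastSketch {

    // Hypothetical stand-in for a serializer interface (not Flink's TypeSerializer).
    interface Serializer {}

    // Stand-in for a row-aware serializer base class that exposes the field count.
    static abstract class AbstractRowSerializer implements Serializer {
        abstract int getArity();
    }

    // Stand-in for the serializer produced when the input has row type information.
    static final class RowSerializer extends AbstractRowSerializer {
        private final int arity;

        RowSerializer(int arity) {
            this.arity = arity;
        }

        @Override
        int getArity() {
            return arity;
        }
    }

    // Stand-in for the generic fallback serializer used when no specific
    // type information is available for the input.
    static final class GenericFallbackSerializer implements Serializer {}

    // Mimics an operator's open(): it assumes the configured input serializer
    // is row-aware and casts it without checking.
    static String open(Serializer configured) {
        try {
            AbstractRowSerializer rowSerializer = (AbstractRowSerializer) configured;
            return "ok, arity=" + rowSerializer.getArity();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Input declared with row type information: the cast succeeds.
        System.out.println(open(new RowSerializer(3)));
        // Input treated as a generic type: the cast fails, as in the reported stack trace.
        System.out.println(open(new GenericFallbackSerializer()));
    }
}
```

If the input stream turns out to be typed generically, the likely direction is to declare row type information explicitly on the stream that feeds the sink. In Flink's table runtime I believe `InternalTypeInfo.of(rowType)` is what yields a RowData serializer, but please verify that against Flink 1.15.1 before relying on it.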
Best regards,
Yuxia

From: "Ken Krugler" <kkrugler_li...@transpac.com>
To: "User" <user@flink.apache.org>
Sent: Wednesday, December 7, 2022, 9:11:17 AM
Subject: Registering serializer for RowData

Hi there,

I’m using the Hudi sink to write data in bulk insert mode, and I’m running into an issue where Hudi is unhappy because (I think) Flink is using the Kryo serializer for RowData records instead of something that extends AbstractRowDataSerializer.

It’s this bit of (Hudi) code in SortOperator.java that fails:

    AbstractRowDataSerializer inputSerializer =
        (AbstractRowDataSerializer) getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

And I get:

    Caused by: java.lang.ClassCastException: class org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in unnamed module of loader 'app')
        at org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        …

So I’m wondering if the Flink table code configures this serializer, and I need to do the same in my Java API-based workflow.

Thanks,

-- Ken

PS - This is with Flink 1.15.1 and Hudi 0.12.0

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch