Hi, what is the type of the input to the SortOperator? That is, what is its 
TypeInformation? For example, is it PojoTypeInfo or RowTypeInfo? 

Best regards, 
Yuxia 


From: "Ken Krugler" <kkrugler_li...@transpac.com> 
To: "User" <user@flink.apache.org> 
Sent: Wednesday, December 7, 2022, 9:11:17 AM 
Subject: Registering serializer for RowData 

Hi there, 

I’m using the Hudi sink to write data, in bulk insert mode, and running into an 
issue where Hudi is unhappy because (I think) Flink is using the Kryo 
serializer for RowData records, instead of something that extends 
AbstractRowDataSerializer. 

It’s this bit of (Hudi) code in SortOperator.java that fails: 

    AbstractRowDataSerializer inputSerializer =
        (AbstractRowDataSerializer)
            getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

And I get: 

Caused by: java.lang.ClassCastException: class 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be cast 
to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer 
(org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and 
org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in 
unnamed module of loader 'app') 
at org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73) 
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
… 
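
To illustrate why the cast fails, here is a minimal sketch of the same pattern in isolation. It uses hypothetical stand-in classes, not the real Flink or Hudi types: an operator downcasts whatever serializer it was configured with to a row-specific subtype, and the cast only succeeds when the configured object really is of that subtype. All class and method names below are assumptions made up for the illustration.

```java
// Hypothetical stand-ins; these are NOT the real Flink or Hudi classes.
public class SerializerCastSketch {

    /** Plays the role of the generic serializer base type. */
    static abstract class Serializer {}

    /** Plays the role of AbstractRowDataSerializer: it knows the row arity. */
    static abstract class RowSerializer extends Serializer {
        abstract int getArity();
    }

    /** Plays the role of a concrete row serializer. */
    static final class FixedArityRowSerializer extends RowSerializer {
        private final int arity;

        FixedArityRowSerializer(int arity) {
            this.arity = arity;
        }

        @Override
        int getArity() {
            return arity;
        }
    }

    /** Plays the role of a generic fallback serializer, unrelated to RowSerializer. */
    static final class FallbackSerializer extends Serializer {}

    /** Mirrors the unchecked downcast: returns the arity, or -1 if the cast fails. */
    static int arityOrMinusOne(Serializer configured) {
        try {
            return ((RowSerializer) configured).getArity();
        } catch (ClassCastException e) {
            // Same failure mode as in the stack trace: wrong runtime type.
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(arityOrMinusOne(new FixedArityRowSerializer(3))); // prints 3
        System.out.println(arityOrMinusOne(new FallbackSerializer()));       // prints -1
    }
}
```

In other words, the operator itself is not at fault in this sketch; what matters is which serializer the upstream configuration hands to it.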

So I’m wondering if the Flink table code configures this serializer, and I need 
to do the same in my Java API-based workflow. 

Thanks, 

— Ken 

PS - This is with Flink 1.15.1 and Hudi 0.12.0 

-------------------------- 
Ken Krugler 
http://www.scaleunlimited.com 
Custom big data solutions 
Flink, Pinot, Solr, Elasticsearch 



