[ https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Cranmer resolved FLINK-34954. ----------------------------------- Resolution: Fixed > Kryo input implementation NoFetchingInput fails to handle zero length bytes > --------------------------------------------------------------------------- > > Key: FLINK-34954 > URL: https://issues.apache.org/jira/browse/FLINK-34954 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System > Affects Versions: 1.19.0 > Reporter: Qinghui Xu > Assignee: Qinghui Xu > Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > If the serailized bytes are empty, `NoFetchingInput` will run into error when > Kryo tries to deserialize it. > Example: a protobuf 3 object that contains only default values will be > serialized as 0 length byte array, and the deserialization later will fail. > Illustration: > {noformat} > import com.esotericsoftware.kryo.Kryo > import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, > Input, Output} > import com.google.protobuf.{DescriptorProtos, Message}import > com.twitter.chill.protobuf.ProtobufSerializer > import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput > import java.io.ByteArrayInputStream > > object ProtoSerializationTest { > def main(args: Array[String]) = { > val chillProtoSerializer = new ProtobufSerializer > val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance > val output: Output = new ByteBufferOutput(1000) > chillProtoSerializer.write(null, output, protomessage) > val serialized: Array[Byte] = output.toBytes > println(s"Serialized : $serialized") > val input: Input = new NoFetchingInput(new > ByteArrayInputStream(serialized)) > val deserialized = chillProtoSerializer.read(null, input, > classOf[BillableClick].asInstanceOf[Class[Message]]) > println(deserialized) > } > } > {noformat} > > Error > {noformat} > Exception in thread "main" java.lang.RuntimeException: Could not create class > com.criteo.glup.BillableClickProto$BillableClick > at > com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) > at > com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22) > at ProtoSerialization.main(ProtoSerialization.scala) > Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No > more bytes left. > at > org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) > at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332) > at > com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) > ... 2 more > Caused by: java.io.EOFException: No more bytes left. > ... 5 more{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)