Sorry for the long question, but I'd like to take advantage of this discussion to ask about something I've never fully understood. Let's say I have, for example, a Thrift/Protobuf/Avro object Person.
1. Do I really have to register a custom serializer? In my code I create a dataset from parquet-thrift, but I never had to register anything. Does anything change if I call registerTypeWithKryoSerializer?
2. How is Flink's performance affected by using one serialization rather than another? For example, is there a simple snippet of a Flink program that shows when it's better to use the original Person, its POJO version, or its Tuple version (assuming it is a flat object)?
3. Does this change further when I use the Table API?

Best,
Flavio

On Tue, Dec 1, 2015 at 10:25 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Also, we don't add serializers automatically for DataStream programs. I've
> opened a JIRA for this a while ago.
>
> On Tue, Dec 1, 2015 at 10:20 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Kryzsztof,
>>
>> it's true that we once added the Protobuf serializer automatically.
>> However, due to versioning conflicts (see
>> https://issues.apache.org/jira/browse/FLINK-1635), we removed it again.
>> Now you have to register the ProtobufSerializer manually:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program
>> .
>>
>> Cheers,
>> Till
>>
>> On Mon, Nov 30, 2015 at 8:48 PM, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:
>>
>>> Hi!
>>> I'm trying to use generated Protobuf wrappers compiled with protoc and
>>> pass them as objects between functions of Flink. I'm using Flink 0.10.0.
>>> Unfortunately, I get an exception on runtime:
>>>
>>> [...]
>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>> Serialization trace:
>>> enrichments_ (com.company$MyObject)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:162)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:313)
>>>     ... 11 more
>>> Caused by: java.lang.UnsupportedOperationException
>>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>     ... 15 more
>>>
>>>
>>> I believed that protobuf are now serializable on default Flink
>>> configuration after fixing this issue in 0.9/0.8.1:
>>> https://issues.apache.org/jira/browse/FLINK-1392
>>>
>>> Maybe it really is, but Flink just requires some configuration?
>>> I'll be grateful for your help with this issue.
>>> Cheers,
>>> Krzysztof
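
A note on the quoted stack trace: the innermost frames show Kryo's CollectionSerializer calling add() on a java.util.Collections$UnmodifiableCollection, which always throws UnsupportedOperationException. A plausible reading (an assumption, not something stated in the thread) is that the generated Protobuf class keeps its repeated field enrichments_ in an unmodifiable list, so a generic field-by-field copy cannot refill it. The plain-Java sketch below reproduces only that failure mode, without Flink or Kryo; the class name UnmodifiableCopyDemo and the helper fillByAdding are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class: not part of Flink, Kryo, or Protobuf.
public class UnmodifiableCopyDemo {

    /**
     * Mimics what a generic collection serializer does when it reads or
     * copies a collection: it takes an empty target and add()s each element.
     * Returns true if every add() succeeded, false if the target rejected it.
     */
    public static boolean fillByAdding(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as the innermost "Caused by" in the trace.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b");

        // A plain ArrayList accepts add(), so the generic copy works.
        Collection<String> mutable = new ArrayList<>();
        System.out.println("ArrayList filled: " + fillByAdding(mutable, elements));

        // An unmodifiable wrapper rejects add(), so the generic copy fails.
        Collection<String> readOnly = Collections.unmodifiableList(new ArrayList<String>());
        System.out.println("Unmodifiable list filled: " + fillByAdding(readOnly, elements));
    }
}
```

If that reading is right, it would explain why registering a serializer that goes through the Protobuf API, rather than through the object's fields, avoids the exception.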