Hi Mohit, As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` contains fields that are not serializable, so `Tuple2Serializer` itself is not serializable. Could you perhaps share your `Tuple2Serializer` implementation with us so we can pinpoint the problem?
A snippet of the class fields and constructor will do, so you don’t have to provide the whole `serialize` / `deserialize` implementation if you don’t want to. Cheers, Gordon On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) wrote: I am using String inside to convert into bytes. On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote: Hi Mohit As you did not give the whole codes of Tuple2Serializerr. I guess the reason is some fields of Tuple2Serializerr do not implement Serializable. 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>: I wrote a key serialization class to write to kafka however I am getting this error. Not sure why as I've already implemented the interfaces. Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) And the class implements the following: public class Tuple2Serializerr implements DeserializationSchema<Tuple2<Integer, Integer>>, SerializationSchema<Tuple2<Integer, Integer>> { And called like this: FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new FlinkKafkaProducer010<Tuple2<Integer, Integer>>( "10.22.4.15:9092", // broker list "my-topic", // target topic new Tuple2Serializerr()); // serialization schema