At a high level, this is what I am doing:

Serialize:

// Tuple2 exposes its two fields as f0 and f1
String s = tuple.f0 + "," + tuple.f1;
return s.getBytes();

Deserialize:

String s = new String(message);
String[] sarr = s.split(",");
Tuple2<Integer, Integer> tuple =
    new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));

return tuple;
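
For reference, the two steps above can be tried outside Flink with a small round-trip sketch. This is only an illustration under assumptions: `IntPairCodec` is a hypothetical name, an `int[]` stands in for `Tuple2<Integer, Integer>`, and the explicit UTF-8 charset is an addition (the snippets above rely on the platform default).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the schema's two methods; it does not use the Flink interfaces.
class IntPairCodec {

    // Encode two ints as the text "a,b" in UTF-8.
    static byte[] serialize(int first, int second) {
        String s = first + "," + second;
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Decode "a,b" back into a two-element array; reject anything else.
    static int[] deserialize(byte[] message) {
        String s = new String(message, StandardCharsets.UTF_8);
        String[] parts = s.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected \"a,b\" but got: " + s);
        }
        return new int[] {
            Integer.parseInt(parts[0].trim()),
            Integer.parseInt(parts[1].trim())
        };
    }

    public static void main(String[] args) {
        int[] pair = deserialize(serialize(3, -7));
        System.out.println(pair[0] + "," + pair[1]);
    }
}
```

Nothing in this logic holds state, so a schema built this way needs no instance fields at all.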


On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Mohit,
>
> As 刘彪 pointed out in his reply, the problem is that your
> `Tuple2Serializer` contains fields that are not serializable, so
> `Tuple2Serializer` itself is not serializable.
> Could you perhaps share your `Tuple2Serializer` implementation with us so
> we can pinpoint the problem?
>
> A snippet of the class fields and constructor will do, so you don’t have
> to provide the whole `serialize` / `deserialize` implementation if you
> don’t want to.
>
> Cheers,
> Gordon
>
>
> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com)
> wrote:
>
> Inside the class I only use a String to convert to bytes.
>
> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>
>> Hi Mohit
>> Since you did not share the full code of Tuple2Serializerr, my guess is
>> that some fields of Tuple2Serializerr do not implement Serializable.
>>
>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>
>>> I wrote a key serialization class to write to Kafka; however, I am getting
>>> this error. I am not sure why, as I've already implemented the interfaces.
>>>
>>> Caused by: java.io.NotSerializableException:
>>> com.sy.flink.test.Tuple2Serializerr$1
>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>
>>> And the class implements the following:
>>>
>>> public class Tuple2Serializerr implements
>>>         DeserializationSchema<Tuple2<Integer, Integer>>,
>>>         SerializationSchema<Tuple2<Integer, Integer>> {
>>>
>>> And called like this:
>>>
>>>
>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer =
>>>         new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>                 "10.22.4.15:9092",         // broker list
>>>                 "my-topic",                // target topic
>>>                 new Tuple2Serializerr());  // serialization schema
>>>
>>>
>>>
>>>
>>
>
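
A note on the trace quoted above: a class name ending in `$1` is how the Java compiler names the first anonymous inner class declared inside a class, so `Tuple2Serializerr$1` probably points at a field initialised with `new SomeType() { ... }`. The sketch below reproduces that situation with plain Java serialization and no Flink dependency. All names in it (`SerializabilityCheck`, `Parser`, `BrokenSchema`, `FixedSchema`) are hypothetical, and it only guesses at what the real class looks like.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializabilityCheck {

    // Hypothetical helper type that does NOT extend Serializable.
    interface Parser {
        int parse(String s);
    }

    // Mirrors the suspected problem: a field holding an anonymous inner class.
    static class BrokenSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // Compiles to SerializabilityCheck$BrokenSchema$1, which is not Serializable.
        private final Parser parser = new Parser() {
            @Override
            public int parse(String s) {
                return Integer.parseInt(s);
            }
        };
    }

    // Same behaviour as a plain method: no field for Java serialization to reject.
    static class FixedSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        int parse(String s) {
            return Integer.parseInt(s);
        }
    }

    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("broken: " + isJavaSerializable(new BrokenSchema()));
        System.out.println("fixed:  " + isJavaSerializable(new FixedSchema()));
    }
}
```

If the real class has such a field, the usual options are to turn it into a method, mark it `transient` and create it lazily, or make its type implement `Serializable`.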
