There was a private member variable that was not serializable and was not
marked transient. Thanks for the pointer.
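
For anyone who finds this thread later, here is a minimal sketch of that kind of failure and fix. It uses only java.io, not Flink, and the names `Joiner`, `BrokenSchema`, `FixedSchema` and `SerializationCheck` are made up for illustration; they are not the actual code from my project. It assumes the offending member was an anonymous class instance held in a field, which is what the `$1` in the exception suggests.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper type standing in for the non-serializable member.
interface Joiner {
    String join(int a, int b);
}

// Fails to serialize: the field holds an anonymous class instance
// (compiled as BrokenSchema$1) that does not implement Serializable.
class BrokenSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Joiner joiner = new Joiner() {
        @Override
        public String join(int a, int b) {
            return a + "," + b;
        }
    };

    byte[] serialize(int a, int b) {
        return joiner.join(a, b).getBytes(StandardCharsets.UTF_8);
    }
}

// Serializes: the field is transient, so Java serialization skips it.
// Transient fields are null after deserialization, so it is built lazily.
class FixedSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient Joiner joiner;

    private Joiner joiner() {
        if (joiner == null) {
            joiner = new Joiner() {
                @Override
                public String join(int a, int b) {
                    return a + "," + b;
                }
            };
        }
        return joiner;
    }

    byte[] serialize(int a, int b) {
        return joiner().join(a, b).getBytes(StandardCharsets.UTF_8);
    }
}

class SerializationCheck {
    // Returns null on success, or the exception message (the offending
    // class name) when an object in the graph is not serializable.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("broken: " + trySerialize(new BrokenSchema()));
        System.out.println("fixed:  " + trySerialize(new FixedSchema()));
    }
}
```

Running a check like `trySerialize` on the schema object before handing it to the producer would have shown me the problem without starting a job. The other option is to make the member itself implement Serializable instead of marking it transient.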

On Thu, Feb 23, 2017 at 11:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Thanks for clarifying.
>
> From the looks of your exception:
>
> Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> The `$1` suffix in `com.sy.flink.test.Tuple2Serializerr$1` indicates that
> an anonymous inner class declared inside `Tuple2Serializerr` is the object
> that is not serializable.
>
> Could you check if that’s the case?
>
>
>
> On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanch...@gmail.com)
> wrote:
>
> But it is not an inner class.
>
> On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org
> > wrote:
>
>> Since I don’t have your complete code, I’m guessing at the problem:
>> is your `Tuple2Serializerr` an inner class? If so, you should be able to
>> solve the problem by declaring it `static`.
>>
>> This is more of a Java issue than a Flink one. A non-static inner class
>> holds an implicit reference to its enclosing outer instance, so
>> serializing it also serializes the outer instance, which fails when the
>> outer class is not serializable.
>>
>>
>> On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com)
>> wrote:
>>
>> This is at high level what I am doing:
>>
>> Serialize:
>>
>> String s = tuple.f0 + "," + tuple.f1;
>> return s.getBytes();
>>
>> Deserialize:
>>
>> String s = new String(message);
>> String[] sarr = s.split(",");
>> Tuple2<Integer, Integer> tuple =
>>         new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));
>>
>> return tuple;
>>
>>
>> On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org> wrote:
>>
>>> Hi Mohit,
>>>
>>> As 刘彪 pointed out in his reply, the problem is that your
>>> `Tuple2Serializerr` contains fields that are not serializable, so
>>> `Tuple2Serializerr` itself is not serializable.
>>> Could you perhaps share your `Tuple2Serializerr` implementation with us
>>> so we can pinpoint the problem?
>>>
>>> A snippet of the class fields and constructor will do, so you don’t have
>>> to provide the whole `serialize` / `deserialize` implementation if you
>>> don’t want to.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (
>>> mohitanch...@gmail.com) wrote:
>>>
>>> Inside the class I am using a String to convert to bytes.
>>>
>>> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi Mohit
>>>> Since you did not share the full code of Tuple2Serializerr, my guess is
>>>> that some of its fields do not implement Serializable.
>>>>
>>>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>>
>>>>> I wrote a key serialization class to write to Kafka; however, I am
>>>>> getting this error. I am not sure why, as I have already implemented
>>>>> the interfaces.
>>>>>
>>>>> Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>
>>>>> And the class implements the following:
>>>>>
>>>>> public class Tuple2Serializerr implements
>>>>>         DeserializationSchema<Tuple2<Integer, Integer>>,
>>>>>         SerializationSchema<Tuple2<Integer, Integer>> {
>>>>>
>>>>> And called like this:
>>>>>
>>>>>
>>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer =
>>>>>         new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>>>                 "10.22.4.15:9092",         // broker list
>>>>>                 "my-topic",                // target topic
>>>>>                 new Tuple2Serializerr());  // serialization schema
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
