Hi Yukun,

I debugged this issue and found that this is a bug in the serialization of
the StateDescriptor.
I have created FLINK-4640 [1] to resolve the issue.

Thanks for reporting the issue.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-4640

2016-09-20 10:35 GMT+02:00 Yukun Guo <gyk....@gmail.com>:

> Some detail: if running the FoldFunction on a KeyedStream, everything
> works fine. So it must relate to the way WindowedStream handles type
> extraction.
>
> In case any Flink experts would like to reproduce it, I have created a
> repo on Github: github.com/gyk/flink-multimap
>
> On 20 September 2016 at 10:33, Yukun Guo <gyk....@gmail.com> wrote:
>
>> Hi,
>>
>> The same error occurs after changing the code, unfortunately.
>>
>> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T
>> serializer` where T extends Serializer<?> & Serializable, so I pass a
>> custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
>>
>>
>> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> Can you use 
>>> "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, 
>>> JavaSerializer.class)"
>>> ?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:
>>>
>>>> Here is the code snippet:
>>>>
>>>> windowedStream.fold(TreeMultimap.<Long, String>create(), new 
>>>> FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>>     @Override
>>>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> 
>>>> topKSoFar,
>>>>                                            Tuple2<String, Long> itemCount) 
>>>> throws Exception {
>>>>         String item = itemCount.f0;
>>>>         Long count = itemCount.f1;
>>>>         topKSoFar.put(count, item);
>>>>         if (topKSoFar.keySet().size() > topK) {
>>>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>>>         }
>>>>         return topKSoFar;
>>>>     }
>>>> });
>>>>
>>>>
>>>> The problem is when fold function getting called, the initial value has
>>>> lost therefore it encounters a NullPointerException. This is due to failed
>>>> type extraction and serialization, as shown in the log message:
>>>> "INFO  TypeExtractor:1685 - No fields detected for class
>>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>>> Will be handled as GenericType."
>>>>
>>>> I have tried the following two ways to fix it but neither of them
>>>> worked:
>>>>
>>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>>> Serializer<T>, and calling 
>>>> `env.addDefaultKryoSerializer(TreeMultimap.class,
>>>> new TreeMultimapSerializer()`. The write/read methods are almost
>>>> line-by-line translations from TreeMultimap's own implementation.
>>>>
>>>> 2. TreeMultimap has implemented Serializable interface so Kryo can fall
>>>> back to use the standard Java serialization. Since Kryo's JavaSerializer
>>>> itself is not serializable, I wrote an adapter to make it fit the
>>>> "addDefaultKryoSerializer" API.
>>>>
>>>> Could you please give me some working examples for custom Kryo
>>>> serialization in Flink?
>>>>
>>>>
>>>> Best regards,
>>>> Yukun
>>>>
>>>>
>>>
>>
>

Reply via email to