Some detail: if running the FoldFunction on a KeyedStream, everything works
fine. So it must relate to the way WindowedStream handles type extraction.

In case any Flink experts would like to reproduce it, I have created a repo
on Github: github.com/gyk/flink-multimap

On 20 September 2016 at 10:33, Yukun Guo <gyk....@gmail.com> wrote:

> Hi,
>
> The same error occurs after changing the code, unfortunately.
>
> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T
> serializer` where T extends Serializer<?> & Serializable, so I pass a
> custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
>
>
> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Can you use "env.getConfig().registerTypeWithKryoSerializer(
>> TreeMultimap.class, JavaSerializer.class)" ?
>>
>> Best,
>> Stephan
>>
>>
>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Here is the code snippet:
>>>
>>> windowedStream.fold(TreeMultimap.<Long, String>create(), new 
>>> FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>     @Override
>>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> 
>>> topKSoFar,
>>>                                            Tuple2<String, Long> itemCount) 
>>> throws Exception {
>>>         String item = itemCount.f0;
>>>         Long count = itemCount.f1;
>>>         topKSoFar.put(count, item);
>>>         if (topKSoFar.keySet().size() > topK) {
>>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>>         }
>>>         return topKSoFar;
>>>     }
>>> });
>>>
>>>
>>> The problem is when fold function getting called, the initial value has
>>> lost therefore it encounters a NullPointerException. This is due to failed
>>> type extraction and serialization, as shown in the log message:
>>> "INFO  TypeExtractor:1685 - No fields detected for class
>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>> Will be handled as GenericType."
>>>
>>> I have tried the following two ways to fix it but neither of them worked:
>>>
>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
>>> new TreeMultimapSerializer()`. The write/read methods are almost
>>> line-by-line translations from TreeMultimap's own implementation.
>>>
>>> 2. TreeMultimap has implemented Serializable interface so Kryo can fall
>>> back to use the standard Java serialization. Since Kryo's JavaSerializer
>>> itself is not serializable, I wrote an adapter to make it fit the
>>> "addDefaultKryoSerializer" API.
>>>
>>> Could you please give me some working examples for custom Kryo
>>> serialization in Flink?
>>>
>>>
>>> Best regards,
>>> Yukun
>>>
>>>
>>
>

Reply via email to