Thank you for quickly fixing it!

On 20 September 2016 at 17:17, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yukun,
>
> I debugged this issue and found that this is a bug in the serialization of
> the StateDescriptor.
> I have created FLINK-4640 [1] to resolve the issue.
>
> Thanks for reporting the issue.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-4640
>
> 2016-09-20 10:35 GMT+02:00 Yukun Guo <gyk....@gmail.com>:
>
>> Some detail: if running the FoldFunction on a KeyedStream, everything
>> works fine. So it must relate to the way WindowedStream handles type
>> extraction.
>>
>> In case any Flink experts would like to reproduce it, I have created a
>> repo on GitHub: github.com/gyk/flink-multimap
>>
>> On 20 September 2016 at 10:33, Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The same error occurs after changing the code, unfortunately.
>>>
>>> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a
>>> `T serializer` where T extends Serializer<?> & Serializable, so I pass a
>>> custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
>>>
>>>
>>> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> Can you use
>>>> "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)" ?
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:
>>>>
>>>>> Here is the code snippet:
>>>>>
>>>>> windowedStream.fold(TreeMultimap.<Long, String>create(),
>>>>>         new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>>>     @Override
>>>>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>>>>>                                            Tuple2<String, Long> itemCount) throws Exception {
>>>>>         String item = itemCount.f0;
>>>>>         Long count = itemCount.f1;
>>>>>         topKSoFar.put(count, item);
>>>>>         if (topKSoFar.keySet().size() > topK) {
>>>>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>>>>         }
>>>>>         return topKSoFar;
>>>>>     }
>>>>> });
>>>>>
>>>>>
>>>>> The problem is that when the fold function is called, the initial value
>>>>> has been lost, so it encounters a NullPointerException. This is due to
>>>>> failed type extraction and serialization, as shown in the log message:
>>>>> "INFO  TypeExtractor:1685 - No fields detected for class
>>>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>>>> Will be handled as GenericType."
>>>>>
>>>>> I have tried the following two ways to fix it but neither of them
>>>>> worked:
>>>>>
>>>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>>>> Serializer<T>, and calling
>>>>> `env.addDefaultKryoSerializer(TreeMultimap.class,
>>>>> new TreeMultimapSerializer())`. The write/read methods are almost
>>>>> line-by-line translations from TreeMultimap's own implementation.
>>>>>
>>>>> 2. TreeMultimap implements the Serializable interface, so Kryo can
>>>>> fall back to standard Java serialization. Since Kryo's
>>>>> JavaSerializer itself is not serializable, I wrote an adapter to make it
>>>>> fit the "addDefaultKryoSerializer" API.
>>>>>
>>>>> Could you please give me some working examples for custom Kryo
>>>>> serialization in Flink?
>>>>>
>>>>>
>>>>> Best regards,
>>>>> Yukun
>>>>>
>>>>>
>>>>
>>>
>>
>
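
The thread above turns on whether the fold's initial value survives serialization. As a rough way to check that property outside Flink, the sketch below round-trips a value through standard Java serialization and applies the same put-and-trim step as the fold function. Everything in it is an assumption made for illustration: RoundTripCheck, roundTrip and putAndTrim are hypothetical names, and a TreeMap<Long, TreeSet<String>> stands in for Guava's TreeMultimap so that only the JDK is needed. It does not touch Flink or Kryo and does not reproduce FLINK-4640.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Flink, Kryo or Guava.
final class RoundTripCheck {
    private RoundTripCheck() {}

    /** Writes a value with standard Java serialization and reads it back. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    /**
     * Stand-in for the fold body: add the item under its count, then drop
     * the smallest count once more than topK distinct counts are held.
     */
    static void putAndTrim(TreeMap<Long, TreeSet<String>> topKSoFar,
                           long count, String item, int topK) {
        topKSoFar.computeIfAbsent(count, k -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > topK) {
            topKSoFar.remove(topKSoFar.firstKey());
        }
    }
}
```

If the copy returned by roundTrip is non-null and equal to the original, the value itself serializes cleanly, which points the search at the framework side, as turned out to be the case here.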
