Hi Yukun,

I debugged this issue and found that it is a bug in the serialization of the StateDescriptor. I have created FLINK-4640 [1] to resolve the issue.
Thanks for reporting the issue.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-4640

2016-09-20 10:35 GMT+02:00 Yukun Guo <gyk....@gmail.com>:

> Some detail: if running the FoldFunction on a KeyedStream, everything
> works fine. So it must relate to the way WindowedStream handles type
> extraction.
>
> In case any Flink experts would like to reproduce it, I have created a
> repo on GitHub: github.com/gyk/flink-multimap
>
> On 20 September 2016 at 10:33, Yukun Guo <gyk....@gmail.com> wrote:
>
>> Hi,
>>
>> The same error occurs after changing the code, unfortunately.
>>
>> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a
>> `T serializer` where T extends Serializer<?> & Serializable, so I pass
>> a custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
>>
>>
>> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> Can you use
>>> "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class,
>>> JavaSerializer.class)"
>>> ?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:
>>>
>>>> Here is the code snippet:
>>>>
>>>> windowedStream.fold(TreeMultimap.<Long, String>create(),
>>>>     new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>>         @Override
>>>>         public TreeMultimap<Long, String> fold(
>>>>                 TreeMultimap<Long, String> topKSoFar,
>>>>                 Tuple2<String, Long> itemCount) throws Exception {
>>>>             String item = itemCount.f0;
>>>>             Long count = itemCount.f1;
>>>>             topKSoFar.put(count, item);
>>>>             if (topKSoFar.keySet().size() > topK) {
>>>>                 topKSoFar.removeAll(topKSoFar.keySet().first());
>>>>             }
>>>>             return topKSoFar;
>>>>         }
>>>>     });
>>>>
>>>>
>>>> The problem is that when the fold function gets called, the initial
>>>> value has been lost, so it encounters a NullPointerException. This is
>>>> due to failed type extraction and serialization, as shown in the log
>>>> message:
>>>> "INFO TypeExtractor:1685 - No fields detected for class
>>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>>> Will be handled as GenericType."
>>>>
>>>> I have tried the following two ways to fix it but neither of them
>>>> worked:
>>>>
>>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>>> Serializer<T>, and calling
>>>> `env.addDefaultKryoSerializer(TreeMultimap.class,
>>>> new TreeMultimapSerializer())`. The write/read methods are almost
>>>> line-by-line translations from TreeMultimap's own implementation.
>>>>
>>>> 2. TreeMultimap implements the Serializable interface, so Kryo can
>>>> fall back to the standard Java serialization. Since Kryo's
>>>> JavaSerializer itself is not serializable, I wrote an adapter to make
>>>> it fit the "addDefaultKryoSerializer" API.
>>>>
>>>> Could you please give me some working examples for custom Kryo
>>>> serialization in Flink?
>>>>
>>>>
>>>> Best regards,
>>>> Yukun
>>>>
>>>>
>>>
>>
>
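
For anyone who wants to check the fold logic independently of Flink, here is a minimal, stdlib-only sketch of what the quoted snippet depends on: the initial value has to survive a serialize/deserialize round trip before the first fold call, otherwise the fold starts from a missing value. This is not Flink's code path. The names `TopKFoldSketch`, `roundTrip`, and `fold` are hypothetical, `TreeMap<Long, TreeSet<String>>` is used as a stand-in for Guava's `TreeMultimap`, and plain Java serialization stands in for whatever serializer Flink would select.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;
import java.util.TreeSet;

public class TopKFoldSketch {

    // Copies a value through plain Java serialization (an assumption made
    // for illustration; Flink's actual serializer path is different).
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Mirrors the fold in the thread: record the item under its count, then
    // drop the smallest count once more than topK distinct counts are held.
    public static TreeMap<Long, TreeSet<String>> fold(
            TreeMap<Long, TreeSet<String>> topKSoFar, String item, long count, int topK) {
        topKSoFar.computeIfAbsent(count, c -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > topK) {
            topKSoFar.remove(topKSoFar.firstKey());
        }
        return topKSoFar;
    }

    public static void main(String[] args) throws Exception {
        // The initial value is restored from its serialized form before folding.
        TreeMap<Long, TreeSet<String>> acc =
                roundTrip(new TreeMap<Long, TreeSet<String>>());
        acc = fold(acc, "a", 3L, 2);
        acc = fold(acc, "b", 1L, 2);
        acc = fold(acc, "c", 5L, 2);
        System.out.println(acc); // prints {3=[a], 5=[c]}
    }
}
```

If the round trip returned null instead of an empty map, the first call to `fold` would fail with a NullPointerException, which matches the symptom described in the thread.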