One more detail: when the FoldFunction runs on a KeyedStream, everything works fine, so the problem must be related to the way WindowedStream handles type extraction.
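The NullPointerException in this thread comes from the accumulator, not from the stream elements: the fold body quoted further down calls `topKSoFar.put(...)` before anything else, so a lost (null) initial value fails right there. The JDK-only sketch below reproduces the same top-K fold outside Flink. It assumes `java.util.TreeMap<Long, TreeSet<String>>` as a stand-in for Guava's `TreeMultimap<Long, String>`, and `TopKFoldSketch.fold` is a hypothetical helper, not Flink's `FoldFunction` API.

```java
import java.util.TreeMap;
import java.util.TreeSet;

class TopKFoldSketch {
    // Hypothetical stand-in for FoldFunction.fold. TreeMap<Long, TreeSet<String>>
    // replaces Guava's TreeMultimap<Long, String> so the sketch runs on the JDK alone.
    static TreeMap<Long, TreeSet<String>> fold(TreeMap<Long, TreeSet<String>> topKSoFar,
                                              String item, long count, int topK) {
        // With a null accumulator (e.g. an initial value lost in serialization),
        // this first dereference throws NullPointerException.
        topKSoFar.computeIfAbsent(count, k -> new TreeSet<>()).add(item);
        // Drop the smallest count once there are more than topK distinct counts,
        // mirroring topKSoFar.removeAll(topKSoFar.keySet().first()).
        if (topKSoFar.size() > topK) {
            topKSoFar.remove(topKSoFar.firstKey());
        }
        return topKSoFar;
    }

    public static void main(String[] args) {
        TreeMap<Long, TreeSet<String>> acc = new TreeMap<>(); // non-null initial value
        acc = fold(acc, "a", 3L, 2);
        acc = fold(acc, "b", 1L, 2);
        acc = fold(acc, "c", 5L, 2); // count 1 is evicted here
        acc = fold(acc, "d", 3L, 2);
        System.out.println(acc); // {3=[a, d], 5=[c]}

        try {
            fold(null, "x", 1L, 2);
        } catch (NullPointerException e) {
            System.out.println("null accumulator -> NullPointerException");
        }
    }
}
```

The fold logic itself is fine as long as every window starts from a fresh, non-null copy of the initial value, which is why the suspicion falls on how that value is serialized and restored.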
In case any Flink experts would like to reproduce it, I have created a repo on GitHub: github.com/gyk/flink-multimap

On 20 September 2016 at 10:33, Yukun Guo <gyk....@gmail.com> wrote:
> Hi,
>
> The same error occurs after changing the code, unfortunately.
>
> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a
> `T serializer` where T extends Serializer<?> & Serializable, so I pass a
> custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
>
>
> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Can you use "env.getConfig().registerTypeWithKryoSerializer(
>> TreeMultimap.class, JavaSerializer.class)" ?
>>
>> Best,
>> Stephan
>>
>>
>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Here is the code snippet:
>>>
>>> windowedStream.fold(TreeMultimap.<Long, String>create(),
>>>         new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>     @Override
>>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>>>                                            Tuple2<String, Long> itemCount)
>>>             throws Exception {
>>>         String item = itemCount.f0;
>>>         Long count = itemCount.f1;
>>>         topKSoFar.put(count, item);
>>>         if (topKSoFar.keySet().size() > topK) {
>>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>>         }
>>>         return topKSoFar;
>>>     }
>>> });
>>>
>>>
>>> The problem is that when the fold function gets called, the initial value
>>> has been lost, so it throws a NullPointerException. This is due to failed
>>> type extraction and serialization, as shown in the log message:
>>> "INFO TypeExtractor:1685 - No fields detected for class
>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>> Will be handled as GenericType."
>>>
>>> I have tried the following two ways to fix it, but neither of them worked:
>>>
>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
>>> new TreeMultimapSerializer())`. The write/read methods are almost
>>> line-by-line translations of TreeMultimap's own implementation.
>>>
>>> 2. TreeMultimap implements the Serializable interface, so Kryo can fall
>>> back to standard Java serialization. Since Kryo's JavaSerializer
>>> itself is not serializable, I wrote an adapter to make it fit the
>>> "addDefaultKryoSerializer" API.
>>>
>>> Could you please give me some working examples of custom Kryo
>>> serialization in Flink?
>>>
>>>
>>> Best regards,
>>> Yukun
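Option 1 in the thread boils down to writing the multimap's entries out and rebuilding a fresh instance on read. The JDK-only sketch below shows that write/read pairing plus a local round-trip check. It deliberately does not use Kryo's `Serializer` API: `SortedMultimapCodec`, `write`, `read`, and `roundTrip` are hypothetical names, and a `TreeMap<Long, TreeSet<String>>` stands in for Guava's `TreeMultimap<Long, String>`. It also assumes natural ordering, as `TreeMultimap.create()` uses; a multimap built with custom comparators would need those comparators written out as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical codec illustrating a custom write/read pair (not Kryo's API).
class SortedMultimapCodec {
    // Write: key count, then for each key its value count followed by the values.
    static void write(DataOutputStream out, TreeMap<Long, TreeSet<String>> map) throws IOException {
        out.writeInt(map.size());
        for (Map.Entry<Long, TreeSet<String>> e : map.entrySet()) {
            out.writeLong(e.getKey());
            out.writeInt(e.getValue().size());
            for (String v : e.getValue()) {
                out.writeUTF(v);
            }
        }
    }

    // Read: exact mirror of write(); always returns a new, non-null map.
    static TreeMap<Long, TreeSet<String>> read(DataInputStream in) throws IOException {
        TreeMap<Long, TreeSet<String>> map = new TreeMap<>();
        int keyCount = in.readInt();
        for (int i = 0; i < keyCount; i++) {
            long key = in.readLong();
            int valueCount = in.readInt();
            TreeSet<String> values = new TreeSet<>();
            for (int j = 0; j < valueCount; j++) {
                values.add(in.readUTF());
            }
            map.put(key, values);
        }
        return map;
    }

    // Serialize to bytes and back, as a copy via a serializer would.
    static TreeMap<Long, TreeSet<String>> roundTrip(TreeMap<Long, TreeSet<String>> map) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(out, map);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return read(in);
        }
    }

    public static void main(String[] args) throws IOException {
        TreeMap<Long, TreeSet<String>> original = new TreeMap<>();
        original.computeIfAbsent(3L, k -> new TreeSet<>()).add("a");
        original.computeIfAbsent(3L, k -> new TreeSet<>()).add("d");
        original.computeIfAbsent(5L, k -> new TreeSet<>()).add("c");

        TreeMap<Long, TreeSet<String>> copy = roundTrip(original);
        System.out.println(copy);                  // {3=[a, d], 5=[c]}
        System.out.println(copy.equals(original)); // true
        // The empty map used as a fold's initial value must survive the trip too.
        System.out.println(roundTrip(new TreeMap<>()).isEmpty()); // true
    }
}
```

In a Flink job the same logic would live in the serializer's write and read methods; a quick local round trip like this is a cheap way to confirm that read returns an equal, non-null copy (including of the empty initial value) before looking further at type extraction.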