Thank you for quickly fixing it!
On 20 September 2016 at 17:17, Fabian Hueske <fhue...@gmail.com> wrote: > Hi Yukun, > > I debugged this issue and found that this is a bug in the serialization of > the StateDescriptor. > I have created FLINK-4640 [1] to resolve the issue. > > Thanks for reporting the issue. > > Best, Fabian > > [1] https://issues.apache.org/jira/browse/FLINK-4640 > > 2016-09-20 10:35 GMT+02:00 Yukun Guo <gyk....@gmail.com>: > >> Some detail: if running the FoldFunction on a KeyedStream, everything >> works fine. So it must relate to the way WindowedStream handles type >> extraction. >> >> In case any Flink experts would like to reproduce it, I have created a >> repo on Github: github.com/gyk/flink-multimap >> >> On 20 September 2016 at 10:33, Yukun Guo <gyk....@gmail.com> wrote: >> >>> Hi, >>> >>> The same error occurs after changing the code, unfortunately. >>> >>> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T >>> serializer` where T extends Serializer<?> & Serializable, so I pass a >>> custom GenericJavaSerializer<T>, but I guess this doesn't matter much. >>> >>> >>> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote: >>> >>>> Hi! >>>> >>>> Can you use "env.getConfig().registerTypeW >>>> ithKryoSerializer(TreeMultimap.class, JavaSerializer.class)" ? >>>> >>>> Best, >>>> Stephan >>>> >>>> >>>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote: >>>> >>>>> Here is the code snippet: >>>>> >>>>> windowedStream.fold(TreeMultimap.<Long, String>create(), new >>>>> FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() { >>>>> @Override >>>>> public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> >>>>> topKSoFar, >>>>> Tuple2<String, Long> >>>>> itemCount) throws Exception { >>>>> String item = itemCount.f0; >>>>> Long count = itemCount.f1; >>>>> topKSoFar.put(count, item); >>>>> if (topKSoFar.keySet().size() > topK) { >>>>> topKSoFar.removeAll(topKSoFar.keySet().first()); >>>>> } >>>>> return topKSoFar; >>>>> } >>>>> }); >>>>> >>>>> >>>>> The problem is when fold function getting called, the initial value >>>>> has lost therefore it encounters a NullPointerException. This is due to >>>>> failed type extraction and serialization, as shown in the log message: >>>>> "INFO TypeExtractor:1685 - No fields detected for class >>>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType. >>>>> Will be handled as GenericType." >>>>> >>>>> I have tried the following two ways to fix it but neither of them >>>>> worked: >>>>> >>>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's >>>>> Serializer<T>, and calling >>>>> `env.addDefaultKryoSerializer(TreeMultimap.class, >>>>> new TreeMultimapSerializer()`. The write/read methods are almost >>>>> line-by-line translations from TreeMultimap's own implementation. >>>>> >>>>> 2. TreeMultimap has implemented Serializable interface so Kryo can >>>>> fall back to use the standard Java serialization. Since Kryo's >>>>> JavaSerializer itself is not serializable, I wrote an adapter to make it >>>>> fit the "addDefaultKryoSerializer" API. >>>>> >>>>> Could you please give me some working examples for custom Kryo >>>>> serialization in Flink? >>>>> >>>>> >>>>> Best regards, >>>>> Yukun >>>>> >>>>> >>>> >>> >> >