Here is the code snippet: windowedStream.fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() { @Override public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar, Tuple2<String, Long> itemCount) throws Exception { String item = itemCount.f0; Long count = itemCount.f1; topKSoFar.put(count, item); if (topKSoFar.keySet().size() > topK) { topKSoFar.removeAll(topKSoFar.keySet().first()); } return topKSoFar; } });
The problem is when fold function getting called, the initial value has lost therefore it encounters a NullPointerException. This is due to failed type extraction and serialization, as shown in the log message: "INFO TypeExtractor:1685 - No fields detected for class com.google.common.collect.TreeMultimap. Cannot be used as a PojoType. Will be handled as GenericType." I have tried the following two ways to fix it but neither of them worked: 1. Writing a class TreeMultimapSerializer which extends Kryo's Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class, new TreeMultimapSerializer()`. The write/read methods are almost line-by-line translations from TreeMultimap's own implementation. 2. TreeMultimap has implemented Serializable interface so Kryo can fall back to use the standard Java serialization. Since Kryo's JavaSerializer itself is not serializable, I wrote an adapter to make it fit the "addDefaultKryoSerializer" API. Could you please give me some working examples for custom Kryo serialization in Flink? Best regards, Yukun