Here is the code snippet:

windowedStream.fold(TreeMultimap.<Long, String>create(),
        new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
    @Override
    public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
                                           Tuple2<String, Long> itemCount) throws Exception {
        String item = itemCount.f0;
        Long count = itemCount.f1;
        topKSoFar.put(count, item);
        // Keep at most topK distinct counts (topK comes from the enclosing scope);
        // the smallest count and all of its items are evicted first.
        if (topKSoFar.keySet().size() > topK) {
            topKSoFar.removeAll(topKSoFar.keySet().first());
        }
        return topKSoFar;
    }
});
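For reference, here is what the fold is meant to compute, written as a stdlib-only sketch so it runs without Flink or Guava. It assumes java.util.TreeMap<Long, TreeSet<String>> as a stand-in for TreeMultimap<Long, String>; the class and method names are hypothetical. Like the snippet above, it caps the number of distinct counts rather than the number of items.

```java
import java.util.TreeMap;
import java.util.TreeSet;

class TopKFoldSketch {
    // Stand-in for TreeMultimap<Long, String>: sorted counts, each mapped to a sorted set of items.
    static TreeMap<Long, TreeSet<String>> fold(TreeMap<Long, TreeSet<String>> topKSoFar,
                                               String item, long count, int topK) {
        topKSoFar.computeIfAbsent(count, c -> new TreeSet<>()).add(item);
        // size() counts distinct keys, matching keySet().size() on the multimap.
        if (topKSoFar.size() > topK) {
            topKSoFar.remove(topKSoFar.firstKey()); // drop the smallest count and all its items
        }
        return topKSoFar;
    }

    public static void main(String[] args) {
        TreeMap<Long, TreeSet<String>> acc = new TreeMap<>(); // the initial value must be non-null
        acc = fold(acc, "a", 5L, 2);
        acc = fold(acc, "b", 3L, 2);
        acc = fold(acc, "c", 7L, 2); // three distinct counts now, so count 3 is evicted
        acc = fold(acc, "d", 7L, 2);
        System.out.println(acc); // {5=[a], 7=[c, d]}
    }
}
```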


The problem is that by the time the fold function is called, the initial
value has been lost, so it throws a NullPointerException. This is caused by
failed type extraction and serialization, as the following log message shows:

"INFO  TypeExtractor:1685 - No fields detected for class com.google.common.collect.TreeMultimap. Cannot be used as a PojoType. Will be handled as GenericType."
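My guess at the mechanism, which I have not confirmed: the initial value goes through whatever serializer handles TreeMultimap before the fold sees it. If that generic path rebuilds the object field by field and skips transient fields (which I believe is the default for Kryo's FieldSerializer), and if TreeMultimap keeps its backing map in a transient field that only its own readObject restores, the copy would have a null map and put() would throw. The sketch below reproduces that effect with plain Java serialization and two hypothetical holder classes: NaiveHolder relies on a field initializer for its transient map, while RestoringHolder rebuilds it in writeObject/readObject.

```java
import java.io.*;
import java.util.TreeMap;

class TransientRoundTrip {
    // Hypothetical container whose storage lives in a transient field with no restore hook.
    static class NaiveHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        transient TreeMap<Long, String> map = new TreeMap<>();

        void put(long key, String value) {
            map.put(key, value); // NullPointerException if map was never restored
        }
    }

    // Same layout, but writeObject/readObject write and rebuild the transient storage.
    static class RestoringHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        transient TreeMap<Long, String> map = new TreeMap<>();

        void put(long key, String value) {
            map.put(key, value);
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(map); // write the contents explicitly
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            map = (TreeMap<Long, String>) in.readObject(); // rebuild the transient field
        }
    }

    // Serialize and deserialize with standard Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        NaiveHolder naive = roundTrip(new NaiveHolder());
        // Deserialization does not re-run the field initializer, so the map is null.
        System.out.println("naive map after round trip: " + naive.map);
        try {
            naive.put(1L, "a");
        } catch (NullPointerException e) {
            System.out.println("put() on the naive copy threw NullPointerException");
        }
        RestoringHolder restoring = new RestoringHolder();
        restoring.put(3L, "x");
        RestoringHolder copy = roundTrip(restoring);
        copy.put(5L, "y"); // works because readObject() rebuilt the map
        System.out.println("restoring map after round trip: " + copy.map);
    }
}
```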

I have tried the following two fixes, but neither of them worked:

1. Writing a class TreeMultimapSerializer that extends Kryo's
Serializer<T> and registering it with
`env.addDefaultKryoSerializer(TreeMultimap.class, new TreeMultimapSerializer())`.
The write/read methods are almost line-by-line translations of
TreeMultimap's own implementation.

2. Since TreeMultimap implements the Serializable interface, Kryo can fall
back to standard Java serialization. Kryo's JavaSerializer itself is not
serializable, so I wrote an adapter to make it fit the
"addDefaultKryoSerializer" API.
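A stripped-down sketch of the write/read pair from fix 1, under these assumptions: natural ordering for keys and values (as with TreeMultimap.create()), so no comparators are written; java.util.TreeMap<Long, TreeSet<String>> standing in for TreeMultimap<Long, String>; and java.io's DataOutputStream/DataInputStream standing in for Kryo's Output/Input, so it runs without Flink, Kryo or Guava. The layout (number of distinct keys, then each key with its value count and values) is how I understand Guava's own multimap serialization to work. MultimapFormatSketch is a hypothetical name.

```java
import java.io.*;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class MultimapFormatSketch {
    // Write: number of distinct keys, then for each key its value count and values.
    static byte[] write(TreeMap<Long, TreeSet<String>> multimap) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(multimap.size());
            for (Map.Entry<Long, TreeSet<String>> entry : multimap.entrySet()) {
                out.writeLong(entry.getKey());
                out.writeInt(entry.getValue().size());
                for (String value : entry.getValue()) {
                    out.writeUTF(value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read: mirror write() exactly, building a fresh, fully initialized container.
    static TreeMap<Long, TreeSet<String>> read(byte[] data) {
        TreeMap<Long, TreeSet<String>> multimap = new TreeMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int keyCount = in.readInt();
            for (int i = 0; i < keyCount; i++) {
                long key = in.readLong();
                int valueCount = in.readInt();
                TreeSet<String> values = new TreeSet<>();
                for (int j = 0; j < valueCount; j++) {
                    values.add(in.readUTF());
                }
                multimap.put(key, values);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return multimap;
    }

    public static void main(String[] args) {
        TreeMap<Long, TreeSet<String>> original = new TreeMap<>();
        original.computeIfAbsent(7L, k -> new TreeSet<>()).add("c");
        original.computeIfAbsent(7L, k -> new TreeSet<>()).add("d");
        original.computeIfAbsent(5L, k -> new TreeSet<>()).add("a");
        TreeMap<Long, TreeSet<String>> copy = read(write(original));
        System.out.println(copy + " equal=" + copy.equals(original)); // {5=[a], 7=[c, d]} equal=true
    }
}
```

In a real Kryo serializer, the same sequence of writes and reads would go into its write and read methods; the key property is that read() returns a newly constructed, non-null container rather than relying on field-by-field reconstruction.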

Could you please give me some working examples of custom Kryo
serialization in Flink?


Best regards,
Yukun
