Hi all,
In my Beam job I have defined my own CombineFn with an accumulator. Running
the job locally works fine, but when I run it on Dataflow I hit an Avro
serialization exception:
java.lang.NoSuchMethodException: java.util.Map.&lt;init&gt;()
        at java.lang.Class.getConstructor0(Class.java:3082)
        at java.lang.Class.getDeclaredConstructor(Class.java:2178)
        at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
I am using the `@DefaultCoder(AvroCoder.class)` annotation for my
accumulator class. Is there anything special I need to do because one of
the fields in my accumulator class is a Map? I have pasted an outline of my
CombineFn below.
Thanks for any help with this!
Josh
private static class MyCombineFn extends CombineFn<Event, MyCombineFn.Accum, Out> {

  // Bounded map: evicts its eldest entry once it holds more than 10 events.
  private static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
      return this.size() > 10;
    }
  }

  @DefaultCoder(AvroCoder.class)
  private static class PartialEventUpdate implements Serializable {
    Long incrementCountBy = 0L;
    Map<String, Event> recentEvents = new ExpiringLinkedHashMap<>();
    Long lastSeenMillis = 0L;

    PartialEventUpdate() {}
  }

  @DefaultCoder(AvroCoder.class)
  private static class Accum implements Serializable {
    Map<UUID, PartialEventUpdate> eventIdToUpdate = new HashMap<>();

    Accum() {}
  }

  @Override
  public MyCombineFn.Accum createAccumulator() {
    return new MyCombineFn.Accum();
  }

  ...
}
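For completeness, here is a minimal sketch of the workaround I was
considering inside MyCombineFn: supplying the accumulator coder explicitly
instead of relying on the @DefaultCoder annotation. This assumes overriding
getAccumulatorCoder (from Beam's CombineFn) is the right hook here; I haven't
verified whether it behaves any differently on Dataflow.

  @Override
  public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<Event> inputCoder) {
    // Sketch only: explicitly request an AvroCoder for the accumulator class
    // rather than depending on the @DefaultCoder annotation being picked up.
    return AvroCoder.of(Accum.class);
  }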