A ConcurrentModificationException is thrown when the TaskManager serializes an
org.apache.flink.api.common.accumulators.Histogram, which iterates over the
underlying TreeMap, while a MapFunction that updates the accumulator
modifies the same TreeMap concurrently. How can I fix it?


The call stack:

WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
        at java.util.TreeMap.writeObject(TreeMap.java:2436)
        at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
        at java.util.HashMap.writeObject(HashMap.java:1362)
        at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
        at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
        ...
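
To make the failure mode above concrete outside Flink, here is a minimal
plain-Java sketch. It does not use any Flink classes: HistogramRaceSketch and
SynchronizedHistogram are hypothetical names invented for this illustration,
and I have not verified how such a wrapper would plug into Flink's accumulator
API. The first method reproduces the exception deterministically by modifying
a TreeMap in the middle of an iteration (a single-threaded analogue of the
race). The second shows one possible guard: updates and the Java serialization
hook take the same lock, so the map cannot change while it is being written.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

class HistogramRaceSketch {

    /** Hypothetical stand-in for a histogram accumulator; NOT Flink's class. */
    static class SynchronizedHistogram implements Serializable {
        private static final long serialVersionUID = 1L;
        private final TreeMap<Integer, Integer> counts = new TreeMap<>();

        /** Called by the updating thread for every record. */
        public synchronized void add(int value) {
            counts.merge(value, 1, Integer::sum);
        }

        /** Returns a copy so callers never iterate over the live map. */
        public synchronized TreeMap<Integer, Integer> snapshot() {
            return new TreeMap<>(counts);
        }

        /** Java serialization hook; holds the same lock as add(). */
        private synchronized void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
        }
    }

    /** Modifies a TreeMap mid-iteration; returns true if the iterator fails fast. */
    public static boolean unsafeIterationFails() {
        TreeMap<Integer, Integer> counts = new TreeMap<>();
        counts.put(1, 1);
        counts.put(2, 1);
        try {
            for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
                counts.put(e.getKey() + 100, 1); // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    /** Serializes repeatedly while another thread adds values; returns the final entry count. */
    public static int serializeWhileUpdating() {
        SynchronizedHistogram histogram = new SynchronizedHistogram();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 5_000; i++) {
                histogram.add(i);
            }
        });
        writer.start();
        try {
            while (writer.isAlive()) {
                try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                    out.writeObject(histogram); // blocks add() for the duration of the write
                }
            }
            writer.join();
        } catch (IOException | InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return histogram.snapshot().size();
    }

    public static void main(String[] args) {
        System.out.println("unsafe iteration fails: " + unsafeIterationFails());
        System.out.println("entries after guarded run: " + serializeWhileUpdating());
    }
}
```

The trade-off in this sketch is that every add() takes a lock, and a
serialization pass blocks updates until it finishes. Whether that cost is
acceptable depends on how hot the update path is.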
