[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585534#comment-15585534 ]

Maximilian Michels commented on FLINK-4829:
-------------------------------------------

To mitigate the problem, we have best-effort reporting for accumulators now.

master: 783dca56eedc95f0a8974a9b50f2b532ca8cf849, d95929e0110b53f03452e1ad453de2522f79a6b8
release-1.1: c1d6b24600e40700fa06caa28bc81788d8e92386, 210230c4ab44b84c28b9a62ff461de0955e67f8f

> Accumulators are not thread safe
> --------------------------------
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Till Rohrmann
>             Fix For: 1.2.0, 1.1.4
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live
> accumulator snapshots, which are sent to the {{JobManager}}, we've introduced
> concurrent access to accumulators without properly guarding them against
> concurrent modifications. So if a snapshot is taken of an accumulator that is
> being modified at the same time, it can cause a
> {{ConcurrentModificationException}}, as reported by a user:
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>     at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>     at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>     at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>     at java.util.TreeMap.writeObject(TreeMap.java:2436)
>     at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>     at java.util.HashMap.writeObject(HashMap.java:1362)
>     at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>     at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>     at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>     at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
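
The failure mode in the quoted trace can be reproduced without Flink. The following is a minimal sketch, not code from the issue: the class SnapshotRaceDemo and its helper MutatingValue are hypothetical names. To keep the run deterministic, it does not use a second thread; instead a value's serialization hook adds a key to the TreeMap while TreeMap.writeObject is iterating over it, which stands in for the task thread updating an accumulator during a snapshot.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ConcurrentModificationException;
import java.util.TreeMap;

/** Hypothetical demo class; not part of Flink. */
final class SnapshotRaceDemo {

    /** Value whose serialization hook mutates the map, standing in for the task thread. */
    static final class MutatingValue implements Serializable {
        private static final long serialVersionUID = 1L;
        private final transient TreeMap<String, Object> target;

        MutatingValue(TreeMap<String, Object> target) {
            this.target = target;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            // Structural modification (new key) while TreeMap.writeObject is iterating.
            target.put("zzz-added-during-snapshot", 1);
        }
    }

    /** Returns true if serializing the map fails with a ConcurrentModificationException. */
    static boolean serializationFails() {
        TreeMap<String, Object> histogram = new TreeMap<>();
        histogram.put("a", new MutatingValue(histogram));
        histogram.put("b", 2);
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(histogram);
            return false;
        } catch (ConcurrentModificationException e) {
            // The fail-fast iterator noticed the modification, as in the reported trace.
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("snapshot failed: " + serializationFails());
    }
}
```

With real threads the exception is timing-dependent, which fits the sporadic WARN in the report above.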
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585520#comment-15585520 ]

ASF GitHub Bot commented on FLINK-4829:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2649
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582243#comment-15582243 ]

ASF GitHub Bot commented on FLINK-4829:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2649

    LGTM +1 to merge this
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582141#comment-15582141 ]

ASF GitHub Bot commented on FLINK-4829:
---------------------------------------

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2634

    Closed in favor of #2649.
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582140#comment-15582140 ]

ASF GitHub Bot commented on FLINK-4829:
---------------------------------------

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2649

    [FLINK-4829] snapshot accumulators on a best-effort basis

    Heartbeats should not fail when accumulators could not be snapshotted.
    Instead, we should simply skip the reporting of the failed accumulator.
    Eventually, the accumulator will be reported; at the latest, when the
    job finishes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4829

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2649

----
commit 2a424a6120b7f1be8f357522a3fe964e77bd4cce
Author: Maximilian Michels
Date:   2016-10-14T13:15:50Z

    [FLINK-4829] protect user accumulators against concurrent updates

commit f162142b1d274895e16712e42f0f32a43e187db9
Author: Maximilian Michels
Date:   2016-10-17T12:19:00Z

    [FLINK-4829] snapshot accumulators on a best-effort basis

    Heartbeats should not fail when accumulators could not be snapshotted.
    Instead, we should simply skip the reporting of the failed accumulator.
    Eventually, the accumulator will be reported; at the latest, when the
    job finishes.
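
The best-effort idea described in the pull request can be sketched roughly as follows. This is an illustration under assumptions, not the code merged in #2649: the class BestEffortSnapshot, its snapshot method, and the FailingAccumulator test double are hypothetical names, and plain Java serialization is used in place of Flink's own utilities. Each accumulator is serialized on its own, and one that fails is logged and left out of the report instead of failing the whole heartbeat.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration; class and method names are not Flink's. */
final class BestEffortSnapshot {

    /** Test double: an accumulator whose serialization always fails. */
    static final class FailingAccumulator implements Serializable {
        private static final long serialVersionUID = 1L;

        private void writeObject(ObjectOutputStream out) throws IOException {
            throw new ConcurrentModificationException("simulated concurrent update");
        }
    }

    /** Serializes each accumulator independently; entries that fail are skipped. */
    static Map<String, byte[]> snapshot(Map<String, ? extends Serializable> accumulators) {
        Map<String, byte[]> serialized = new LinkedHashMap<>();
        for (Map.Entry<String, ? extends Serializable> entry : accumulators.entrySet()) {
            try {
                serialized.put(entry.getKey(), serialize(entry.getValue()));
            } catch (IOException | RuntimeException e) {
                // Best effort: drop this accumulator from the current report only.
                System.err.println("Skipping accumulator '" + entry.getKey() + "': " + e);
            }
        }
        return serialized;
    }

    private static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Serializable> accumulators = new LinkedHashMap<>();
        accumulators.put("records-in", Integer.valueOf(42));
        accumulators.put("histogram", new FailingAccumulator());
        System.out.println("reported: " + snapshot(accumulators).keySet());
    }
}
```

The trade-off is the one the description names: a live report may miss an accumulator for one heartbeat, while the final values are still reported when the job finishes.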
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582133#comment-15582133 ]

ASF GitHub Bot commented on FLINK-4829:
---------------------------------------

Github user mxm closed the pull request at:

    https://github.com/apache/flink/pull/2634
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15581850#comment-15581850 ]

Maximilian Michels commented on FLINK-4829:
-------------------------------------------

Locking on a per-element basis in the Accumulator interface seems to be rather costly. I'd propose to report the accumulators on a best-effort basis and refrain from any concurrency locks for now.
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575439#comment-15575439 ]

Maximilian Michels commented on FLINK-4829:
-------------------------------------------

To fix the problem, we will have to introduce a lock in the Accumulator base class which has to be acquired upon adding values. That seems to be the only way to make this work for custom accumulators.

Note that this concerns only live accumulators. Accumulators will still be reported when the job is over.

> Accumulators are not thread safe
> --------------------------------
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Maximilian Michels
>             Fix For: 1.2.0
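
For illustration, the locking approach proposed in this comment might look roughly like the sketch below. It is an assumption-laden example, not Flink code: LockedHistogram is a hypothetical stand-in for an accumulator, and it does not implement Flink's Accumulator interface. Every add acquires the same lock that the snapshot path takes, so a snapshot never sees the map mid-update, at the price of one lock acquisition per element.

```java
import java.util.TreeMap;

/** Hypothetical stand-in for a histogram-style accumulator guarded by a lock. */
final class LockedHistogram {
    private final Object lock = new Object();
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    /** Called by the task thread for every element; acquires the lock each time. */
    void add(int value) {
        synchronized (lock) {
            counts.merge(value, 1, Integer::sum);
        }
    }

    /** Called by the reporting thread; copies the state under the same lock. */
    TreeMap<Integer, Integer> snapshot() {
        synchronized (lock) {
            return new TreeMap<>(counts);
        }
    }

    /** Adds n values on a writer thread while snapshotting concurrently; returns the final total. */
    static int demo(int n) {
        LockedHistogram histogram = new LockedHistogram();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                histogram.add(i % 100);
            }
        });
        writer.start();
        while (writer.isAlive()) {
            histogram.snapshot(); // safe: never iterates the map while it is being modified
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int total = 0;
        for (int count : histogram.snapshot().values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("total after concurrent snapshots: " + demo(100_000));
    }
}
```

The lock sits on the hot path of add, which is the per-element cost that a best-effort scheme avoids.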
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
[ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575423#comment-15575423 ]

ASF GitHub Bot commented on FLINK-4829:
---

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2634

    That's correct. I guess it still makes sense to merge this PR.

> Accumulators are not thread safe
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Maximilian Michels
>             Fix For: 1.2.0
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
[ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575382#comment-15575382 ]

ASF GitHub Bot commented on FLINK-4829:
---

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2634

    I don't think this fixes the reported issue. The supplied stack trace shows that the exception is thrown in a TreeMap; in the AccRegistry, however, we only deal with HashMaps. The histograms do use a TreeMap internally, though.

> Accumulators are not thread safe
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Maximilian Michels
>             Fix For: 1.2.0
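
The point made in the comment above, that guarding the outer HashMap does not protect a TreeMap held inside an accumulator, can be illustrated with a small single-threaded sketch. It relies on the documented fail-fast behaviour of TreeMap iterators; the in-line `put` stands in for an update made by the task thread while the heartbeat is serializing. The class `InnerTreeMapDemo` and the map named `registry` are hypothetical and do not mirror Flink's classes.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo, not Flink code: an outer map holding a TreeMap-backed "histogram".
final class InnerTreeMapDemo {

    // Returns true if iterating the inner TreeMap fails after it is modified mid-iteration.
    static boolean modificationDuringIterationFails() {
        TreeMap<Integer, Integer> histogram = new TreeMap<>();
        histogram.put(1, 10);
        histogram.put(2, 20);

        Map<String, TreeMap<Integer, Integer>> registry = new HashMap<>();
        registry.put("histogram", histogram);

        // Holding the outer map's lock does not help: the update below goes
        // straight to the inner TreeMap and never touches the outer map.
        synchronized (registry) {
            Iterator<Map.Entry<Integer, Integer>> it =
                    registry.get("histogram").entrySet().iterator();
            it.next();
            histogram.put(3, 30); // stands in for an update by the task thread
            try {
                it.next(); // the fail-fast iterator notices the structural change
                return false;
            } catch (ConcurrentModificationException e) {
                return true;
            }
        }
    }
}
```

`TreeMap.writeObject` walks the entries with the same kind of iterator, which is consistent with the top frames of the reported stack trace. Under that reading, any fix has to cover updates to the accumulator's own state, not only the registry map.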
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
[ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575329#comment-15575329 ]

ASF GitHub Bot commented on FLINK-4829:
---

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2634

    [FLINK-4829] protect user accumulators against concurrent updates

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4829

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2634

commit 922e765f7a8f6870b79c6abb65d6ea681901e80a
Author: Maximilian Michels
Date:   2016-10-14T13:15:50Z

    [FLINK-4829] protect user accumulators against concurrent updates

> Accumulators are not thread safe
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Maximilian Michels
>             Fix For: 1.2.0
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
[ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574886#comment-15574886 ]

Maximilian Michels commented on FLINK-4829:
---

Actually, the accumulators used to be kept in a synchronized map when I did the initial implementation. That prevented concurrent modifications. In FLINK-3880 we removed the lock but forgot that the heartbeat could still access the map concurrently.

> Accumulators are not thread safe
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>             Fix For: 1.2.0
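
As an illustration of the synchronized-map approach described in the comment above, here is a minimal sketch of a map that a task thread updates while a reporting thread takes copies. It is not the original Flink implementation; the class `GuardedCounters` and its methods are hypothetical, and plain `Long` counters stand in for accumulators.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a registry of user accumulators; not Flink's AccumulatorRegistry.
final class GuardedCounters {

    // Collections.synchronizedMap makes each single call (put, get, merge, ...) atomic.
    private final Map<String, Long> counters =
            Collections.synchronizedMap(new HashMap<>());

    // Called by the task thread.
    void add(String name, long delta) {
        counters.merge(name, delta, Long::sum);
    }

    // Called by the reporting (heartbeat) thread.
    Map<String, Long> snapshot() {
        // Copying iterates over the map, so it must hold the wrapper's own lock.
        synchronized (counters) {
            return new HashMap<>(counters);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedCounters counters = new GuardedCounters();
        Thread task = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                counters.add("records", 1);
            }
        });
        task.start();
        while (task.isAlive()) {
            counters.snapshot(); // safe while the task thread keeps updating
        }
        task.join();
        System.out.println(counters.snapshot());
    }
}
```

The copy is safe here because the values are immutable `Long` objects. A map whose values are mutable objects would additionally need those objects to guard their own state.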