[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-18 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585534#comment-15585534
 ] 

Maximilian Michels commented on FLINK-4829:
---

To mitigate the problem, we have best-effort reporting for accumulators now.

master: 783dca56eedc95f0a8974a9b50f2b532ca8cf849, d95929e0110b53f03452e1ad453de2522f79a6b8
release-1.1: c1d6b24600e40700fa06caa28bc81788d8e92386, 210230c4ab44b84c28b9a62ff461de0955e67f8f

> Accumulators are not thread safe
> 
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Till Rohrmann
>             Fix For: 1.2.0, 1.1.4
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live 
> accumulator snapshots, which are sent to the {{JobManager}}, we introduced 
> concurrent access to accumulators without properly guarding them against 
> concurrent modifications. If a snapshot is taken of an accumulator while it 
> is being modified, this can cause a {{ConcurrentModificationException}}, as 
> reported by a user: 
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>     at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>     at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>     at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>     at java.util.TreeMap.writeObject(TreeMap.java:2436)
>     at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>     at java.util.HashMap.writeObject(HashMap.java:1362)
>     at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>     at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>     at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>     at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}
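
The top frames of the trace point at the fail-fast iterator of java.util.TreeMap: serialization walks the map's entries, and the iterator throws as soon as it notices a structural modification it did not make itself. In the reported case the modification presumably comes from another thread. The sketch below triggers the same check deterministically in a single thread, which is enough to show the mechanism. The class name FailFastDemo and the map contents are illustrative assumptions, not Flink code.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: shows the fail-fast check that TreeMap iterators perform.
final class FailFastDemo {

    /** Returns true if modifying the map mid-iteration raised the exception. */
    static boolean modifyWhileIterating() {
        TreeMap<Integer, Integer> histogram = new TreeMap<>();
        histogram.put(1, 1);
        histogram.put(2, 1);

        Iterator<Map.Entry<Integer, Integer>> it = histogram.entrySet().iterator();
        it.next(); // the "snapshot" side has started walking the entries

        // The "task" side adds a new key: a structural modification.
        histogram.put(3, 1);

        try {
            it.next(); // the iterator detects the change and fails fast
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("exception raised: " + modifyWhileIterating());
    }
}
```

Note that the fail-fast check is a best-effort diagnostic of the JDK collections. With real concurrent access the exception is not guaranteed to be thrown, so its absence does not prove that a snapshot was consistent.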



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585520#comment-15585520
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2649




[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582243#comment-15582243
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2649
  
LGTM
+1 to merge this




[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582141#comment-15582141
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2634
  
Closed in favor of #2649.




[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582140#comment-15582140
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2649

[FLINK-4829] snapshot accumulators on a best-effort basis

Heartbeats should not fail when accumulators could not be snapshotted. 
Instead, we should simply skip the reporting of the failed accumulator. 
Eventually, the accumulator will be reported; at the latest, when the job 
finishes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4829

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2649


commit 2a424a6120b7f1be8f357522a3fe964e77bd4cce
Author: Maximilian Michels 
Date:   2016-10-14T13:15:50Z

[FLINK-4829] protect user accumulators against concurrent updates

commit f162142b1d274895e16712e42f0f32a43e187db9
Author: Maximilian Michels 
Date:   2016-10-17T12:19:00Z

[FLINK-4829] snapshot accumulators on a best-effort basis

Heartbeats should not fail when accumulators could not be snapshotted.
Instead, we should simply skip the reporting of the failed accumulator.
Eventually, the accumulator will be reported; at the latest, when the job
finishes.
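
The best-effort idea described in this pull request can be sketched as follows. This is a minimal illustration under stated assumptions, not the code that was merged: the class BestEffortSnapshot and its methods are hypothetical names, and plain Java serialization stands in for whatever Flink uses internally. Each accumulator value is serialized on its own, and a value that fails is left out of the report instead of failing the whole heartbeat.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: serializes accumulators best-effort.
final class BestEffortSnapshot {

    /** Serializes every value it can; values that fail are skipped. */
    static Map<String, byte[]> snapshot(Map<String, ? extends Serializable> accumulators) {
        Map<String, byte[]> result = new HashMap<>();
        for (Map.Entry<String, ? extends Serializable> entry : accumulators.entrySet()) {
            try {
                result.put(entry.getKey(), serialize(entry.getValue()));
            } catch (IOException | RuntimeException e) {
                // Best effort: drop this accumulator from the current report.
                // A later snapshot, or the final report at job end, can pick it up.
            }
        }
        return result;
    }

    /** Plain Java serialization into a byte array. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Serializable> accumulators = new HashMap<>();
        accumulators.put("good", 42);
        // An ArrayList holding a non-serializable element fails during writeObject.
        accumulators.put("bad", new ArrayList<Object>(Arrays.asList(new Object())));
        System.out.println("reported: " + snapshot(accumulators).keySet());
    }
}
```

The sketch only isolates failures per value. It does not make the snapshot consistent: a value that is modified during serialization without triggering an exception may still be reported in an intermediate state.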





[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582133#comment-15582133
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/2634




[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-17 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15581850#comment-15581850
 ] 

Maximilian Michels commented on FLINK-4829:
---

Locking on a per-element basis in the Accumulator interface seems to be rather 
costly. I propose reporting the accumulators on a best-effort basis and 
refraining from any locking for now.



[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-14 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575439#comment-15575439
 ] 

Maximilian Michels commented on FLINK-4829:
---

To fix the problem, we will have to introduce a lock in the Accumulator base 
class which has to be acquired upon adding values. That seems to be the only 
way to make this work for custom accumulators.

Note that this concerns only live accumulators. Accumulators will still be 
reported when the job is over. 
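
A lock in a base class, as proposed here, could look roughly like the sketch below. It is an assumption-laden illustration: GuardedAccumulator and GuardedHistogram are hypothetical names and do not mirror Flink's Accumulator interface. The base class takes the same lock for adding a value and for copying the state, so a custom accumulator only implements the unguarded operations.

```java
import java.io.Serializable;
import java.util.TreeMap;

// Hypothetical base class, not Flink's API: guards updates and snapshots with one lock.
abstract class GuardedAccumulator<V, R extends Serializable> {

    private final Object lock = new Object();

    /** Called by the task for every value; acquires the lock each time. */
    public final void add(V value) {
        synchronized (lock) {
            addUnguarded(value);
        }
    }

    /** Called by the reporting side; returns a copy taken under the lock. */
    public final R snapshot() {
        synchronized (lock) {
            return copyUnguarded();
        }
    }

    protected abstract void addUnguarded(V value);

    protected abstract R copyUnguarded();
}

// Example custom accumulator: counts how often each integer was added.
final class GuardedHistogram extends GuardedAccumulator<Integer, TreeMap<Integer, Integer>> {

    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    @Override
    protected void addUnguarded(Integer value) {
        counts.merge(value, 1, Integer::sum);
    }

    @Override
    protected TreeMap<Integer, Integer> copyUnguarded() {
        // Copy so the caller can serialize without holding the lock.
        return new TreeMap<>(counts);
    }
}
```

The trade-off is visible in add: every added value pays for a lock acquisition, even when no snapshot is in progress.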

> Accumulators are not thread safe
> 
>
> Key: FLINK-4829
> URL: https://issues.apache.org/jira/browse/FLINK-4829
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Maximilian Michels
> Fix For: 1.2.0
>
>


[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575423#comment-15575423
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2634
  
That's correct. I guess it still makes sense to merge this PR.


> Accumulators are not thread safe
>
> Key: FLINK-4829
> URL: https://issues.apache.org/jira/browse/FLINK-4829
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Affects Versions: 1.2.0
> Reporter: Till Rohrmann
> Assignee: Maximilian Michels
> Fix For: 1.2.0
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live
> accumulator snapshots, which are sent to the {{JobManager}}, we introduced
> concurrent access to accumulators without properly guarding them against
> concurrent modification. If a snapshot is taken of an accumulator while it
> is being modified, this can cause a {{ConcurrentModificationException}}, as
> reported by a user:
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
> at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
> at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
> at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
> at java.util.TreeMap.writeObject(TreeMap.java:2436)
> at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
> at java.util.HashMap.writeObject(HashMap.java:1362)
> at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
> at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
> at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
> at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
> at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}
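
The failure path in the trace above can be reproduced without Flink. The following is a minimal sketch, not Flink code: `TreeMap.writeObject` walks the map with a fail-fast iterator, so a structural modification between two iterator steps raises the exception. To keep the example deterministic, the modification is triggered from inside the serialization of the first value rather than from a second thread. The names `TreeMapSnapshotRace` and `MutatingValue` are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ConcurrentModificationException;
import java.util.TreeMap;

// Hypothetical demo class; not part of Flink.
class TreeMapSnapshotRace {

    // Stand-in for "another thread updates the accumulator during the snapshot":
    // serializing this value adds an entry to the map that is currently being written.
    static class MutatingValue implements Serializable {
        private static final long serialVersionUID = 1L;
        private final transient TreeMap<Integer, Serializable> owner;

        MutatingValue(TreeMap<Integer, Serializable> owner) {
            this.owner = owner;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            // Structural modification in the middle of TreeMap.writeObject.
            owner.put(Integer.valueOf(3), Integer.valueOf(0));
        }
    }

    // Returns true if serializing the map fails with ConcurrentModificationException.
    static boolean serializationFails() {
        TreeMap<Integer, Serializable> counts = new TreeMap<>();
        counts.put(Integer.valueOf(1), new MutatingValue(counts));
        counts.put(Integer.valueOf(2), Integer.valueOf(0));
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(counts);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("CME during serialization: " + serializationFails());
    }
}
```

With two threads the same interleaving happens only occasionally, which would be consistent with the exception showing up as a sporadic warning in the heartbeat path.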





[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575382#comment-15575382
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2634
  
I don't think this fixes the reported issue. The supplied stack trace shows
that the exception is thrown in a TreeMap, whereas in the AccumulatorRegistry
we only deal with HashMaps. The histograms, however, use a TreeMap internally.
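
One way to guard state that lives inside an accumulator, rather than in the registry's map, is to copy it under the accumulator's own lock and serialize only the copy. The sketch below illustrates that idea under stated assumptions; it is not Flink's histogram class and not necessarily what the pull request does. `GuardedHistogramSketch` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.TreeMap;

// Hypothetical histogram-style accumulator; not Flink's implementation.
class GuardedHistogramSketch {

    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    // Task thread: count one occurrence of the given value.
    synchronized void add(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    // Snapshot thread: copy under the same lock that guards add().
    synchronized TreeMap<Integer, Integer> copy() {
        return new TreeMap<>(counts);
    }

    // Serializes a private copy, so later add() calls cannot interfere.
    static byte[] serialize(TreeMap<Integer, Integer> snapshot) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(snapshot);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        GuardedHistogramSketch histogram = new GuardedHistogramSketch();
        histogram.add(7);
        histogram.add(7);
        TreeMap<Integer, Integer> snapshot = histogram.copy();
        histogram.add(9); // a later update does not touch the snapshot
        System.out.println(snapshot + ", " + serialize(snapshot).length + " bytes");
    }
}
```

The trade-off is an extra copy per snapshot and a lock on every update, which may matter for accumulators that are updated per record.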







[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575329#comment-15575329
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2634

[FLINK-4829] protect user accumulators against concurrent updates



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4829

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2634


commit 922e765f7a8f6870b79c6abb65d6ea681901e80a
Author: Maximilian Michels 
Date:   2016-10-14T13:15:50Z

[FLINK-4829] protect user accumulators against concurrent updates









[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-14 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574886#comment-15574886
 ] 

Maximilian Michels commented on FLINK-4829:
---

Actually, the accumulators used to be kept in a synchronized map when I did
the initial implementation, which prevented concurrent modifications. In
FLINK-3880 we removed the lock but forgot that the heartbeat can access the
map concurrently.
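
As a rough illustration of why a synchronized map helps here (a sketch only, assuming a wrapper in the style of `Collections.synchronizedMap`; the class `SynchronizedRegistrySketch` is hypothetical and not Flink's `AccumulatorRegistry`): the wrapper serializes itself while holding its own lock, so a `put` from the task thread cannot interleave with a snapshot taken by the heartbeat thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a registry of user accumulators.
class SynchronizedRegistrySketch {

    // put() and the wrapper's serialization both take the wrapper's lock.
    private final Map<String, Long> accumulators =
            Collections.synchronizedMap(new HashMap<>());

    // Called by the task thread.
    void register(String name, long value) {
        accumulators.put(name, value);
    }

    // Called by the heartbeat thread: serialize the current contents.
    byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(accumulators);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Runs one updating and one snapshotting thread; returns a failure, or null if none occurred.
    static Throwable runConcurrently(int updates) {
        SynchronizedRegistrySketch registry = new SynchronizedRegistrySketch();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread task = new Thread(() -> {
            for (int i = 0; i < updates; i++) {
                registry.register("acc-" + i, i);
            }
        });
        Thread heartbeat = new Thread(() -> {
            try {
                while (task.isAlive()) {
                    registry.snapshot();
                }
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        task.start();
        heartbeat.start();
        try {
            task.join();
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println("failure: " + runConcurrently(2000));
    }
}
```

Note that this lock covers only the map itself. Mutable state inside the stored values is not protected by it, so values that are updated in place would need their own guard.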



