[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2020-06-17 Thread Sean R. Owen (Jira)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138879#comment-17138879 ]

Sean R. Owen commented on SPARK-26183:
--

Yeah, that's probably imprecise. From the usage it's not clear whether 
MySparkClass is meant to run on the driver or on an executor. Given that it 
accepts a SparkContext, I'd say the driver. But then why are things being 
accumulated on the driver? My guess is that this class is instantiated on the 
driver and is being misused to accumulate on the driver as well as on the 
executors. If some process is doing both at the same time, you could get this error.
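
For reference, the intended division of labor is roughly the following; this is 
a minimal, hedged sketch (the app name, object name, and record strings are 
made up, not the reporter's code). The accumulator is registered on the driver, 
tasks call {{add}} on the executors, and {{value}} is read on the driver only 
after the job has finished:

{code:scala}
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("acc-demo").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Registered on the driver.
    val acc = sc.collectionAccumulator[String]("records")

    // add() runs inside tasks, i.e. on the executors; Spark merges the
    // per-task copies back into the driver-side accumulator.
    sc.parallelize(1 to 100).foreach(i => acc.add("record-" + i))

    // value is only meaningful on the driver, after the job completes.
    acc.value.asScala.foreach(println)

    spark.stop()
  }
}
{code}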

> ConcurrentModificationException when using Spark collectionAccumulator
> --
>
> Key: SPARK-26183
> URL: https://issues.apache.org/jira/browse/SPARK-26183
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Rob Dawson
>Priority: Major
>
> I'm trying to run a Spark-based application on an Azure HDInsight on-demand 
> cluster, and I'm seeing lots of SparkExceptions (caused by 
> ConcurrentModificationExceptions) being logged. The application runs without 
> these errors when I start a local Spark instance.
> 
> I've seen reports of [similar errors when using 
> accumulators|https://issues.apache.org/jira/browse/SPARK-17463], and my code 
> does indeed use a CollectionAccumulator; however, I have placed synchronized 
> blocks everywhere I use it, and it makes no difference. The accumulator code 
> looks like this:
> {code:scala}
> import scala.collection.JavaConverters._  // needed for .asScala below
> 
> class MySparkClass(sc: SparkContext) {  // the overrides imply an elided base class
>   val myAccumulator = sc.collectionAccumulator[MyRecord]
> 
>   override def add(record: MyRecord) = {
>     synchronized {
>       myAccumulator.add(record)
>     }
>   }
> 
>   override def endOfBatch() = {
>     synchronized {
>       myAccumulator.value.asScala.foreach((record: MyRecord) => {
>         processIt(record)
>       })
>     }
>   }
> }
> {code}
> The exceptions don't cause the application to fail; however, when the code 
> tries to read values out of the accumulator it is empty and {{processIt}} is 
> never called.
> {code}
> 18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
> org.apache.spark.SparkException: Exception thrown in awaitResult: 
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
> at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
> at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
> at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
> at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject(ArrayList.java:770)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
> at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
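
The "Caused by" frames above point at the actual race: the executor's heartbeat 
thread serializes the accumulator's backing list while some other thread is 
still adding to it. Synchronized blocks in user code cannot prevent this, 
because the heartbeater never acquires that monitor. The underlying JDK 
behavior can be reproduced without Spark; here is a standalone, hedged sketch 
(the object name and loop bound are arbitrary, and the race is timing-dependent):

{code:scala}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationRace {
  def main(args: Array[String]): Unit = {
    val list = new java.util.ArrayList[Integer]()

    // One thread keeps appending...
    val mutator = new Thread(new Runnable {
      def run(): Unit = {
        var i = 0
        while (i < 10000000) { list.add(i); i += 1 }
      }
    })
    mutator.start()

    // ...while another serializes the list, as the heartbeat thread does.
    // ArrayList.writeObject compares modCount before and after writing and
    // throws ConcurrentModificationException if the list changed under it.
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(list)
      println("no exception this run; the interleaving is timing-dependent")
    } catch {
      case e: java.util.ConcurrentModificationException =>
        println("reproduced: " + e)
    }
    mutator.join()
  }
}
{code}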

[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2020-06-17 Thread Brandon (Jira)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138810#comment-17138810 ]

Brandon commented on SPARK-26183:
-

Hi [~srowen], I have also seen this issue, with the same stack trace, in Spark 
2.4.4 while using CollectionAccumulator. I didn't fully understand what you 
meant by this:

"It arises because you are (inadvertently) serializing the accumulator because 
it's a field in your app. It's not meant to be anywhere but the driver."

I was thinking the accumulator must be serialized inside a task in order for 
the executors to be able to add to it. Can you elaborate on how to avoid 
inadvertently serializing the accumulator? An example would be very helpful. Thanks.
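
As an illustration of the distinction (a hedged sketch with hypothetical class 
and method names, not code from this issue): serializing the accumulator itself 
into a task is normal and expected. The problem described above is referencing 
the accumulator through a field, which drags the whole enclosing object into 
the task closure. Copying the field into a local val first limits the capture 
to the accumulator alone:

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.CollectionAccumulator

class BatchProcessor(acc: CollectionAccumulator[String]) extends Serializable {

  // Referencing the field inside the closure captures `this`, so the entire
  // BatchProcessor (and anything else it holds) is serialized into every task.
  def processCapturingThis(rdd: RDD[String]): Unit =
    rdd.foreach(r => acc.add(r))

  // Copying the field to a local val first means only the accumulator itself
  // is captured and shipped with the task.
  def processWithLocalVal(rdd: RDD[String]): Unit = {
    val localAcc = acc
    rdd.foreach(r => localAcc.add(r))
  }
}
{code}

If the enclosing class also held something non-serializable (a SparkContext, 
say), the first form would fail outright with a "task not serializable" error; 
the local-val form keeps working either way.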


[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2018-11-29 Thread Thincrs (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703667#comment-16703667 ]

Thincrs commented on SPARK-26183:
-

testing thincrs


[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2018-11-29 Thread Jairo (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703596#comment-16703596 ]

Jairo commented on SPARK-26183:
---

testing thincrs


[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2018-11-29 Thread Jairo (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703593#comment-16703593 ]

Jairo commented on SPARK-26183:
---

..///


[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2018-11-29 Thread Jairo (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703534#comment-16703534 ]

Jairo commented on SPARK-26183:
---

.../


[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2018-11-29 Thread Jairo (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703444#comment-16703444 ]

Jairo commented on SPARK-26183:
---

.


[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2018-11-27 Thread Jairo (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700702#comment-16700702 ]

Jairo commented on SPARK-26183:
---

.


[jira] [Commented] (SPARK-26183) ConcurrentModificationException when using Spark collectionAccumulator

2018-11-27 Thread Jairo (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700700#comment-16700700 ]

Jairo commented on SPARK-26183:
---

.
