[ https://issues.apache.org/jira/browse/SPARK-34731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354670#comment-17354670 ]
John Pugliesi commented on SPARK-34731:
---
To clarify, can this issue prevent event logs from being created or written
at all? We're seeing this exception in some of our Spark 3.1.1 applications,
namely those with particularly large Window queries, where the final event log
is never successfully written out. For what it's worth, spark.eventLog.dir
points at an s3a:// location:
{code:bash}
# spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir s3a://my-bucket/spark-event-logs/
{code}
> ConcurrentModificationException in EventLoggingListener when redacting properties
> ---------------------------------------------------------------------------------
>
> Key: SPARK-34731
> URL: https://issues.apache.org/jira/browse/SPARK-34731
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.1, 3.2.0
> Reporter: Bruce Robbins
> Assignee: Bruce Robbins
> Priority: Major
> Fix For: 3.1.2, 3.2.0
>
>
> Reproduction:
> The key elements of reproduction are enabling event logging, setting
> spark.executor.cores, and some bad luck:
> {noformat}
> $ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
> --conf spark.executor.cores=1 --driver-memory 4g \
> --conf spark.eventLog.enabled=true \
> --conf spark.eventLog.dir=/tmp/spark-events
> ...
> scala> (0 to 500).foreach { i =>
> | val df = spark.range(0, 2).toDF("a")
> | df.filter("a > 12").count
> | }
> 21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
> java.util.ConcurrentModificationException
> at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
> at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
> at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
> at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
> at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
> at org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
> at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
> at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
> at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
> at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
> at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
> at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
> at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
> at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
> at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
> at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1379)
> at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
> {noformat}
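The top frames of the trace show where it fails: redactProperties converts the job's Properties to a Scala Seq via toSeq, and that walk goes through java.util.Hashtable$Enumerator, which is fail-fast. If the table is structurally modified after iteration has started, the next call to next() throws ConcurrentModificationException. The Scala sketch that follows is an illustration under stated assumptions, not Spark's code. It uses a plain java.util.Hashtable, because the trace reflects JDK 8, where Properties inherits Hashtable's iterators. It also simulates the concurrent writer deterministically on one thread through a hypothetical mutate hook. The names FailFastDemo, snapshot, throwsWhenMutatedDuringIteration and cloneIsSafe are made up for this example. Finally, it shows that iterating a private clone is unaffected by changes to the original table.

{code:scala}
import java.util.{ConcurrentModificationException, Hashtable}

// Illustrative sketch only (hypothetical names, not Spark code).
object FailFastDemo {

  // Copies all entries into a Seq, similar to how toSeq in the trace walks the
  // wrapped map. `mutate` is a hypothetical hook standing in for another
  // thread; it runs once, right after the first entry has been read.
  def snapshot(table: Hashtable[String, String], mutate: () => Unit): Seq[(String, String)] = {
    val it = table.entrySet().iterator()
    val out = Seq.newBuilder[(String, String)]
    var first = true
    while (it.hasNext) {
      val e = it.next() // fail-fast: throws if the table changed since the iterator was created
      out += (e.getKey -> e.getValue)
      if (first) { mutate(); first = false }
    }
    out.result()
  }

  // Adds a new key to `table` while it is being iterated.
  // Returns true if the iteration fails with ConcurrentModificationException.
  def throwsWhenMutatedDuringIteration(table: Hashtable[String, String]): Boolean =
    try {
      snapshot(table, () => { table.put("added.during.iteration", "x"); () })
      false
    } catch {
      case _: ConcurrentModificationException => true
    }

  // Iterates a clone while the original is modified. The clone has its own
  // modification counter, so the walk completes and sees only the copied entries.
  def cloneIsSafe(table: Hashtable[String, String]): Boolean = {
    val copy = table.clone().asInstanceOf[Hashtable[String, String]]
    val seen = snapshot(copy, () => { table.put("added.during.iteration", "x"); () })
    seen.size == copy.size
  }
}
{code}

With at least two entries in the table, the mutation happens after the first next(), so the following next() is guaranteed to observe the changed modification count and throw. Taking a copy before handing the table to another consumer is the usual way to avoid this class of race.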
> Analysis from a quick reading of the code:
> DAGScheduler posts a JobSubmitted event containing a clone of a properties
> object
> [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L834].
> This event is handled
> [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2394].
> DAGScheduler#handleJobSubmitted stores the properties object in a [Job
>