[jira] [Commented] (SPARK-42290) Spark Driver hangs on OOM during Broadcast when AQE is enabled

2023-06-11 Thread Jia Fan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731331#comment-17731331
 ] 

Jia Fan commented on SPARK-42290:
-

Thanks [~dongjoon] 

> Spark Driver hangs on OOM during Broadcast when AQE is enabled 
> ---
>
> Key: SPARK-42290
> URL: https://issues.apache.org/jira/browse/SPARK-42290
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Shardul Mahadik
>Assignee: Jia Fan
>Priority: Critical
> Fix For: 3.4.1, 3.5.0
>
>
> Repro steps:
> {code}
> $ spark-shell --conf spark.driver.memory=1g
> val df = spark.range(500).withColumn("str", lit("abcdabcdabcdabcdabasgasdfsadfasdfasdfasfasfsadfasdfsadfasdf"))
> val df2 = spark.range(10).join(broadcast(df), Seq("id"), "left_outer")
> df2.collect
> {code}
> This will cause the driver to hang indefinitely. Here's a thread dump of the
> {{main}} thread when it's stuck:
> {code}
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:285)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$2819/629294880.apply(Unknown Source)
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236)
>  => holding Monitor(java.lang.Object@1932537396)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
> org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4179)
> org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3420)
> org.apache.spark.sql.Dataset$$Lambda$2390/1803372144.apply(Unknown Source)
> org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4169)
> org.apache.spark.sql.Dataset$$Lambda$2791/1357377136.apply(Unknown Source)
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4167)
> org.apache.spark.sql.Dataset$$Lambda$2391/1172042998.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2402/721269425.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2392/11632488.apply(Unknown Source)
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
> org.apache.spark.sql.Dataset.withAction(Dataset.scala:4167)
> org.apache.spark.sql.Dataset.collect(Dataset.scala:3420)
> {code}
> When we disable AQE, though, we get the following exception instead of a
> driver hang:
> {code}
> Caused by: org.apache.spark.SparkException: Not enough memory to build and 
> broadcast the table to all worker nodes. As a workaround, you can either 
> disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or 
> increase the spark driver memory by setting spark.driver.memory to a higher 
> value.
>   ... 7 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
>   at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:834)
>   at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:777)
>   at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:1086)
>   at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:157)
>   at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1163)
>   at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1151)
>   at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:148)
>   at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$Lambda$2999/145945436.apply(Unknown Source)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCap
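
The hang in the thread dump above is a lost-notification pattern: the AQE driver loop blocks in {{LinkedBlockingQueue.take()}} waiting for a stage-materialization event, but the broadcast-build thread dies with an {{OutOfMemoryError}} before it can post one. A minimal sketch of that shape (plain Scala, no Spark; the object and variable names here are illustrative, not Spark's):

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical sketch: a consumer blocks on a queue while the producer thread
// dies before offering a result -- the same shape as AdaptiveSparkPlanExec's
// event loop waiting on a broadcast build that hit an OutOfMemoryError.
object HangSketch {
  def main(args: Array[String]): Unit = {
    val events = new LinkedBlockingQueue[String]()

    val producer = new Thread(() => {
      // Simulates the broadcast-build thread dying with an Error before it
      // can enqueue a "stage materialized" event.
      throw new OutOfMemoryError("Java heap space (simulated)")
    })
    producer.setDaemon(true)
    producer.start()
    producer.join() // producer is dead; nothing was ever enqueued

    // events.take() would block forever here, like the driver's main thread;
    // polling with a timeout makes the lost notification observable instead.
    val event = events.poll(1, TimeUnit.SECONDS)
    println(s"event = $event") // prints "event = null"
  }
}
```

The fix for SPARK-42290 therefore has to ensure the waiter is unblocked (or the error is re-raised) even when the producer side fails, rather than assuming an event always arrives.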

[jira] [Commented] (SPARK-42290) Spark Driver hangs on OOM during Broadcast when AQE is enabled

2023-06-10 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731180#comment-17731180
 ] 

Dongjoon Hyun commented on SPARK-42290:
---

Oh, sorry, [~fanjia]. I fixed it now.


[jira] [Commented] (SPARK-42290) Spark Driver hangs on OOM during Broadcast when AQE is enabled

2023-06-08 Thread Jia Fan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730775#comment-17730775
 ] 

Jia Fan commented on SPARK-42290:
-

[~dongjoon] It seems the assignee is not right.

> Spark Driver hangs on OOM during Broadcast when AQE is enabled 
> ---
>
> Key: SPARK-42290
> URL: https://issues.apache.org/jira/browse/SPARK-42290
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Shardul Mahadik
>Assignee: Shardul Mahadik
>Priority: Critical
> Fix For: 3.4.1, 3.5.0
>
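
For reference, the workarounds suggested by the exception message and the AQE observation above amount to the following. This is an illustrative sketch, not project guidance: the memory value is an example, it assumes a running spark-shell session where `spark` is bound, and note that the repro uses an explicit broadcast() hint, which the threshold setting does not override.

```scala
// 1) Give the driver enough heap to build the broadcast relation, e.g.:
//      $ spark-shell --conf spark.driver.memory=4g
//
// 2) Stop Spark from choosing broadcast joins automatically:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// 3) Before the fix in 3.4.1/3.5.0, disabling AQE at least surfaces the OOM
//    as an exception instead of a hang:
//      $ spark-shell --conf spark.sql.adaptive.enabled=false
```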