[ https://issues.apache.org/jira/browse/SPARK-44897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-44897.
---------------------------------
    Fix Version/s: 3.5.0
       Resolution: Fixed

Issue resolved by pull request 42587
[https://github.com/apache/spark/pull/42587]

> Local Property Propagation to Subquery Broadcast Exec
> -----------------------------------------------------
>
>                 Key: SPARK-44897
>                 URL: https://issues.apache.org/jira/browse/SPARK-44897
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Michael Chen
>            Assignee: Michael Chen
>            Priority: Major
>             Fix For: 3.5.0
>
>
> https://issues.apache.org/jira/browse/SPARK-32748 was opened to address this 
> issue and then, I believe, mistakenly reverted. The claim was that propagating 
> local properties in SubqueryBroadcastExec to the dynamic pruning thread is 
> unnecessary because the broadcast threads will propagate them anyway. However, 
> when the dynamic pruning thread is the first to initialize the broadcast 
> relation future, the local properties are not propagated correctly, because by 
> that point the properties handed to the broadcast threads are already stale.
> I do not have a good way of reproducing this consistently, because 
> SubqueryBroadcastExec is generally not the first to initialize the broadcast 
> relation future. However, after adding a Thread.sleep(1) to the doPrepare 
> method of SubqueryBroadcastExec, the following test always fails.
> {code:java}
> withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
>   withTable("a", "b") {
>     val confKey = "spark.sql.y"
>     val confValue1 = UUID.randomUUID().toString()
>     val confValue2 = UUID.randomUUID().toString()
>     Seq((confValue1, "1")).toDF("key", "value")
>       .write
>       .format("parquet")
>       .partitionBy("key")
>       .mode("overwrite")
>       .saveAsTable("a")
>     val df1 = spark.table("a")
>     def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
>       val df = spark.range(1).mapPartitions { _ =>
>         Iterator(TaskContext.get.getLocalProperty(confKey))
>       }.filter($"value".contains(confValue)).as("c")
>       df.hint("broadcast")
>     }
>     // set local property and assert
>     val df2 = generateBroadcastDataFrame(confKey, confValue1)
>     spark.sparkContext.setLocalProperty(confKey, confValue1)
>     val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
>     val checks = checkDF.collect()
>     assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))
>     // change local property and re-assert
>     Seq((confValue2, "1")).toDF("key", "value")
>       .write
>       .format("parquet")
>       .partitionBy("key")
>       .mode("overwrite")
>       .saveAsTable("b")
>     val df3 = spark.table("b")
>     val df4 = generateBroadcastDataFrame(confKey, confValue2)
>     spark.sparkContext.setLocalProperty(confKey, confValue2)
>     val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
>     val checks2 = checks2DF.collect()
>     assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
>     assert(checks2.nonEmpty)
>   }
> }
> {code}
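The failure mode described above is the general one for thread-local state: inheritable thread-locals are copied into a worker thread when the thread is created, not when a task is submitted, so a pooled thread that was spun up before a local property was set (or updated) keeps a stale value. A minimal Java sketch of that mechanic, illustrative only and not Spark code (the class and method names here are made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StaleInheritance {
    // Stand-in for a Spark local property: InheritableThreadLocal copies the
    // parent's value only when a worker thread is CREATED, not per submitted task.
    static final InheritableThreadLocal<String> prop = new InheritableThreadLocal<>();

    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        prop.set("v1");
        // The first submit lazily creates the worker thread, which inherits "v1".
        String first = pool.submit(prop::get).get();
        prop.set("v2");
        // The pool reuses the same worker, so the task still sees the stale "v1".
        String second = pool.submit(prop::get).get();
        pool.shutdown();
        return first + " " + second;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints "v1 v1", not "v1 v2"
    }
}
```

The standard remedy, and the idea behind the fix here, is to capture the parent thread's properties explicitly at submit time and re-set them inside the task, rather than relying on inheritance at thread-creation time.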



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
