wzhfy commented on a change in pull request #29589:
URL: https://github.com/apache/spark/pull/29589#discussion_r484657207



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
##########
@@ -60,10 +63,12 @@ case class SubqueryBroadcastExec(
   }
 
   @transient
-  private lazy val relationFuture: Future[Array[InternalRow]] = {
+  private lazy val relationFuture: JFuture[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the 
execution id correctly here.
     val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    Future {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](

Review comment:
      @maropu I think you are right. Sorry, I misunderstood your comment 
previously.
  Because `SubqueryBroadcastExec.relationFuture` really calls 
`child.executeBroadcast[HashedRelation]().value`, it should use the 
"broadcast-exchange" threads.
   
  As a result, it seems that propagating thread-local properties in 
`SubqueryBroadcastExec` is not necessary; they will be propagated by the 
broadcast threads anyway.
   
  BTW, the current test case is also not reasonable, since the UDF is not 
evaluated in the broadcast part. I moved it to a filter (like below) and 
found the above problem. Thanks for pointing this out!
   ```
     test("SPARK-32748: propagate local properties to dynamic pruning thread") {
       val factTable = "fact_local_prop_dpp"
       val dimTable = "dim_local_prop_dpp"
       val filteringValue = 3
   
       def checkPropertyValueByUdfResult(
           propKey: String,
           propValue: String,
           expectedResultCount: Int): Unit = {
         spark.sparkContext.setLocalProperty(propKey, propValue)
         val df = sql(
           s"""
              |SELECT f.id
              |FROM $factTable f
              |INNER JOIN $dimTable s
              |ON f.id = s.id
              |AND compare_property_value(s.value, '$propKey', '$propValue') = 
$filteringValue
             """.stripMargin)
   
         val subqueryBroadcastSeq = df.queryExecution.executedPlan.flatMap {
           case s: FileSourceScanExec => s.partitionFilters.collect {
             case DynamicPruningExpression(InSubqueryExec(_, b: 
SubqueryBroadcastExec, _, _)) => b
           }
           case _ => Nil
         }
         assert(subqueryBroadcastSeq.nonEmpty,
           s"Should trigger DPP with a reused broadcast 
exchange:\n${df.queryExecution}")
   
         assert(df.collect().length == expectedResultCount)
       }
   
       withTable(factTable, dimTable) {
         spark.range(10).select($"id", $"id".as("value"))
           .write.partitionBy("id").mode("overwrite").saveAsTable(factTable)
         spark.range(5).select($"id", $"id".as("value"))
           .write.mode("overwrite").saveAsTable(dimTable)
   
         withSQLConf(
           StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD.key -> "1",
           SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
           SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
   
           try {
             spark.udf.register(
               "compare_property_value",
               (input: Int, propKey: String, propValue: String) => {
                 if (TaskContext.get().getLocalProperty(propKey) == propValue) {
                   filteringValue
                 } else {
                   input
                 }
               }
             )
             val propKey = "spark.sql.subquery.broadcast.prop.key"
   
             // set local property and assert
             val propValue1 = UUID.randomUUID().toString()
             checkPropertyValueByUdfResult(propKey, propValue1, 
expectedResultCount = 5)
   
             // change local property and re-assert
             val propValue2 = UUID.randomUUID().toString()
             checkPropertyValueByUdfResult(propKey, propValue2, 
expectedResultCount = 5)
           } finally {
             
spark.sessionState.catalog.dropTempFunction("compare_property_value", true)
           }
         }
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to