wzhfy commented on a change in pull request #29589:
URL: https://github.com/apache/spark/pull/29589#discussion_r484657207
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
##########
@@ -60,10 +63,12 @@ case class SubqueryBroadcastExec(
}
@transient
- private lazy val relationFuture: Future[Array[InternalRow]] = {
+ private lazy val relationFuture: JFuture[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the
// execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- Future {
+ SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
Review comment:
@maropu I think you are right. Sorry, I misunderstood your comment previously.
Because what `SubqueryBroadcastExec.relationFuture` really calls is `child.executeBroadcast[HashedRelation]().value`, the work runs on the "broadcast-exchange" threads.
As a result, capturing thread-local properties in `SubqueryBroadcastExec` seems unnecessary; they will be propagated by the broadcast threads anyway.
BTW, the current test case is also not sound, since the UDF is not evaluated in the broadcasted part. I moved it into a filter (as below) and found the problem above. Thanks for pointing this out!
```
test("SPARK-32748: propagate local properties to dynamic pruning thread") {
val factTable = "fact_local_prop_dpp"
val dimTable = "dim_local_prop_dpp"
val filteringValue = 3
def checkPropertyValueByUdfResult(
propKey: String,
propValue: String,
expectedResultCount: Int): Unit = {
spark.sparkContext.setLocalProperty(propKey, propValue)
val df = sql(
s"""
|SELECT f.id
|FROM $factTable f
|INNER JOIN $dimTable s
|ON f.id = s.id
|AND compare_property_value(s.value, '$propKey', '$propValue') =
$filteringValue
""".stripMargin)
val subqueryBroadcastSeq = df.queryExecution.executedPlan.flatMap {
case s: FileSourceScanExec => s.partitionFilters.collect {
case DynamicPruningExpression(InSubqueryExec(_, b:
SubqueryBroadcastExec, _, _)) => b
}
case _ => Nil
}
assert(subqueryBroadcastSeq.nonEmpty,
s"Should trigger DPP with a reused broadcast
exchange:\n${df.queryExecution}")
assert(df.collect().length == expectedResultCount)
}
withTable(factTable, dimTable) {
spark.range(10).select($"id", $"id".as("value"))
.write.partitionBy("id").mode("overwrite").saveAsTable(factTable)
spark.range(5).select($"id", $"id".as("value"))
.write.mode("overwrite").saveAsTable(dimTable)
withSQLConf(
StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD.key -> "1",
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
try {
spark.udf.register(
"compare_property_value",
(input: Int, propKey: String, propValue: String) => {
if (TaskContext.get().getLocalProperty(propKey) == propValue) {
filteringValue
} else {
input
}
}
)
val propKey = "spark.sql.subquery.broadcast.prop.key"
// set local property and assert
val propValue1 = UUID.randomUUID().toString()
checkPropertyValueByUdfResult(propKey, propValue1,
expectedResultCount = 5)
// change local property and re-assert
val propValue2 = UUID.randomUUID().toString()
checkPropertyValueByUdfResult(propKey, propValue2,
expectedResultCount = 5)
} finally {
spark.sessionState.catalog.dropTempFunction("compare_property_value", true)
}
}
}
}
```
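As a side note, the test hinges on the fact that a local property set on the driver thread is visible to tasks through `TaskContext`. A minimal standalone sketch of that mechanism (assuming a local-mode session; the key and value strings are arbitrary):
```
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object LocalPropDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    val sc = spark.sparkContext
    sc.setLocalProperty("demo.prop.key", "demo-value")
    // Each task reads the property snapshot captured at job submission.
    val seen = sc.parallelize(1 to 4, 2)
      .map(_ => TaskContext.get().getLocalProperty("demo.prop.key"))
      .distinct().collect()
    assert(seen.sameElements(Array("demo-value")))
    spark.stop()
  }
}
```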