karuppayya commented on code in PR #52213:
URL: https://github.com/apache/spark/pull/52213#discussion_r2540670352
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -69,6 +69,18 @@ class AdaptiveQueryExecSuite
setupTestData()
+ protected override def beforeAll(): Unit = {
Review Comment:
The failure emanates from the following method:
`org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads`
```
private def checkNumLocalShuffleReads(
plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
val numShuffles = collect(plan) {
case s: ShuffleQueryStageExec => s
}.length
val numLocalReads = collect(plan) {
case read: AQEShuffleReadExec if read.isLocalRead => read
}
numLocalReads.foreach { r =>
val rdd = r.execute()
val parts = rdd.partitions
assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
}
assert(numShuffles === (numLocalReads.length +
numShufflesWithoutLocalRead))
}
```
Specifically, `rdd.preferredLocations(_)` will be empty after the cleanup (after `collect()` executes), so the `nonEmpty` check fails. When shuffle cleanup is enabled, the preferred locations will always be empty.
As a concrete example, almost all tests in this suite use this method and fail at that assertion.
(It's actually a race between when the shuffle cleanup happens and when this assertion executes.)
```
scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
ScalaTestFailureLocation: org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite at (AdaptiveQueryExecSuite.scala:221)
org.scalatest.exceptions.TestFailedException: scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
  at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
  at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$checkNumLocalShuffleReads$1(AdaptiveQueryExecSuite.scala:221)
  at scala.collection.immutable.List.foreach(List.scala:323)
  at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.checkNumLocalShuffleReads(AdaptiveQueryExecSuite.scala:218)
```
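For illustration, a minimal sketch of how the locality assertion could be guarded when cleanup is enabled; `shuffleCleanupEnabled` is a hypothetical helper standing in for whatever flag this PR introduces, not an existing member of the suite:
```
// Sketch only: skip the locality check when shuffle cleanup is enabled,
// since preferred locations vanish once the shuffle files are removed.
// `shuffleCleanupEnabled` is a hypothetical helper, not part of this PR.
numLocalReads.foreach { r =>
  val rdd = r.execute()
  val parts = rdd.partitions
  if (!shuffleCleanupEnabled) {
    // Preferred locations are only meaningful while shuffle blocks exist.
    assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
  }
}
```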
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -69,6 +69,18 @@ class AdaptiveQueryExecSuite
setupTestData()
+ protected override def beforeAll(): Unit = {
Review Comment:
> we can override sparkConf
I tried setting it on `SparkConf`, but it doesn't seem to take effect.
I guess [SparkConf is read](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L138) when the SparkSession (and SQLConf) is created, and setting it on the SparkConf afterwards is ineffective for SQL execution (since that looks into SQLConf). Let me know if I am missing something.
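To make the timing issue concrete, a minimal sketch of the `beforeAll` approach from the diff hunk above; the conf key is a placeholder, not the actual key from this PR:
```
// Sketch with a hypothetical key: setting through the session's RuntimeConfig
// writes to SQLConf, so it takes effect for SQL execution, whereas mutating
// SparkConf after the shared SparkSession exists does not.
protected override def beforeAll(): Unit = {
  super.beforeAll()
  spark.conf.set("spark.sql.adaptive.exampleCleanupFlag", "true") // placeholder
}
```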