karuppayya commented on code in PR #52213:
URL: https://github.com/apache/spark/pull/52213#discussion_r2540670352


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -69,6 +69,18 @@ class AdaptiveQueryExecSuite
 
   setupTestData()
 
+  protected override def beforeAll(): Unit = {

Review Comment:
   This failure emanates from the following method, `org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads`:
   ```
     private def checkNumLocalShuffleReads(
         plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
       val numShuffles = collect(plan) {
         case s: ShuffleQueryStageExec => s
       }.length
   
       val numLocalReads = collect(plan) {
         case read: AQEShuffleReadExec if read.isLocalRead => read
       }
       numLocalReads.foreach { r =>
         val rdd = r.execute()
         val parts = rdd.partitions
         assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
       }
      assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
    }
   ```
   Specifically, `rdd.preferredLocations(_)` will be empty after the cleanup runs (i.e. after `collect()` executes).
   When shuffle cleanup is enabled, the locations will always be empty, so the assertion fails.
   
   As a concrete example, almost all tests in this suite use this method and fail at that assertion.
   (It is actually a race between when the shuffle cleanup happens and when this assertion executes.)
   ```
   scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
   ScalaTestFailureLocation: org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite at (AdaptiveQueryExecSuite.scala:221)
   org.scalatest.exceptions.TestFailedException: scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
        at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
        at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
        at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
        at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
        at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$checkNumLocalShuffleReads$1(AdaptiveQueryExecSuite.scala:221)
        at scala.collection.immutable.List.foreach(List.scala:323)
        at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.checkNumLocalShuffleReads(AdaptiveQueryExecSuite.scala:218)
   ```
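   To make the failure mode concrete, here is a minimal standalone sketch (not the suite's code; it assumes an active `spark` session in local mode with default reduce-locality settings) of the same dependency: a post-shuffle RDD's preferred locations come from the driver's map-output state, which disappears once the shuffle is unregistered.
   ```scala
   import org.apache.spark.rdd.RDD

   // Mirror of the suite's per-partition locality check.
   def allPartitionsHaveLocality(rdd: RDD[_]): Boolean =
     rdd.partitions.forall(p => rdd.preferredLocations(p).nonEmpty)

   // Materialize a shuffle so map statuses are registered on the driver.
   val shuffled = spark.sparkContext
     .parallelize(1 to 1000, 4)
     .map(x => (x % 4, x))
     .reduceByKey(_ + _)
   shuffled.count()

   // While the map output is registered, reduce-side preferred locations
   // are typically non-empty.
   println(allPartitionsHaveLocality(shuffled)) // expected: true

   // After the shuffle is unregistered (ContextCleaner once the RDD is GC'd,
   // or eager cleanup as proposed here), preferredLocations returns Nil for
   // every partition and the same check yields false; hence the race with
   // the assertion in checkNumLocalShuffleReads.
   ```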
   



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -69,6 +69,18 @@ class AdaptiveQueryExecSuite
 
   setupTestData()
 
+  protected override def beforeAll(): Unit = {

Review Comment:
   > we can override sparkConf
   
   I tried setting it on `SparkConf`, but it doesn't seem to take effect.
   
   I guess [SparkConf is read](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L138) when creating the SparkSession (and its SQLConf), so setting it on the SparkConf afterwards is ineffective for SQL execution (which consults SQLConf). Let me know if I am missing something.
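   For illustration, a minimal sketch of that distinction (using `spark.sql.shuffle.partitions` purely as an observable stand-in for the cleanup conf; not this PR's code):
   ```scala
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder()
     .master("local[2]")
     // Read into SQLConf at session creation, so this takes effect.
     .config("spark.sql.shuffle.partitions", "5")
     .getOrCreate()

   assert(spark.conf.get("spark.sql.shuffle.partitions") == "5")

   // Effective at runtime: RuntimeConfig writes through to the session's SQLConf.
   spark.conf.set("spark.sql.shuffle.partitions", "7")
   assert(spark.conf.get("spark.sql.shuffle.partitions") == "7")

   // Ineffective for SQL execution: mutating a SparkConf after the session
   // exists is not propagated into SQLConf (getConf also returns a copy).
   spark.sparkContext.getConf.set("spark.sql.shuffle.partitions", "9")
   assert(spark.conf.get("spark.sql.shuffle.partitions") == "7")
   ```
   Under that reading, only values set before session creation or through the session's runtime conf affect SQL execution.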


