LuciferYang commented on code in PR #36496:
URL: https://github.com/apache/spark/pull/36496#discussion_r874402726


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala:
##########
@@ -563,4 +564,51 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be 
thread-safe") {

Review Comment:
   @pan3793 I try to change the UT as follow:
   ```
   test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be 
thread-safe") {
       val plan = spark.range(1).queryExecution.executedPlan
       val serializer = new TestCachedBatchSerializer(true, 1)
       val cachedRDDBuilder = CachedRDDBuilder(serializer, MEMORY_ONLY, plan, 
None)
   
       @volatile var isCachedColumnBuffersLoaded = false
       @volatile var stopped = false
   
       val th1 = new Thread {
         override def run(): Unit = {
           while (!isCachedColumnBuffersLoaded && !stopped) {
             cachedRDDBuilder.cachedColumnBuffers
             cachedRDDBuilder.clearCache()
           }
         }
       }
   
       val th2 = new Thread {
         override def run(): Unit = {
           while (!isCachedColumnBuffersLoaded && !stopped) {
             isCachedColumnBuffersLoaded = 
cachedRDDBuilder.isCachedColumnBuffersLoaded
           }
         }
       }
   
       val th3 = new Thread {
         override def run(): Unit = {
           Thread.sleep(3000L)
           stopped = true;
         }
       }
   
       val exceptionCnt = new AtomicInteger
       val exceptionHandler: Thread.UncaughtExceptionHandler = (_: Thread, 
cause: Throwable) => {
           exceptionCnt.incrementAndGet
           fail(cause)
         }
   
       th1.setUncaughtExceptionHandler(exceptionHandler)
       th2.setUncaughtExceptionHandler(exceptionHandler)
       th1.start()
       th2.start()
       th3.start()
       th1.join()
       th2.join()
       th3.join()
   
       cachedRDDBuilder.clearCache()
   
       assert(exceptionCnt.get == 0)
     }
   ```
   
   then 
   
   ```
   Exception: org.scalatest.exceptions.TestFailedException thrown from the 
UncaughtExceptionHandler in thread "Thread-16"
   
   
   1 did not equal 0
   ScalaTestFailureLocation: 
org.apache.spark.sql.execution.columnar.InMemoryColumnarQuerySuite at 
(InMemoryColumnarQuerySuite.scala:617)
   Expected :0
   Actual   :1
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to