This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9440590a909 [SPARK-38948][TESTS] Fix `DiskRowQueue` leak in `PythonForeachWriterSuite`
9440590a909 is described below

commit 9440590a909d9222db838426c8e528ddec90e196
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Mon Apr 25 09:53:02 2022 +0900

    [SPARK-38948][TESTS] Fix `DiskRowQueue` leak in `PythonForeachWriterSuite`
    
    ### What changes were proposed in this pull request?
    This PR wraps the body of the `run` method of `BufferTester.thread` in a `try-finally` block and calls `buffer.close()` in the `finally` block, ensuring that the resources held by `BufferTester.buffer` are released.
    
    Before this PR, the `DiskRowQueue` resource held by `BufferTester.buffer` was never closed.
    
    ### Why are the changes needed?
    Minor fix to a unit test (UT).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passes GitHub Actions (GA).
    
    Closes #36261 from LuciferYang/SPARK-38948.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/python/PythonForeachWriterSuite.scala      | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
index 61c9782bd17..02d6ff87f89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
@@ -102,11 +102,15 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually with Mockit
     private val intProj = UnsafeProjection.create(Array[DataType](IntegerType))
     private val thread = new Thread() {
       override def run(): Unit = {
-        while (iterator.hasNext) {
-          outputBuffer.synchronized {
-            outputBuffer += iterator.next().getInt(0)
+        try {
+          while (iterator.hasNext) {
+            outputBuffer.synchronized {
+              outputBuffer += iterator.next().getInt(0)
+            }
+            Thread.sleep(sleepPerRowReadMs)
           }
-          Thread.sleep(sleepPerRowReadMs)
+        } finally {
+          buffer.close()
         }
       }
     }

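For readers without the Spark test harness at hand, the `try-finally` pattern in the diff above can be sketched in isolation. This is a minimal illustration only, not the suite's code: `TrackedBuffer`, `TryFinallyCloseSketch`, and `drain` are hypothetical names introduced here, and `TrackedBuffer` merely stands in for a buffer backed by a resource that needs an explicit `close()`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a buffer that owns a resource requiring close().
final class TrackedBuffer(rows: Seq[Int]) extends AutoCloseable {
  @volatile private var closed = false
  def iterator: Iterator[Int] = rows.iterator
  def isClosed: Boolean = closed
  override def close(): Unit = { closed = true }
}

object TryFinallyCloseSketch {
  // Drains the buffer on a separate thread and closes it in `finally`,
  // so the resource is released whether the loop completes or throws.
  def drain(buffer: TrackedBuffer, sleepPerRowMs: Long): (List[Int], Boolean) = {
    val output = ArrayBuffer.empty[Int]
    val reader = new Thread() {
      override def run(): Unit = {
        try {
          val it = buffer.iterator
          while (it.hasNext) {
            output.synchronized { output += it.next() }
            Thread.sleep(sleepPerRowMs)
          }
        } finally {
          buffer.close()
        }
      }
    }
    reader.start()
    reader.join() // wait for the reader so the results below are complete
    (output.synchronized(output.toList), buffer.isClosed)
  }

  def main(args: Array[String]): Unit = {
    val (rows, closed) = drain(new TrackedBuffer(Seq(1, 2, 3)), sleepPerRowMs = 1L)
    println(s"rows=$rows closed=$closed")
  }
}
```

Placing `close()` in `finally` rather than after the loop means the cleanup also runs if `Thread.sleep` is interrupted or the iterator throws, which is presumably why the commit takes this shape.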

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
