zhengruifeng commented on code in PR #53867:
URL: https://github.com/apache/spark/pull/53867#discussion_r2707453121


##########
python/pyspark/sql/tests/arrow/test_arrow_map.py:
##########
@@ -55,6 +55,16 @@ def func(iterator):
         expected = df.collect()
         self.assertEqual(actual, expected)
 
+    def test_map_in_arrow_with_limit(self):

Review Comment:
   before this fix, this test fails with
   ```
   ERROR [0.449s]: test_map_in_arrow_with_limit 
(__main__.MapInArrowWithOutputArrowBatchSlicingBytesTests.test_map_in_arrow_with_limit)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/Users/ruifeng.zheng/spark/python/pyspark/sql/tests/arrow/test_arrow_map.py", 
line 66, in test_map_in_arrow_with_limit
       df.mapInArrow(get_size, "size long").limit(1).collect()
       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
     File "/Users/ruifeng.zheng/spark/python/pyspark/sql/classic/dataframe.py", 
line 474, in collect
       sock_info = self._jdf.collectToPython()
     File 
"/Users/ruifeng.zheng/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py",
 line 1362, in __call__
       return_value = get_return_value(
           answer, self.gateway_client, self.target_id, self.name)
     File 
"/Users/ruifeng.zheng/spark/python/pyspark/errors/exceptions/captured.py", line 
263, in deco
       return f(*a, **kw)
     File 
"/Users/ruifeng.zheng/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/protocol.py", 
line 327, in get_return_value
       raise Py4JJavaError(
           "An error occurred while calling {0}{1}{2}.\n".
           format(target_id, ".", name), value)
   py4j.protocol.Py4JJavaError: An error occurred while calling 
o1111.collectToPython.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 
(TID 47) (localhost executor driver): 
org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by 
query. Memory leaked: (12)
   Allocator(stdin reader for 
/Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python3) 
0/12/12/9223372036854775807 (res/actual/peak/limit)
   
           at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:267)
           at 
org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:157)
           at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:146)
           at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:199)
           at org.apache.spark.scheduler.Task.run(Task.scala:147)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
           at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
           at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
           at java.base/java.lang.Thread.run(Thread.java:1583)
           Suppressed: java.lang.IllegalStateException: Memory was leaked by 
query. Memory leaked: (12)
   Allocator(stdin reader for 
/Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python3) 
0/12/12/9223372036854775807 (res/actual/peak/limit)
   
                   at 
org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:504)
                   at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1(PythonArrowOutput.scala:84)
                   at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1$adapted(PythonArrowOutput.scala:77)
                   at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:146)
                   at 
org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:157)
                   at 
org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:157)
                   at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:212)
                   ... 12 more
   
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to