nickstanishadb commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1449007899
##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
in this case.
+ acquireExecutionMemoryMbRequested: long
+ If this is not None, this represents the amount of memory in megabytes that the UDTF should
+ request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+ at most this much memory, including all allocated objects. The purpose of this functionality
+ is to prevent executors from crashing by running out of memory due to the extra memory
+ consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+ then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+ acquireExecutionMemoryMbActual: long
+ If there is a task context available, Spark will assign this field to the number of
+ megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+ consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+ methods will know it thereafter and can ensure to bound memory usage to at most this number.
+ Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+ be overwritten.
"""
Review Comment:
@dtenedor I did some dumb memory profiling by running the following UDTF:
```python
import resource

from pyspark.sql.functions import udtf


@udtf(returnType="step: int, memory: int")
class SimpleUDTF:
    def __init__(self, *args, **kwargs):
        self.step_id = 0

    @staticmethod
    def get_peak_memory_usage_kb() -> int:
        # ru_maxrss is the process's peak resident set size (kilobytes on Linux).
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    def eval(self, *args, **kwargs):
        yield self.step_id, self.get_peak_memory_usage_kb()
        self.step_id += 1

    def terminate(self, *args, **kwargs):
        yield self.step_id, self.get_peak_memory_usage_kb()


# Assumes an active SparkSession named `spark`.
spark.udtf.register("pyUdtfMemProfile", SimpleUDTF)
```
I'm not entirely confident in the results, because I noticed that running a
high-memory UDTF before the profiling UDTF increased the reported peak memory
usage (I guess they share a process?). But on a fresh 14.2 cluster I ran this
and got a max memory usage of `45440 KB` across 100 pyUdtfMemProfile
invocations, which seems reasonable.

Is this memory already accounted for / pre-allocated on the executor? It's
not clear to me whether a user should request
`processSize + additionalClassMemory` or just `additionalClassMemory` when
specifying `acquireExecutionMemoryMbRequested` and `minMemoryMb`. If it's the
total, then it seems like `minMemoryMb` should always be `>= 1`.
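
To make the two readings concrete, here is a minimal sketch of how the request size would differ. The helper names `peak_rss_mb` and `memory_request_mb` and the 10 MB of extra class memory are hypothetical, chosen only for illustration; the `45440 KB` baseline is the figure measured above. None of this is part of the PR's API.

```python
import math
import resource
import sys


def peak_rss_mb() -> float:
    """Peak resident set size of the current process, in megabytes."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


def memory_request_mb(
    additional_class_mb: float, process_mb: float, include_process: bool
) -> int:
    """Whole megabytes to request under either reading of the field (hypothetical helper)."""
    total = additional_class_mb + (process_mb if include_process else 0.0)
    return math.ceil(total)


# Baseline measured above: 45440 KB, i.e. about 44.4 MB.
baseline_mb = 45440 / 1024
# Hypothetical UDTF that allocates roughly 10 MB of its own objects.
extra_mb = 10

print(memory_request_mb(extra_mb, baseline_mb, include_process=False))  # 10
print(memory_request_mb(extra_mb, baseline_mb, include_process=True))   # 55
```

Under the "total" reading, even a UDTF that allocates nothing extra would need to request about 45 MB in this measurement, which is why the distinction matters for choosing a sensible minimum.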