dtenedor opened a new pull request, #44678:
URL: https://github.com/apache/spark/pull/44678

   ### What changes were proposed in this pull request?
   
   This PR adds a Python UDTF API for acquiring execution memory for the 'eval' and 'terminate' methods.
   
   For example, the UDTF below takes one argument giving the amount of execution memory to request in MB (Spark may grant only part of it) and a second argument giving the minimum acceptable amount. If the minimum exceeds the memory that Spark actually allocated, the status column reports "Insufficient memory":
   
   ```
   from dataclasses import dataclass
   from pyspark.sql.functions import AnalyzeResult
   from pyspark.sql.types import IntegerType, LongType, StringType, StructType
   
   @dataclass
   class CustomAnalyzeResult(AnalyzeResult):
       minMemoryMb: int = 0
   
   class UDTFAcquireExecutionMemory:
       def __init__(self, analyze_result):
           self._analyze_result = analyze_result
   
       @staticmethod
       def analyze(**kwargs):
           argument = kwargs.get("argument")
           min_memory_mb = kwargs.get("min_memory_mb").value
           if argument is not None:
               assert (argument.dataType == IntegerType()
                       or argument.dataType == LongType())
               argument_value = argument.value
           else:
               argument_value = None
           return CustomAnalyzeResult(
               schema=StructType()
                   .add("initial_request", LongType())
                   .add("acquired_memory", LongType())
                   .add("min_memory", LongType())
                   .add("status", StringType()),
               acquireExecutionMemoryMbRequested=argument_value,
               minMemoryMb=min_memory_mb)
   
       def eval(self, **kwargs):
           pass
   
       def terminate(self):
           yield (
               self._analyze_result.acquireExecutionMemoryMbRequested,
               self._analyze_result.acquireExecutionMemoryMbActual,
               self._analyze_result.minMemoryMb,
               "OK" if self._analyze_result.acquireExecutionMemoryMbActual >=
                   self._analyze_result.minMemoryMb else "Insufficient memory")
   ```
   
   Invoking it yields the following:
   
   ```
   SELECT * FROM UDTFAcquireExecutionMemory(argument => 4, min_memory_mb => 0)
   > 4  4       0       OK
   
   SELECT * FROM UDTFAcquireExecutionMemory(argument => 4, min_memory_mb => 10)
   > 4  4       10      Insufficient memory
   ```
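
The status column in the output above follows from a single comparison in 'terminate'. As a rough, Spark-free sketch of that logic: the helper `memory_status_row` and the `MemoryStatusRow` tuple are hypothetical names introduced only for illustration and are not part of this PR or of PySpark. The field names mirror the output schema declared in 'analyze'.

```python
from typing import NamedTuple, Optional


class MemoryStatusRow(NamedTuple):
    # Hypothetical container mirroring the four output columns of the UDTF.
    initial_request: Optional[int]
    acquired_memory: int
    min_memory: int
    status: str


def memory_status_row(requested_mb, actual_mb, min_memory_mb):
    # Same comparison as in terminate(): the row is "OK" only when the
    # memory actually granted covers the stated minimum.
    status = "OK" if actual_mb >= min_memory_mb else "Insufficient memory"
    return MemoryStatusRow(requested_mb, actual_mb, min_memory_mb, status)


# The two invocations shown above, with the acquired amounts taken from
# the printed results.
print(memory_status_row(4, 4, 0))
print(memory_status_row(4, 4, 10))
```

Under this comparison, a partial grant (for instance 2 MB granted against a 3 MB minimum, a made-up case) would also be reported as "Insufficient memory".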
   
   ### Why are the changes needed?
   
   Python UDTFs that import large libraries, or that consume a lot of memory by buffering many input rows, need to register that memory usage with the Spark executors to protect against OOM crashes.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, see above.
   
   ### How was this patch tested?
   
   This PR adds test coverage.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


