ueshin commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1454267605


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is 
specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list 
must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in MB that 
the UDTF should request
+        from each Spark executor that it runs on. Then the UDTF takes 
responsibility to use at most
+        this much memory, including all allocated objects. The purpose of this 
functionality is to
+        prevent executors from crashing by running out of memory due to the 
extra memory consumption
+        invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. 
Spark will then call
+        'TaskMemoryManager.acquireExecutionMemory' with the requested number 
of MB.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to 
the number of MB
+        returned from the call to the 
TaskMemoryManager.acquireExecutionMemory' method, as consumed
+        by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' 
and 'cleanup' methods
+        will know it thereafter and can ensure to bound memory usage to at 
most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a 
value to this; it will
+        be overwritten.
     """
 
     schema: StructType
     withSinglePartition: bool = False
     partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
     orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+    acquireExecutionMemoryMbRequested: Optional[int] = 100

Review Comment:
   Why is the default value `100`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
         projection(inputRow)
       }
 
-      val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, 
context)
+      memoryConsumer.foreach { consumer =>
+        val acquireMemoryMbActual: Long = consumer.acquireMemory()
+        udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+      }
+      val outputRowIterator = try {
+        evaluate(argMetas, projectedRowIter, schema, context)
+      } finally {
+        if (TaskContext.get() != null) {
+          memoryConsumer.foreach { consumer =>
+            consumer.freeMemory()
+          }
+        }
+      }

Review Comment:
   This acquires memory while only creating the iterator. The actually 
execution won't be affected.



##########
python/pyspark/sql/worker/analyze_udtf.py:
##########
@@ -225,6 +226,19 @@ def format_error(msg: str) -> str:
                 write_int(1, outfile)
             else:
                 write_int(2, outfile)
+        # Return the requested amount of execution memory to acquire, if any.
+        write_long(
+            0
+            if result.acquireExecutionMemoryMbRequested is None
+            else result.acquireExecutionMemoryMbRequested,
+            outfile,
+        )
+        write_long(
+            0
+            if result.acquireExecutionMemoryMbActual is None
+            else result.acquireExecutionMemoryMbActual,
+            outfile,
+        )

Review Comment:
   Do we need to send this?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {

Review Comment:
   Why do we need this?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
         projection(inputRow)
       }
 
-      val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, 
context)
+      memoryConsumer.foreach { consumer =>
+        val acquireMemoryMbActual: Long = consumer.acquireMemory()
+        udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+      }
+      val outputRowIterator = try {
+        evaluate(argMetas, projectedRowIter, schema, context)
+      } finally {
+        if (TaskContext.get() != null) {

Review Comment:
   Why do we need this check?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
+    if (TaskContext.get() != null) {
+      Some(PythonUDTFMemoryConsumer(udtf))
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * This class takes responsibility to allocate execution memory for UDTF 
evaluation before it begins
+ * and free the memory after the evaluation is over.
+ *
+ * Background: If the UDTF's 'analyze' method returns an 'AnalyzeResult' with 
a non-empty
+ * 'acquireExecutionMemoryMb' value, this value represents the amount of 
memory in MB that
+ * the UDTF should request from each Spark executor that it runs on. Then the 
UDTF takes
+ * responsibility to use at most this much memory, including all allocated 
objects. The purpose of
+ * this functionality is to prevent executors from crashing by running out of 
memory due to the
+ * extra memory consumption invoked by the UDTF's 'eval' and 'terminate' and 
'cleanup' methods.
+ *
+ * In this class, Spark calls 'TaskMemoryManager.acquireExecutionMemory' with 
the requested number
+ * of MB, and when Spark calls __init__  of the UDTF later, it updates the 
acquiredExecutionMemory
+ * integer passed into the UDTF constructor to the actual number returned from
+ * 'TaskMemoryManager.acquireExecutionMemory', so the 'eval' and 'terminate' 
and 'cleanup' methods
+ * know it and can ensure to bound memory usage to at most this number.
+ */
+case class PythonUDTFMemoryConsumer(udtf: PythonUDTF)
+  extends MemoryConsumer(TaskContext.get().taskMemoryManager(), 
MemoryMode.ON_HEAP) {

Review Comment:
   I'm not sure about this and how this helps to avoid OOM.
   IIUC, `MemoryConsumer` is supposed to manage the JVM memory, and configured 
as `ON_HEAP`. The on-heap memory should be reserved by JVM and won't be reduced 
once it's reserved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to