dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1456374420
##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
in this case.
+ acquireExecutionMemoryMbRequested: long
+ If this is not None, it represents the amount of memory in MB that the UDTF should request
+ from each Spark executor that it runs on. The UDTF then takes responsibility to use at most
+ this much memory, including all allocated objects. The purpose of this functionality is to
+ prevent executors from crashing by running out of memory due to the extra memory consumption
+ incurred by the UDTF's 'eval', 'terminate', and 'cleanup' methods. Spark will then call
+ 'TaskMemoryManager.acquireExecutionMemory' with the requested number of MB.
+ acquireExecutionMemoryMbActual: long
+ If there is a task context available, Spark will assign this field to the number of MB
+ returned from the call to the 'TaskMemoryManager.acquireExecutionMemory' method, as consumed
+ by the UDTF's '__init__' method. Therefore, its 'eval', 'terminate', and 'cleanup' methods
+ will know it thereafter and can bound memory usage to at most this number.
+ Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+ be overwritten.
"""
schema: StructType
withSinglePartition: bool = False
partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+ acquireExecutionMemoryMbRequested: Optional[int] = 100
Review Comment:
I put a short comment that we set a default of 100 MB here, which the UDTF
may override to a more accurate number.
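As a sketch of how a UDTF might override that 100 MB default, here is a simplified stand-in for the `AnalyzeResult` dataclass from `python/pyspark/sql/udtf.py` (the field name follows the PR, but this is not the real pyspark class, and `analyze_default`/`analyze_override` are hypothetical helpers):

```python
from dataclasses import dataclass
from typing import Optional

# Simplified stand-in for pyspark.sql.udtf.AnalyzeResult; not the real class.
@dataclass
class AnalyzeResult:
    schema: str  # stand-in for the real StructType field
    acquireExecutionMemoryMbRequested: Optional[int] = 100  # default of 100 MB per the PR

def analyze_default() -> AnalyzeResult:
    # A UDTF that accepts the default 100 MB request.
    return AnalyzeResult(schema="x int")

def analyze_override() -> AnalyzeResult:
    # A UDTF that knows it buffers more state and requests a larger budget.
    return AnalyzeResult(schema="x int", acquireExecutionMemoryMbRequested=512)

print(analyze_default().acquireExecutionMemoryMbRequested)   # 100
print(analyze_override().acquireExecutionMemoryMbRequested)  # 512
```

A UDTF that cannot estimate its footprint can also pass `None` to opt out of requesting memory entirely, since the field is `Optional`.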
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
projection(inputRow)
}
- val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, context)
+ memoryConsumer.foreach { consumer =>
+ val acquireMemoryMbActual: Long = consumer.acquireMemory()
+ udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+ }
+ val outputRowIterator = try {
+ evaluate(argMetas, projectedRowIter, schema, context)
+ } finally {
+ if (TaskContext.get() != null) {
+ memoryConsumer.foreach { consumer =>
+ consumer.freeMemory()
+ }
+ }
+ }
Review Comment:
Good catch, I didn't see that `evaluate` just returned an iterator instead
of actually invoking anything :) I moved the memory-free step to after the
function has actually been evaluated.
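The bug being discussed is worth a small illustration: when a function returns a lazy iterator, the real work happens only as the caller drains it, so cleanup must run after consumption, not right after the call returns. A minimal Python sketch (all names here are hypothetical, standing in for the Scala acquire/evaluate/free sequence):

```python
# Trace of acquire -> lazy evaluation -> free, to show the ordering.
events = []

def evaluate(rows):
    # Generator: no row is produced until the caller iterates.
    for r in rows:
        events.append(f"eval {r}")
        yield r * 2

def run(rows):
    events.append("acquire")        # acquire execution memory up front
    it = evaluate(rows)             # returns immediately; nothing evaluated yet
    try:
        return [out for out in it]  # evaluation actually happens here
    finally:
        events.append("free")       # free only after the iterator is drained

result = run([1, 2])
print(events)  # ['acquire', 'eval 1', 'eval 2', 'free']
```

If `free` were instead emitted immediately after `evaluate(rows)` returned, it would precede every `eval` step, which is exactly the hazard the review caught.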
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
}
}
}
+
+ lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
Review Comment:
This object serves the purpose of acquiring execution memory before
evaluating the UDTF. After the evaluation is complete, we call its 'freeMemory'
method to release the memory. I added a comment here.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
projection(inputRow)
}
- val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, context)
+ memoryConsumer.foreach { consumer =>
+ val acquireMemoryMbActual: Long = consumer.acquireMemory()
+ udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+ }
+ val outputRowIterator = try {
+ evaluate(argMetas, projectedRowIter, schema, context)
+ } finally {
+ if (TaskContext.get() != null) {
Review Comment:
This was left-over from before I moved the check into the `memoryConsumer`
object itself:
```
lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
if (TaskContext.get() != null) {
Some(PythonUDTFMemoryConsumer(udtf))
} else {
None
}
}
```
I removed this check here now.
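The pattern here, moving the availability check into a factory that returns an `Option`, keeps call sites from repeating the null check. A rough Python analogue, assuming a hypothetical `current_task_context` stand-in for `TaskContext.get()`:

```python
from typing import Optional

class MemoryConsumerSketch:
    """Hypothetical stand-in for PythonUDTFMemoryConsumer."""
    def __init__(self, budget_mb: int):
        self.budget_mb = budget_mb

def current_task_context() -> Optional[object]:
    # Stand-in for TaskContext.get(); returns None when no task is running
    # (e.g. on the driver), mirroring the Scala null check.
    return None

def make_memory_consumer() -> Optional[MemoryConsumerSketch]:
    # Build a consumer only when a task context exists, so callers never
    # repeat the availability check themselves.
    ctx = current_task_context()
    return MemoryConsumerSketch(budget_mb=100) if ctx is not None else None

consumer = make_memory_consumer()
if consumer is not None:  # analogous to memoryConsumer.foreach { ... }
    consumer.budget_mb
```

With this factory in place, the redundant `TaskContext.get() != null` guard in the `finally` block becomes dead code, which matches the reviewer's fix.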
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
}
}
}
+
+ lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
+ if (TaskContext.get() != null) {
+ Some(PythonUDTFMemoryConsumer(udtf))
+ } else {
+ None
+ }
+ }
+}
+
+/**
+ * This class takes responsibility for allocating execution memory for UDTF evaluation before
+ * it begins and for freeing the memory after the evaluation is over.
+ *
+ * Background: If the UDTF's 'analyze' method returns an 'AnalyzeResult' with a non-empty
+ * 'acquireExecutionMemoryMb' value, this value represents the amount of memory in MB that the
+ * UDTF should request from each Spark executor that it runs on. The UDTF then takes
+ * responsibility to use at most this much memory, including all allocated objects. The purpose
+ * of this functionality is to prevent executors from crashing by running out of memory due to
+ * the extra memory consumption incurred by the UDTF's 'eval', 'terminate', and 'cleanup'
+ * methods.
+ *
+ * In this class, Spark calls 'TaskMemoryManager.acquireExecutionMemory' with the requested
+ * number of MB, and when Spark later calls the UDTF's '__init__', it updates the
+ * 'acquiredExecutionMemory' integer passed into the UDTF constructor to the actual number
+ * returned from 'TaskMemoryManager.acquireExecutionMemory', so the 'eval', 'terminate', and
+ * 'cleanup' methods know it and can bound memory usage to at most this number.
+ */
+case class PythonUDTFMemoryConsumer(udtf: PythonUDTF)
+  extends MemoryConsumer(TaskContext.get().taskMemoryManager(), MemoryMode.ON_HEAP) {
Review Comment:
My mistake, I should have written `MemoryMode.OFF_HEAP` here to indicate
that the memory consumed by the Python subprocess is outside the JVM. Fixed
this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]