alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974672806


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2705,6 +2705,44 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+    buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+      .internal()
+      .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+        "records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+        "restrict the amount of memory being used to materialize the data in both executor and " +
+        "Python worker. The accumulated size of records are calculated via sampling a set of " +
+        "records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+        "is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+        "configured value.")
+      .version("3.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")
+
+  val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE =

Review Comment:
   I wonder whether we really need this parameter. Ultimately, if the size estimate works badly, users can just set a lower value for the batch size limit. I do not think it is useful to let them tune this parameter.
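
   As a rough sketch of what dropping the knob might look like (the object and method names below are illustrative, not from this PR), the sample count could simply be a constant feeding the size estimate:

```scala
// Hypothetical sketch: hard-code the sample count instead of exposing a config.
// Users tune memory pressure via the batch size limit instead.
object BatchSizeEstimation {
  val MinDataCountForSample: Int = 100

  /** Average per-record size from sampled sizes, or None if there are too few samples. */
  def sampledAverageRecordSize(sampledSizes: Seq[Long]): Option[Long] =
    if (sampledSizes.size < MinDataCountForSample) None
    else Some(sampledSizes.sum / sampledSizes.size)
}
```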



##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -620,6 +622,35 @@ class RelationalGroupedDataset protected[sql](
     Dataset.ofRows(df.sparkSession, plan)
   }
 
+  private[sql] def applyInPandasWithState(

Review Comment:
   Please add a method-level comment.
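
   Something along these lines, for example; the parameter list is not shown in this hunk, so the @param names below are assumed for illustration and should be adjusted to the actual signature:

```scala
/**
 * Applies the given Python function with per-group state to each group of this
 * grouped Dataset and returns the result as a DataFrame. (Sketch only; adjust
 * the wording and @param names to the real signature.)
 *
 * @param func             the Python UDF to run for each group
 * @param outputStructType schema of the records produced by the UDF
 * @param stateStructType  schema of the user-defined group state
 * @param outputModeStr    streaming output mode (e.g. "append" or "update")
 * @param timeoutConfStr   group state timeout configuration
 */
```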



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2705,6 +2705,44 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+    buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+      .internal()
+      .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+        "records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+        "restrict the amount of memory being used to materialize the data in both executor and " +
+        "Python worker. The accumulated size of records are calculated via sampling a set of " +
+        "records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+        "is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+        "configured value.")
+      .version("3.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")

Review Comment:
   I agree that expressing the limit in terms of bytes is more meaningful than records. However, note that we only estimate the byte size (via sampling). Specifically, I would rename 'softLimitSizePerBatch' here by removing 'soft' (we can clarify that in the doc) and by including 'Bytes': 'batchSizeLimitBytes'. I also wonder whether the property should be specific to applyInPandasWithState or whether we should make it general, i.e. remove the applyInPandasWithState scoping even if we do not support this limit elsewhere initially; it seems generally meaningful, and we can follow up on fixing the other places as a bug.
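
   For instance, the renamed (and possibly un-scoped) entry could look roughly like the following; the config key and constant name here are only a suggestion, not the final wording:

```scala
  val ARROW_BATCH_SIZE_LIMIT_BYTES =
    buildConf("spark.sql.execution.arrow.batchSizeLimitBytes")
      .internal()
      .doc("Limit on the estimated accumulated size of records written to a single " +
        "ArrowRecordBatch in memory. The size is estimated by sampling records, so the " +
        "actual batch size may slightly exceed the configured value.")
      .version("3.4.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("64MB")
```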



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2705,6 +2705,44 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+    buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+      .internal()
+      .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+        "records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+        "restrict the amount of memory being used to materialize the data in both executor and " +
+        "Python worker. The accumulated size of records are calculated via sampling a set of " +
+        "records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+        "is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+        "configured value.")
+      .version("3.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")
+
+  val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE =
+    buildConf("spark.sql.execution.applyInPandasWithState.minDataCountForSample")
+      .internal()
+      .doc("When using applyInPandasWithState, specify the minimum number of records to sample " +
+        "the size of record. The size being retrieved from sampling will be used to estimate " +
+        "the accumulated size of records. Note that limiting by size does not work if the " +
+        "number of records are less than the configured value. For such case, ArrowRecordBatch " +
+        "will only be split for soft timeout.")
+      .version("3.4.0")
+      .intConf
+      .createWithDefault(100)
+
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH =

Review Comment:
   Again, should we really expose this? Let's start with a reasonable constant value and not expose a config. It is impossible to understand what this means unless you intimately know the implementation.
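
   If we go that route, a minimal sketch (the object name, location, and value below are placeholders, not from this PR) would be something like:

```scala
// Hypothetical sketch: replace the config with a fixed constant near the Arrow writer.
object ApplyInPandasWithStateConstants {
  // Purge a partially filled ArrowRecordBatch after this much wall-clock time so that
  // slow groups do not keep data buffered in memory indefinitely. Placeholder value.
  val SoftTimeoutPurgeBatchMillis: Long = 100L
}
```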


