HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r973863915
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2705,6 +2705,44 @@ object SQLConf { .booleanConf .createWithDefault(false) + val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH = + buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch") + .internal() + .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " + + "records that can be written to a single ArrowRecordBatch in memory. This is used to " + + "restrict the amount of memory being used to materialize the data in both executor and " + + "Python worker. The accumulated size of records are calculated via sampling a set of " + + "records. Splitting the ArrowRecordBatch is performed per record, so unless a record " + + "is quite huge, the size of constructed ArrowRecordBatch will be around the " + + "configured value.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("64MB") + + val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE = + buildConf("spark.sql.execution.applyInPandasWithState.minDataCountForSample") + .internal() + .doc("When using applyInPandasWithState, specify the minimum number of records to sample " + + "the size of record. The size being retrieved from sampling will be used to estimate " + + "the accumulated size of records. Note that limiting by size does not work if the " + + "number of records are less than the configured value. For such case, ArrowRecordBatch " + + "will only be split for soft timeout.") + .version("3.4.0") + .intConf + .createWithDefault(100) + + val MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH = + buildConf("spark.sql.execution.applyInPandasWithState.softTimeoutPurgeBatch") + .internal() + .doc("When using applyInPandasWithState, specify the soft timeout for purging the " + + "ArrowRecordBatch. If batching records exceeds the timeout, Spark will force splitting " + + "the ArrowRecordBatch regardless of estimated size. This config ensures the receiver " + + "of data (both executor and Python worker) to not wait indefinitely for sender to " + + "complete the ArrowRecordBatch, which may hurt both throughput and latency.") + .version("3.4.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("100ms") Review Comment: I'm not 100% clear how `spark.sql.execution.pandas.udf.buffer.size` works. Current logic won't work if this config is able to split an arrow record batch further down to multiple, as we rely on offset and the number of rows to split the range of data from overall arrow record batch. It relies on the fact that the logic has full control of constructing arrow record batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org