HeartSaVioR commented on code in PR #52391: URL: https://github.com/apache/spark/pull/52391#discussion_r2374536497
########## python/pyspark/sql/pandas/serializers.py:
##########
@@ -1630,8 +1633,23 @@ def row_stream():
                 yield (batch_key, row)

         for batch_key, group_rows in groupby(row_stream(), key=lambda x: x[0]):
-            df = pd.DataFrame([row for _, row in group_rows])
-            yield (batch_key, df)
+            rows = []

Review Comment:
   @HyukjinKwon @zhengruifeng What do you think about the code? We limit the size of the Arrow RecordBatch in the task thread when sending to the Python worker, and @zeruibao added this to re-align the size for the Pandas DataFrame. Did we do this for other UDFs? Is it beneficial, or probably over-thinking?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala:
##########
@@ -106,12 +106,14 @@ class ApplyInPandasWithStatePythonRunner(
   }

   private val arrowMaxRecordsPerBatch = sqlConf.arrowMaxRecordsPerBatch
+  private val arrowMaxBytesPerBatch = sqlConf.arrowMaxBytesPerBatch

   // applyInPandasWithState has its own mechanism to construct the Arrow RecordBatch instance.
   // Configurations are both applied to executor and Python worker, set them to the worker conf
   // to let Python worker read the config properly.
   override protected val workerConf: Map[String, String] = initialWorkerConf +
-    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> arrowMaxRecordsPerBatch.toString)
+    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> arrowMaxRecordsPerBatch.toString) +
+    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> arrowMaxBytesPerBatch.toString)

Review Comment:
   nit: the key name is the same

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
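For context on the first review comment: the idea under discussion is splitting a stream of grouped rows into pandas DataFrames whose size stays under a byte cap, rather than materializing one DataFrame per group. A minimal standalone sketch of that pattern is below; the `chunked_dataframes` helper, the default threshold, and the per-row size estimate are all illustrative assumptions, not PySpark's actual implementation.

```python
# Hypothetical sketch: re-slice an iterable of rows into pandas DataFrames
# capped at roughly max_bytes each. This is NOT PySpark's serializer code;
# real code would size against the Arrow RecordBatch, not sys.getsizeof.
import sys
import pandas as pd


def chunked_dataframes(rows, max_bytes=64 * 1024):
    """Yield DataFrames built from `rows`, starting a new chunk once the
    rough accumulated row size exceeds `max_bytes`."""
    buf, buf_bytes = [], 0
    for row in rows:
        # Shallow per-row estimate; good enough to bound chunk growth.
        buf_bytes += sys.getsizeof(row)
        buf.append(row)
        if buf_bytes >= max_bytes:
            yield pd.DataFrame(buf)
            buf, buf_bytes = [], 0
    if buf:
        # Flush the remaining partial chunk.
        yield pd.DataFrame(buf)


# Example: 1000 small rows get split across several frames.
frames = list(
    chunked_dataframes(
        ({"a": i, "b": "x" * 8} for i in range(1000)),
        max_bytes=16 * 1024,
    )
)
```

The trade-off the reviewer raises is visible here: the extra buffering and size accounting only pay off if a single group's rows can exceed the Arrow batch limit, which is why the question is whether other UDF paths bother doing it.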
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org