HeartSaVioR commented on code in PR #52391:
URL: https://github.com/apache/spark/pull/52391#discussion_r2374536497


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1630,8 +1633,23 @@ def row_stream():
                         yield (batch_key, row)
 
             for batch_key, group_rows in groupby(row_stream(), key=lambda x: 
x[0]):
-                df = pd.DataFrame([row for _, row in group_rows])
-                yield (batch_key, df)
+                rows = []

Review Comment:
   @HyukjinKwon @zhengruifeng 
   What do you think about the code? We limit the size of Arrow RecordBatch in 
task thread when sending to Python worker, and @zeruibao added this to re-align 
the size for Pandas DataFrame. Did we do this in other UDF? Is it beneficial or 
probably over-thinking?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala:
##########
@@ -106,12 +106,14 @@ class ApplyInPandasWithStatePythonRunner(
   }
 
   private val arrowMaxRecordsPerBatch = sqlConf.arrowMaxRecordsPerBatch
+  private val arrowMaxBytesPerBatch = sqlConf.arrowMaxBytesPerBatch
 
   // applyInPandasWithState has its own mechanism to construct the Arrow 
RecordBatch instance.
   // Configurations are both applied to executor and Python worker, set them 
to the worker conf
   // to let Python worker read the config properly.
   override protected val workerConf: Map[String, String] = initialWorkerConf +
-    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> 
arrowMaxRecordsPerBatch.toString)
+    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> 
arrowMaxRecordsPerBatch.toString) +
+    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> 
arrowMaxBytesPerBatch.toString)

Review Comment:
   nit: the key name is same



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to