Yicong-Huang commented on code in PR #54125:
URL: https://github.com/apache/spark/pull/54125#discussion_r2800835891
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1582,13 +1279,25 @@ def construct_record_batch(pdfs, pdf_data_cnt, pdf_schema, state_pdfs, state_dat
             merged_pdf = pd.concat(pdfs, ignore_index=True)
             merged_state_pdf = pd.concat(state_pdfs, ignore_index=True)
-            return self._create_batch(
+            # Create batch from list of (DataFrame, spark_type) tuples
+            # Each DataFrame is wrapped as a StructArray
+            data = [count_pdf, merged_pdf, merged_state_pdf]
+            schema = StructType(
Review Comment:
The schema is constructed locally from three fixed struct types:
`result_count_df_type`, `pdf_schema` (the return schema of the user function),
and `result_state_df_type`. These are defined in `__init__`; I've added a
comment to clarify.
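   For orientation, here is a rough sketch of what that locally built schema looks like. The field names and inner types below are illustrative placeholders, not the actual definitions from `__init__`:

   ```python
   from pyspark.sql.types import BinaryType, IntegerType, StructField, StructType

   # Illustrative stand-ins for the three fixed struct types; the real ones are
   # attributes set in __init__, and pdf_schema is the user function's return schema.
   result_count_df_type = StructType([StructField("dataCount", IntegerType())])
   pdf_schema = StructType([StructField("value", IntegerType())])
   result_state_df_type = StructType([StructField("state", BinaryType())])

   # Each entry in `data` is a pandas DataFrame that gets wrapped as one
   # StructArray column, so the top-level schema has exactly three struct fields,
   # always in the same fixed order: count, user data, state.
   schema = StructType(
       [
           StructField("count", result_count_df_type),
           StructField("data", pdf_schema),
           StructField("state", result_state_df_type),
       ]
   )
   ```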
##########
python/pyspark/sql/tests/test_conversion.py:
##########
@@ -144,6 +149,199 @@ def test_wrap_struct_empty_batch(self):
         self.assertEqual(wrapped.num_columns, 1)
+@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
Review Comment:
Yes, added `@unittest.skipIf(not have_pandas, pandas_requirement_message)`.
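   For reference, a minimal sketch of the resulting guard pattern, stacking both decorators on the test class. The class name and import location are assumptions (in some Spark versions these helpers live in `pyspark.testing.sqlutils` instead):

   ```python
   import unittest

   from pyspark.testing.utils import (
       have_pandas,
       have_pyarrow,
       pandas_requirement_message,
       pyarrow_requirement_message,
   )


   # Skip the whole class when either optional dependency is missing, so the
   # suite degrades gracefully instead of failing at import time.
   @unittest.skipIf(not have_pandas, pandas_requirement_message)
   @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
   class ConversionTests(unittest.TestCase):
       def test_wrap_struct_empty_batch(self):
           ...
   ```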