HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265810573

 File path: python/pyspark/serializers.py
 @@ -352,28 +364,17 @@ def dump_stream(self, iterator, stream):
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a 
single series or
         a list of series accompanied by an optional pyarrow type to coerce the 
data to.
-        import pyarrow as pa
-        writer = None
-        try:
-            for series in iterator:
-                batch = _create_batch(series, self._timezone, self._safecheck,
-                                      self._assign_cols_by_name)
-                if writer is None:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
-                writer.write_batch(batch)
-        finally:
-            if writer is not None:
-                writer.close()
+        batches = (_create_batch(series, self._timezone, self._safecheck, 
+                   for series in iterator)
 Review comment:
   Hm ... @BryanCutler, seems `_init_dump_stream` was added to handle 
`write_int(SpecialLengths.START_ARROW_STREAM, stream)` this case alone TBH. 
Could we just only pull out `write_int(SpecialLengths.START_ARROW_STREAM, 
stream)` into here and remove `_init_dump_stream` to make this logic isolated 
   It looks tricky to do it since it's all generators. I thought we could at 
least do something like:
           batches = (_create_batch(series, self._timezone, self._safecheck, 
                      for series in iterator)
           def arrow_start_written_batches():
               should_write_start_length = True
               for batch in batches:
                   if should_write_start_length:
                       write_int(SpecialLengths.START_ARROW_STREAM, stream)
                       should_write_start_length = False
                   yield batch
self).dump_stream(arrow_start_written_batches(), stream)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to