HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265810573
 
 

 ##########
 File path: python/pyspark/serializers.py
 ##########
 @@ -352,28 +364,17 @@ def dump_stream(self, iterator, stream):
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a 
single series or
         a list of series accompanied by an optional pyarrow type to coerce the 
data to.
         """
-        import pyarrow as pa
-        writer = None
-        try:
-            for series in iterator:
-                batch = _create_batch(series, self._timezone, self._safecheck,
-                                      self._assign_cols_by_name)
-                if writer is None:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
-                writer.write_batch(batch)
-        finally:
-            if writer is not None:
-                writer.close()
+        batches = (_create_batch(series, self._timezone, self._safecheck, 
self._assign_cols_by_name)
+                   for series in iterator)
 
 Review comment:
   Hm ... @BryanCutler, seems `_init_dump_stream` was added to handle 
`write_int(SpecialLengths.START_ARROW_STREAM, stream)` this case alone TBH. 
Could we just only pull out `write_int(SpecialLengths.START_ARROW_STREAM, 
stream)` into here and remove `_init_dump_stream` to make this logic isolated 
here?
   
   It looks tricky to do it since it's all generators. I thought we could at 
least do something like:
   
   ```python
           batches = (_create_batch(series, self._timezone, self._safecheck, 
self._assign_cols_by_name)
                      for series in iterator)
           def arrow_start_written_batches():
               should_write_start_length = True
               for batch in batches:
                   if should_write_start_length:
                       write_int(SpecialLengths.START_ARROW_STREAM, stream)
                       should_write_start_length = False
                   yield batch
           super(ArrowStreamPandasSerializer, 
self).dump_stream(arrow_start_written_batches(), stream)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to