Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139585713 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,46 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): + """ + Serializes Pandas.Series as Arrow data. + """ + + def __init__(self): + super(ArrowPandasSerializer, self).__init__() + + def dumps(self, series): + """ + Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or + a list of series accompanied by an optional pyarrow type to coerce the data to. + """ + import pyarrow as pa + # Make input conform to [(series1, type1), (series2, type2), ...] + if not isinstance(series, (list, tuple)) or \ + (len(series) == 2 and isinstance(series[1], pa.DataType)): + series = [series] + series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series] + arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series] + batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))]) + return super(ArrowPandasSerializer, self).dumps(batch) + + def loads(self, obj): + """ + Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series + followed by a dictionary containing length of the loaded batches. + """ + import pyarrow as pa + reader = pa.RecordBatchFileReader(pa.BufferReader(obj)) + batches = [reader.get_batch(i) for i in range(reader.num_record_batches)] + # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set + num_rows = sum([batch.num_rows for batch in batches]) --- End diff -- I'd use a generator expression here too, i.e. `sum(batch.num_rows for batch in batches)`, which avoids building an intermediate list.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org