Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19459#discussion_r146421804
--- Diff: python/pyspark/sql/session.py ---
@@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
data = [schema.toInternal(row) for row in data]
return self._sc.parallelize(data), schema
+ def _createFromPandasWithArrow(self, pdf, schema):
+ """
+ Create a DataFrame from a given pandas.DataFrame by slicing it
into partitions, converting
+ to Arrow data, then sending to the JVM to parallelize. If a schema
is passed in, the
+ data types will be used to coerce the data in Pandas to Arrow
conversion.
+ """
+ from pyspark.serializers import ArrowSerializer
+ from pyspark.sql.types import from_arrow_schema, to_arrow_type,
_cast_pandas_series_type
+ import pyarrow as pa
+
+ # Slice the DataFrame into batches
+ step = -(-len(pdf) // self.sparkContext.defaultParallelism) #
round int up
+ pdf_slices = (pdf[start:start + step] for start in xrange(0,
len(pdf), step))
+
+ if schema is None or isinstance(schema, list):
+ batches = [pa.RecordBatch.from_pandas(pdf_slice,
preserve_index=False)
+ for pdf_slice in pdf_slices]
+
+ # There will be at least 1 batch after slicing the
pandas.DataFrame
+ schema_from_arrow = from_arrow_schema(batches[0].schema)
+
+ # If passed schema as a list of names then rename fields
+ if isinstance(schema, list):
+ fields = []
+ for i, field in enumerate(schema_from_arrow):
+ field.name = schema[i]
+ fields.append(field)
+ schema = StructType(fields)
+ else:
+ schema = schema_from_arrow
+ else:
+ batches = []
+ for i, pdf_slice in enumerate(pdf_slices):
+
+ # convert to series to pyarrow.Arrays to use mask when
creating Arrow batches
+ arrs = []
+ names = []
+ for c, (_, series) in enumerate(pdf_slice.iteritems()):
+ field = schema[c]
+ names.append(field.name)
+ t = to_arrow_type(field.dataType)
+ try:
+ # NOTE: casting is not necessary with Arrow >= 0.7
+
arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
--- End diff --
I think that is a problem with using `astype` which doesn't provide any
checks afaik. This casting is better done in Arrow, but since we are currently
stuck on 0.4.1 we need this workaround. Trying this out with the latest arrow
would give the user a nice error:
```
>>> pa.Array.from_pandas(s, type=pa.int16())
<pyarrow.lib.Int16Array object at 0x7f18361fecb0>
[
1,
2,
10001
]
>>> pa.Array.from_pandas(s, type=pa.int8())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow/array.pxi", line 279, in pyarrow.lib.Array.from_pandas
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:25865)
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24833)
File "pyarrow/array.pxi", line 70, in pyarrow.lib._ndarray_to_array
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24083)
File "pyarrow/error.pxi", line 77, in pyarrow.lib.check_status
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:7876)
pyarrow.lib.ArrowInvalid: Integer value out of bounds
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]