Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19459#discussion_r150324146
--- Diff: python/pyspark/sql/session.py ---
@@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
        # Check if any columns need to be fixed for Spark to infer properly
        if len(np_records) > 0:
-            record_type_list = self._get_numpy_record_dtypes(np_records[0])
-            if record_type_list is not None:
-                return [r.astype(record_type_list).tolist() for r in np_records], schema
+            record_dtype = self._get_numpy_record_dtype(np_records[0])
+            if record_dtype is not None:
+                return [r.astype(record_dtype).tolist() for r in np_records], schema

        # Convert list of numpy records to python lists
        return [r.tolist() for r in np_records], schema

+    def _create_from_pandas_with_arrow(self, pdf, schema):
+        """
+        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
+        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
+        data types will be used to coerce the data in Pandas to Arrow conversion.
+        """
+        from pyspark.serializers import ArrowSerializer, _create_batch
+        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+
+        # Determine arrow types to coerce data when creating batches
+        if isinstance(schema, StructType):
+            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
+        elif isinstance(schema, DataType):
+            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
+        else:
+            # Any timestamps must be coerced to be compatible with Spark
+            arrow_types = [to_arrow_type(TimestampType())
+                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
+                           for t in pdf.dtypes]
+
+        # Slice the DataFrame to be batched
+        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
+        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+
+        # Create Arrow record batches
+        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
+                   for pdf_slice in pdf_slices]
+
+        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
+        if schema is None or isinstance(schema, list):
+            schema_from_arrow = from_arrow_schema(batches[0].schema)
+            names = pdf.columns if schema is None else schema
--- End diff --
Hmm, actually that case can be pulled out of both this method and the non-Arrow `_create_from_pandas`, which @ueshin brought up in #19646. That would simplify things quite a bit, so I'll try it out.
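
For anyone reading this without the surrounding Spark code, here is a minimal, self-contained sketch of the slicing-and-batching idea in the diff. It is not the PR's code path: it uses pyarrow's public `RecordBatch.from_pandas` instead of pyspark's internal `_create_batch`/`from_arrow_schema`, and the names `pandas_to_arrow_batches` and `num_partitions` are made up for illustration, with `num_partitions` standing in for `self.sparkContext.defaultParallelism`.

```python
# Minimal sketch (not the PR's actual code) of slicing a pandas DataFrame into
# roughly equal chunks and converting each chunk to an Arrow record batch.
import pandas as pd
import pyarrow as pa


def pandas_to_arrow_batches(pdf, num_partitions):
    # -(-a // b) is integer division rounded up, so no rows are dropped
    step = -(-len(pdf) // num_partitions)
    # Positional row slices of at most `step` rows each
    pdf_slices = (pdf[start:start + step] for start in range(0, len(pdf), step))
    # Convert each pandas slice to an Arrow record batch
    return [pa.RecordBatch.from_pandas(s, preserve_index=False) for s in pdf_slices]


if __name__ == "__main__":
    pdf = pd.DataFrame({"a": range(10), "b": [float(i) for i in range(10)]})
    batches = pandas_to_arrow_batches(pdf, num_partitions=4)
    print(len(batches))       # 4 batches: step is 3, so slices of 3, 3, 3, 1 rows
    print(batches[0].schema)  # schema read back from the first batch, as in the diff
```

The ceiling-division trick guarantees every row lands in exactly one slice, and reading the schema back from the first batch mirrors the `from_arrow_schema(batches[0].schema)` step above.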
---