Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19459#discussion_r150445421
--- Diff: python/pyspark/sql/session.py ---
@@ -438,28 +438,70 @@ def _get_numpy_record_dtypes(self, rec):
curr_type = 'datetime64[us]'
has_rec_fix = True
record_type_list.append((str(col_names[i]), curr_type))
- return record_type_list if has_rec_fix else None
+ return np.dtype(record_type_list) if has_rec_fix else None
- def _convert_from_pandas(self, pdf, schema):
+ def _convert_from_pandas(self, pdf):
"""
Convert a pandas.DataFrame to list of records that can be used to
make a DataFrame
- :return tuple of list of records and schema
+ :return list of records
"""
- # If no schema supplied by user then get the names of columns only
- if schema is None:
- schema = [str(x) for x in pdf.columns]
# Convert pandas.DataFrame to list of numpy records
np_records = pdf.to_records(index=False)
# Check if any columns need to be fixed for Spark to infer properly
if len(np_records) > 0:
- record_type_list = self._get_numpy_record_dtypes(np_records[0])
- if record_type_list is not None:
- return [r.astype(record_type_list).tolist() for r in np_records], schema
+ record_dtype = self._get_numpy_record_dtype(np_records[0])
+ if record_dtype is not None:
+ return [r.astype(record_dtype).tolist() for r in np_records]
# Convert list of numpy records to python lists
- return [r.tolist() for r in np_records], schema
+ return [r.tolist() for r in np_records]
+
+ def _create_from_pandas_with_arrow(self, pdf, schema):
+ """
+ Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
+ to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
+ data types will be used to coerce the data in Pandas to Arrow conversion.
+ """
+ from pyspark.serializers import ArrowSerializer, _create_batch
+ from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
+ from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+
+ # Determine arrow types to coerce data when creating batches
+ if isinstance(schema, StructType):
+ arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
+ elif isinstance(schema, DataType):
+ raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
+ else:
+ # Any timestamps must be coerced to be compatible with Spark
+ arrow_types = [to_arrow_type(TimestampType())
+ if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
+ for t in pdf.dtypes]
+
+ # Slice the DataFrame to be batched
+ step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
+ pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+
+ # Create Arrow record batches
+ batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
+ for pdf_slice in pdf_slices]
+
+ # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
+ if isinstance(schema, (list, tuple)):
+ struct = from_arrow_schema(batches[0].schema)
--- End diff --
@BryanCutler, I think we'd hit the same issue, SPARK-15244, in this code path. Mind opening a follow-up with a simple test if it is true?
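Something like the following could serve as that follow-up test. This is just a rough, untested sketch on my side, assuming SPARK-15244 is the integer/non-string column name case (the non-Arrow path coerces names via `str(x)`) and that `spark` here is the usual test session:

```python
import pandas as pd

# Sketch only: pandas DataFrame with integer column names, as in SPARK-15244.
pdf = pd.DataFrame({0: [0.1, 0.2], 1: ["a", "b"]})

# Arrow-enabled code path
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df_arrow = spark.createDataFrame(pdf)

# Existing non-Arrow code path, which casts column names with str(x)
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df_no_arrow = spark.createDataFrame(pdf)

# Both code paths should agree and expose string column names.
assert df_arrow.columns == df_no_arrow.columns == ["0", "1"]
```

If the Arrow path keeps the integer names (or errors out), that would confirm the issue.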
---