Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149210042
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
data = [schema.toInternal(row) for row in data]
return self._sc.parallelize(data), schema
+ def _get_numpy_record_dtypes(self, rec):
+ """
+ Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+ the dtypes of records so they can be properly loaded into Spark.
+ :param rec: a numpy record to check dtypes
+ :return corrected dtypes for a numpy.record or None if no correction needed
+ """
+ import numpy as np
+ cur_dtypes = rec.dtype
+ col_names = cur_dtypes.names
+ record_type_list = []
+ has_rec_fix = False
+ for i in xrange(len(cur_dtypes)):
+ curr_type = cur_dtypes[i]
+ # If type is a datetime64 timestamp, convert to microseconds
+ # NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+ # conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+ if curr_type == np.dtype('datetime64[ns]'):
+ curr_type = 'datetime64[us]'
+ has_rec_fix = True
+ record_type_list.append((str(col_names[i]), curr_type))
+ return record_type_list if has_rec_fix else None
+
+ def _convert_from_pandas(self, pdf, schema):
+ """
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+ """
+ # If no schema supplied by user then get the names of columns only
+ if schema is None:
+ schema = [str(x) for x in pdf.columns]
+
+ # Convert pandas.DataFrame to list of numpy records
+ np_records = pdf.to_records(index=False)
+
+ # Check if any columns need to be fixed for Spark to infer properly
+ if len(np_records) > 0:
+ record_type_list = self._get_numpy_record_dtypes(np_records[0])
--- End diff --
The dtype for a numpy record is in a different format
```
In [16]: r
Out[16]: (0, datetime.date(2017, 11, 6), 1509411661000000000L)
In [17]: r.dtype
Out[17]: dtype((numpy.record, [(u'index', '<i8'), (u'd', 'O'), (u'ts',
'<M8[ns]')]))
```
so when using `numpy.record.astype()` it has to be specified in the same
format and include dtypes for all fields. If we try to do this with pandas
dtypes from the DataFrame, there might be some differences that could cause
errors, so I think it's safer to use the dtypes output from numpy and only
change the timestamp resolution.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]