Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21427#discussion_r191037717
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
                 "Number of columns of the returned pandas.DataFrame "
                 "doesn't match specified schema. "
                 "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
-        arrow_return_types = (to_arrow_type(field.dataType) for field in return_type)
-        return [(result[result.columns[i]], arrow_type)
-                for i, arrow_type in enumerate(arrow_return_types)]
+        try:
+            # Assign result columns by schema name
+            return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type]
+        except KeyError:
+            if all(not isinstance(name, basestring) for name in result.columns):
+                # Assign result columns by position if they are not named with strings
+                return [(result[result.columns[i]], to_arrow_type(field.dataType))
+                        for i, field in enumerate(return_type)]
+            else:
+                raise
--- End diff --
Why do we limit the positional fallback to result columns that are not named with strings?
In the case where we return a pd.DataFrame with matching field types but mismatched field names, do we not want to allow that?
If the returned pd.DataFrame's column names don't match return_type's, shouldn't we follow the current behavior?
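
For example, a minimal sketch of the case I mean (the schema and column names here are hypothetical):

```python
import pandas as pd

# Suppose the UDF's return schema declares columns ("id", "v"), but the
# returned pd.DataFrame uses different string names with matching dtypes.
result = pd.DataFrame({"a": [1, 2], "b": [0.1, 0.2]})

try:
    result["id"]  # raises KeyError: 'id'
except KeyError:
    # The columns *are* named with strings, so this patch re-raises the
    # KeyError instead of falling back to positional assignment; the
    # current behavior would have assigned columns by position.
    print("mismatched string names are rejected under this patch")
```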