Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20678#discussion_r172087998
--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1986,55 +1986,91 @@ def toPandas(self):
             timezone = None
 
         if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
+            use_arrow = True
             try:
-                from pyspark.sql.types import _check_dataframe_convert_date, \
-                    _check_dataframe_localize_timestamps, to_arrow_schema
+                from pyspark.sql.types import to_arrow_schema
                 from pyspark.sql.utils import require_minimum_pyarrow_version
+
                 require_minimum_pyarrow_version()
-                import pyarrow
                 to_arrow_schema(self.schema)
-                tables = self._collectAsArrow()
-                if tables:
-                    table = pyarrow.concat_tables(tables)
-                    pdf = table.to_pandas()
-                    pdf = _check_dataframe_convert_date(pdf, self.schema)
-                    return _check_dataframe_localize_timestamps(pdf, timezone)
-                else:
-                    return pd.DataFrame.from_records([], columns=self.columns)
             except Exception as e:
-                msg = (
-                    "Note: toPandas attempted Arrow optimization because "
-                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
-                    "to disable this.")
-                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
-        else:
-            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
-            dtype = {}
+                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
+                        .lower() == "true":
+                    msg = (
+                        "toPandas attempted Arrow optimization because "
+                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
+                        "failed by the reason below:\n  %s\n"
+                        "Attempts non-optimization as "
+                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
+                        "true." % _exception_message(e))
+                    warnings.warn(msg)
+                    use_arrow = False
+                else:
+                    msg = (
+                        "toPandas attempted Arrow optimization because "
+                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
+                        "failed by the reason below:\n  %s\n"
+                        "For fallback to non-optimization automatically, please set true to "
+                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
+                    raise RuntimeError(msg)
+
+        # Try to use Arrow optimization when the schema is supported and the required version
+        # of PyArrow is found, if 'spark.sql.execution.arrow.fallback.enabled' is enabled.
--- End diff ---
Should this comment say `spark.sql.execution.arrow.enabled` instead of `spark.sql.execution.arrow.fallback.enabled`?
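
For reference, a minimal usage sketch of how the two configs interact when calling `toPandas()` (the `SparkSession` setup and the example DataFrame here are illustrative, not part of the patch):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Opt in to the Arrow-based conversion path for toPandas().
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Default in this patch: if the Arrow path cannot be used (unsupported schema,
# missing or too-old PyArrow), toPandas() only warns and falls back to the
# plain collect()-based conversion.
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")

pdf = spark.range(10).toPandas()  # Arrow when possible, otherwise fallback

# With fallback disabled, the same failure raises a RuntimeError instead of warning.
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")
```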