Github user holdenk commented on a diff in the pull request:
https://github.com/apache/spark/pull/20678#discussion_r171111186
--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1986,55 +1986,89 @@ def toPandas(self):
timezone = None
if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled",
"false").lower() == "true":
+ should_fallback = False
try:
- from pyspark.sql.types import
_check_dataframe_convert_date, \
- _check_dataframe_localize_timestamps, to_arrow_schema
+ from pyspark.sql.types import to_arrow_schema
from pyspark.sql.utils import
require_minimum_pyarrow_version
+
require_minimum_pyarrow_version()
- import pyarrow
to_arrow_schema(self.schema)
- tables = self._collectAsArrow()
- if tables:
- table = pyarrow.concat_tables(tables)
- pdf = table.to_pandas()
- pdf = _check_dataframe_convert_date(pdf, self.schema)
- return _check_dataframe_localize_timestamps(pdf,
timezone)
- else:
- return pd.DataFrame.from_records([],
columns=self.columns)
except Exception as e:
- msg = (
- "Note: toPandas attempted Arrow optimization because "
- "'spark.sql.execution.arrow.enabled' is set to true.
Please set it to false "
- "to disable this.")
- raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
- else:
- pdf = pd.DataFrame.from_records(self.collect(),
columns=self.columns)
- dtype = {}
+ if
self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
+ .lower() == "true":
+ msg = (
+ "toPandas attempted Arrow optimization because "
+ "'spark.sql.execution.arrow.enabled' is set to
true; however, "
+ "failed by the reason below:\n %s\n"
+ "Attempts non-optimization as "
+ "'spark.sql.execution.arrow.fallback.enabled' is
set to "
+ "true." % _exception_message(e))
+ warnings.warn(msg)
+ should_fallback = True
+ else:
+ msg = (
+ "toPandas attempted Arrow optimization because "
+ "'spark.sql.execution.arrow.enabled' is set to
true; however, "
+ "failed by the reason below:\n %s\n"
+ "For fallback to non-optimization automatically,
please set true to "
+ "'spark.sql.execution.arrow.fallback.enabled'." %
_exception_message(e))
+ raise RuntimeError(msg)
+
+ if not should_fallback:
+ try:
+ from pyspark.sql.types import
_check_dataframe_convert_date, \
+ _check_dataframe_localize_timestamps
+ import pyarrow
+
+ tables = self._collectAsArrow()
+ if tables:
+ table = pyarrow.concat_tables(tables)
+ pdf = table.to_pandas()
+ pdf = _check_dataframe_convert_date(pdf,
self.schema)
+ return _check_dataframe_localize_timestamps(pdf,
timezone)
+ else:
+ return pd.DataFrame.from_records([],
columns=self.columns)
+ except Exception as e:
+ # We might have to allow fallback here as well but
multiple Spark jobs can
+ # be executed. So, simply fail in this case for now.
+ msg = (
+ "toPandas attempted Arrow optimization because "
+ "'spark.sql.execution.arrow.enabled' is set to
true; however, "
+ "failed unexpectedly:\n %s\n"
+ "Note that
'spark.sql.execution.arrow.fallback.enabled' does "
--- End diff --
+1 — good job including this explanation in the exception message.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]