zhengruifeng commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465819924
##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
return psdf
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"
+            tmp_idx_column_name = f"__tmp_index_col_for_frame_{method}_{uid}__"
Review Comment:
I guess we need to handle the index separately:
1. the fallback pandas method itself may change the index (change the index column or change the values in the index);
2. `df.groupby.apply` returns a psdf with a MultiIndex containing both the original index and the group column, which is not needed in the fallback, e.g.
```
In [11]: output_df.index
Out[11]:
MultiIndex([(0, '2000-01-01 00:00:00'),
            (0, '2000-01-01 00:01:00'),
            (0, '2000-01-01 00:02:00'),
            (0, '2000-01-01 00:03:00')],
           names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])

In [12]: input_df
Out[12]:
                       s  __tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__
2000-01-01 00:00:00  0.0                                                                        0
2000-01-01 00:01:00  NaN                                                                        0
2000-01-01 00:02:00  2.0                                                                        0
2000-01-01 00:03:00  3.0                                                                        0

In [13]: input_df.index
Out[13]:
DatetimeIndex(['2000-01-01 00:00:00', '2000-01-01 00:01:00',
               '2000-01-01 00:02:00', '2000-01-01 00:03:00'],
              dtype='datetime64[ns]', freq=None)

In [14]: output_df = input_df.groupby(tmp_agg_column_name).apply(lambda df: df)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: If the type hints is not specified for `groupby.apply`, it is expensive to infer the data type internally.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)

In [15]: output_df.index
Out[15]:
MultiIndex([(0, '2000-01-01 00:00:00'),
            (0, '2000-01-01 00:01:00'),
            (0, '2000-01-01 00:02:00'),
            (0, '2000-01-01 00:03:00')],
           names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])
```
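Both points can be sketched in plain pandas, used here only as a stand-in for pandas API on Spark, so the actual fallback may behave differently. `asfreq` shows that a pandas method can return index values the input never had (point 1), so the input index cannot simply be reused. `droplevel` removes a temporary grouping level like the one in `Out[15]` (point 2). The helper `drop_tmp_group_level` and the column name `tmp` are hypothetical, not part of the PR.

```python
import pandas as pd

# Point 1: the fallback method itself can change the index.
# asfreq() inserts a row for each missing timestamp, so the output gains an
# index value (00:01) that the input never had.
s = pd.Series(
    [0.0, 2.0],
    index=pd.to_datetime(["2000-01-01 00:00:00", "2000-01-01 00:02:00"]),
)
resampled = s.asfreq("1min")


def drop_tmp_group_level(obj, tmp_name):
    """Drop a leading temporary grouping level from obj's index.

    Hypothetical helper (not from the PR): `tmp_name` stands for the
    temporary column the fallback groups on, e.g. `tmp_agg_column_name`.
    Returns `obj` unchanged if its index does not start with that level.
    """
    index = obj.index
    if isinstance(index, pd.MultiIndex) and index.names[0] == tmp_name:
        return obj.droplevel(0)
    return obj


# Point 2: rebuild the shape seen in Out[15]: group key level + original index.
tmp = "__tmp_aggregate_col_for_frame_asfreq_example__"  # hypothetical name
original_index = pd.date_range("2000-01-01 00:00:00", periods=4, freq="1min")
grouped_like = pd.DataFrame(
    {"s": [0.0, None, 2.0, 3.0]},
    index=pd.MultiIndex.from_arrays(
        [[0, 0, 0, 0], original_index], names=[tmp, None]
    ),
)

# Only the original (datetime) level remains after dropping the group key.
restored = drop_tmp_group_level(grouped_like, tmp)
print(restored.index)
```

Checking the level name before dropping keeps the helper a no-op when the grouping level is absent, rather than silently discarding a real index level.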
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.