zhengruifeng commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465819924


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"
+            tmp_idx_column_name = f"__tmp_index_col_for_frame_{method}_{uid}__"

Review Comment:
   I guess we need to handle the index separately:
   1. the fallback pandas method itself may change the index (change the index column or change the values in the index);
   2. `df.groupby.apply` returns a psdf with a multi-index containing both the group column and the original index, which is not needed in the fallback, e.g.
   
   ```
   In [11]: output_df.index
   Out[11]:
   MultiIndex([(0, '2000-01-01 00:00:00'),
               (0, '2000-01-01 00:01:00'),
               (0, '2000-01-01 00:02:00'),
               (0, '2000-01-01 00:03:00')],
              names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])
   
   In [12]: input_df
   Out[12]:
                          s  __tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__
   2000-01-01 00:00:00  0.0                                                                        0
   2000-01-01 00:01:00  NaN                                                                        0
   2000-01-01 00:02:00  2.0                                                                        0
   2000-01-01 00:03:00  3.0                                                                        0
   
   In [13]: input_df.index
   Out[13]:
   DatetimeIndex(['2000-01-01 00:00:00', '2000-01-01 00:01:00',
                  '2000-01-01 00:02:00', '2000-01-01 00:03:00'],
                 dtype='datetime64[ns]', freq=None)
   
   In [14]: output_df = input_df.groupby(tmp_agg_column_name).apply(lambda df: df)
   /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: If the type hints is not specified for `groupby.apply`, it is expensive to infer the data type internally.
     warnings.warn(message, PandasAPIOnSparkAdviceWarning)
   
   In [15]: output_df.index
   Out[15]:
   MultiIndex([(0, '2000-01-01 00:00:00'),
               (0, '2000-01-01 00:01:00'),
               (0, '2000-01-01 00:02:00'),
               (0, '2000-01-01 00:03:00')],
              names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])
   ```
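
One possible way to handle the index separately is sketched below. It is written against plain pandas rather than pandas-on-Spark so that it is self-contained, and it is only an illustration of the idea, not the implementation in this PR. The function name `fall_back_sketch` and the temporary column names are hypothetical. The sketch copies the index into a temporary column before grouping, lets the wrapped method run on a frame whose index has been restored, writes the (possibly changed) index back into the temporary column, and rebuilds a single-level index at the end. `group_keys=False` is passed because plain pandas would otherwise prepend the group key to the index in recent versions.

```python
import uuid

import pandas as pd


def fall_back_sketch(pdf, method, *args, **kwargs):
    """Hypothetical sketch: run `method` via groupby-apply while carrying the index in a column."""
    uid = uuid.uuid4().hex
    tmp_agg = f"__tmp_aggregate_col_{uid}__"  # hypothetical constant group key
    tmp_idx = f"__tmp_index_col_{uid}__"      # hypothetical carrier for the index
    index_name = pdf.index.name

    flat = pdf.copy()
    flat[tmp_idx] = flat.index   # keep the original index as ordinary data
    flat[tmp_agg] = 0            # a single group, as in the fallback
    flat = flat.reset_index(drop=True)

    def compute(group):
        # Restore the original index so the pandas method sees what the user had.
        # errors="ignore" covers pandas versions that already exclude the group column.
        restored = group.drop(columns=[tmp_agg], errors="ignore").set_index(tmp_idx)
        restored.index.name = index_name
        result = getattr(restored, method)(*args, **kwargs).copy()
        # The method may have changed the index (point 1), so write it back.
        result[tmp_idx] = result.index
        return result.reset_index(drop=True)

    out = flat.groupby(tmp_agg, group_keys=False).apply(compute)
    # Rebuild a single-level index; the group key never becomes part of it (point 2).
    out = out.set_index(tmp_idx)
    out.index.name = index_name
    return out


pdf = pd.DataFrame(
    {"s": [0.0, None, 2.0, 3.0]},
    index=pd.date_range("2000-01-01", periods=4, freq="min"),
)
out = fall_back_sketch(pdf, "asfreq", "30s")
print(out.index)
```

Here `asfreq` changes the index values, and the result still comes back with a plain `DatetimeIndex` and without either temporary column.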



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

