holdenk commented on code in PR #54370:
URL: https://github.com/apache/spark/pull/54370#discussion_r2824794619


##########
python/pyspark/pandas/frame.py:
##########


Review Comment:
   Reason for dropping the comment?



##########
python/pyspark/pandas/frame.py:
##########
@@ -9899,33 +9848,39 @@ def describe(self, percentiles: Optional[List[float]] = None) -> "DataFrame":
         has_numeric_type = len(psser_numeric) > 0
 
         if is_all_string_type:
-            # Handling string type columns
-            # We will retrieve the `count`, `unique`, `top` and `freq`.
             internal = self._internal.resolved_copy
             exprs_string = [
                internal.spark_column_for(psser._column_label) for psser in psser_string
             ]
             sdf = internal.spark_frame.select(*exprs_string)
 
-            # Get `count` & `unique` for each columns
            counts, uniques = map(lambda x: x[1:], sdf.summary("count", "count_distinct").take(2))
-            # Handling Empty DataFrame
             if len(counts) == 0 or counts[0] == "0":
                 data = dict()
                 for psser in psser_string:
                     data[psser.name] = [0, 0, np.nan, np.nan]
                return DataFrame(data, index=["count", "unique", "top", "freq"])
 
-            # Get `top` & `freq` for each columns
-            tops = []
-            freqs = []
-            # TODO(SPARK-37711): We should do it in single pass since invoking Spark job
-            #   for every columns is too expensive.
-            for column in exprs_string:
-                top, freq = sdf.groupby(column).count().sort("count", ascending=False).first()
-                tops.append(str(top))
-                freqs.append(str(freq))
-
+            n_cols = len(column_names)
+            stack_args = ", ".join([f"'{col_name}', `{col_name}`" for col_name in column_names])
+            stack_expr = f"stack({n_cols}, {stack_args}) as (column_name, value)"
+            # Unpivot, group by (column_name, value), and count occurrences
+            unpivoted = sdf.selectExpr(stack_expr)
+            value_counts = unpivoted.groupBy("column_name", "value").count()
+            # Use window function to rank values by count within each column
+            # When counts tie, pick the first value alphabetically like pandas
+            window = Window.partitionBy("column_name").orderBy(F.desc("count"), F.asc("value"))
+            # Unfortunately, there's no straightforward way to get the top value and its frequency
+            # for each column without collecting the data to the driver side.

Review Comment:
   Note for the future: this seems like a good follow-up issue; I think we could do something smarter here long term. I've been thinking about some kind of bounded collection type for aggregations, and this might fit (although, to be fair, `describe` isn't used all that often, I'd love to put these together if we can). The intermediate results would still end up being large, but they'd stay on the executors and the final driver-side part would be a bit smaller.
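As a rough sketch (not this PR's code) of the single-pass shape I have in mind: once the data is unpivoted into `(column_name, value)` pairs, a plain aggregation with `max_by` (Spark 3.3+) keeps the heavy work on the executors and brings back only one row per column. The toy frame and variable names below are made up for illustration, and the tie-breaking differs from the window-based ordering in the diff.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy string-typed frame standing in for the describe() input.
sdf = spark.createDataFrame([("a", "x"), ("a", "y"), ("b", "y")], ["c1", "c2"])

column_names = sdf.columns
n_cols = len(column_names)
stack_args = ", ".join(f"'{c}', `{c}`" for c in column_names)
# Unpivot every column into (column_name, value) pairs, as in the PR.
unpivoted = sdf.selectExpr(f"stack({n_cols}, {stack_args}) as (column_name, value)")
value_counts = unpivoted.groupBy("column_name", "value").count()

# Single aggregation pass: keep only the max count per column and the value
# that attains it, so the driver collects exactly one row per column.
# Note: max_by breaks ties arbitrarily, unlike F.asc("value") in the window.
top_freq = value_counts.groupBy("column_name").agg(
    F.max("count").alias("freq"),
    F.max_by("value", "count").alias("top"),
)
tops_freqs = {r["column_name"]: (r["top"], r["freq"]) for r in top_freq.collect()}
```

This still materializes the per-(column, value) counts on the executors, so it doesn't fully answer the bounded-collection question, but it shrinks the driver-side collect to one row per column.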



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

