gaogaotiantian commented on code in PR #54370:
URL: https://github.com/apache/spark/pull/54370#discussion_r2825277759


##########
python/pyspark/pandas/frame.py:
##########
@@ -9899,33 +9848,39 @@ def describe(self, percentiles: Optional[List[float]] = None) -> "DataFrame":
         has_numeric_type = len(psser_numeric) > 0
 
         if is_all_string_type:
-            # Handling string type columns
-            # We will retrieve the `count`, `unique`, `top` and `freq`.
             internal = self._internal.resolved_copy
             exprs_string = [
                 internal.spark_column_for(psser._column_label) for psser in psser_string
             ]
             sdf = internal.spark_frame.select(*exprs_string)
 
-            # Get `count` & `unique` for each columns
             counts, uniques = map(lambda x: x[1:], sdf.summary("count", "count_distinct").take(2))
-            # Handling Empty DataFrame
             if len(counts) == 0 or counts[0] == "0":
                 data = dict()
                 for psser in psser_string:
                     data[psser.name] = [0, 0, np.nan, np.nan]
                 return DataFrame(data, index=["count", "unique", "top", "freq"])
 
-            # Get `top` & `freq` for each columns
-            tops = []
-            freqs = []
-            # TODO(SPARK-37711): We should do it in single pass since invoking Spark job
-            #   for every columns is too expensive.
-            for column in exprs_string:
-                top, freq = sdf.groupby(column).count().sort("count", ascending=False).first()
-                tops.append(str(top))
-                freqs.append(str(freq))
-
+            n_cols = len(column_names)
+            stack_args = ", ".join([f"'{col_name}', `{col_name}`" for col_name in column_names])
+            stack_expr = f"stack({n_cols}, {stack_args}) as (column_name, value)"
+            # Unpivot, group by (column_name, value), and count occurrences
+            unpivoted = sdf.selectExpr(stack_expr)
+            value_counts = unpivoted.groupBy("column_name", "value").count()
+            # Use window function to rank values by count within each column
+            # When counts tie, pick the first value alphabetically like pandas
+            window = Window.partitionBy("column_name").orderBy(F.desc("count"), F.asc("value"))
+            # Unfortunately, there's no straightforward way to get the top value and its frequency
+            # for each column without collecting the data to the driver side.
+            top_values = (
+                value_counts.withColumn("rank", F.row_number().over(window))
+                .filter(F.col("rank") == 1)
+                .select("column_name", "value", F.col("count").alias("freq"))
+                .collect()
+            )
+            top_freq_dict = {row.column_name: (row.value, row.freq) for row in top_values}
+            tops = [str(top_freq_dict[col_name][0]) for col_name in column_names]
+            freqs = [str(top_freq_dict[col_name][1]) for col_name in column_names]

Review Comment:
   ```python
   top_freq_dict = {row.column_name: (str(row.value), str(row.freq)) for row in top_values}
   tops, freqs = map(list, zip(*(top_freq_dict[col_name] for col_name in column_names)))
   ```
   
   Maybe less duplication? Not a huge thing though.
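   
   A minimal, standalone sketch of the transposition idiom above, using hypothetical column names and pre-collected `(top, freq)` pairs in place of the real `Row` objects returned by `collect()`:
   
   ```python
   # Hypothetical stand-ins for column_names and the collected (top, freq) pairs.
   column_names = ["a", "b"]
   top_freq_dict = {"a": ("x", "3"), "b": ("y", "5")}
   
   # zip(*pairs) transposes [(top_a, freq_a), (top_b, freq_b)] into
   # (tops, freqs); map(list, ...) converts the resulting tuples to lists.
   tops, freqs = map(list, zip(*(top_freq_dict[col_name] for col_name in column_names)))
   
   assert tops == ["x", "y"]
   assert freqs == ["3", "5"]
   ```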



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

