HyukjinKwon commented on a change in pull request #34931:
URL: https://github.com/apache/spark/pull/34931#discussion_r774311846



##########
File path: python/pyspark/pandas/frame.py
##########
@@ -8828,22 +8847,147 @@ def describe(self, percentiles: Optional[List[float]] = None) -> "DataFrame":
         else:
             percentiles = [0.25, 0.5, 0.75]
 
-        formatted_perc = ["{:.0%}".format(p) for p in sorted(percentiles)]
-        stats = ["count", "mean", "stddev", "min", *formatted_perc, "max"]
+        # Identify the cases
+        is_all_string_type = (
+            len(psser_numeric) == 0 and len(psser_timestamp) == 0 and len(psser_string) > 0
+        )
+        is_all_numeric_type = len(psser_numeric) > 0 and len(psser_timestamp) == 0
+        has_timestamp_type = len(psser_timestamp) > 0
+        has_numeric_type = len(psser_numeric) > 0
+        has_null_columns = len(null_columns) > 0
+
+        if is_all_string_type:
+            # Handling string type columns
+            # We will retrieve the `count`, `unique`, `top` and `freq`.
+            internal = self._internal.resolved_copy
+            exprs_string = [
+                internal.spark_column_for(psser._column_label) for psser in psser_string
+            ]
+            sdf = internal.spark_frame.select(*exprs_string)
+
+            # Get `count` & `unique` for each column
+            counts, uniques = map(lambda x: x[1:], sdf.summary("count", "count_distinct").take(2))
+
+            # Get `top` & `freq` for each column
+            tops = []
+            freqs = []
+            # TODO(SPARK-37711): We should do this in a single pass, since invoking a Spark job
+            #  for every column is too expensive.
+            for column in exprs_string:
+                top, freq = sdf.groupby(column).count().sort("count", ascending=False).first()
+                tops.append(str(top))
+                freqs.append(str(freq))
+
+            stats = [counts, uniques, tops, freqs]
+            stats_names = ["count", "unique", "top", "freq"]
+
+            result: DataFrame = DataFrame(
+                data=stats,
+                index=stats_names,
+                columns=column_names,
+            )
+        elif is_all_numeric_type:
+            # Handling numeric columns
+            exprs_numeric = [
+                psser._dtype_op.nan_to_null(psser).spark.column for psser in psser_numeric
+            ]
+            formatted_perc = ["{:.0%}".format(p) for p in sorted(percentiles)]
+            stats = ["count", "mean", "stddev", "min", *formatted_perc, "max"]
 
-        sdf = self._internal.spark_frame.select(*exprs).summary(*stats)
-        sdf = sdf.replace("stddev", "std", subset=["summary"])
+            # In this case, we can simply use `summary` to calculate the stats.
+            sdf = self._internal.spark_frame.select(*exprs_numeric).summary(*stats)
+            sdf = sdf.replace("stddev", "std", subset=["summary"])
 
-        internal = InternalFrame(
-            spark_frame=sdf,
-            index_spark_columns=[scol_for(sdf, "summary")],
-            column_labels=column_labels,
-            data_spark_columns=[
-                scol_for(sdf, self._internal.spark_column_name_for(label))
-                for label in column_labels
-            ],
-        )
-        return DataFrame(internal).astype("float64")
+            internal = InternalFrame(
+                spark_frame=sdf,
+                index_spark_columns=[scol_for(sdf, "summary")],
+                column_labels=column_labels,
+                data_spark_columns=[
+                    scol_for(sdf, self._internal.spark_column_name_for(label))
+                    for label in column_labels
+                ],
+            )
+            result = DataFrame(internal).astype("float64")
+        elif has_timestamp_type:
+            internal = self._internal.resolved_copy
+            column_names = [
+                internal.spark_column_name_for(column_label) for column_label in column_labels
+            ]
+            column_length = len(column_labels)
+
+            # Apply stat functions for each column.
+            count_exprs = map(F.count, column_names)
+            min_exprs = map(F.min, column_names)
+            # Here we flatten the multiple map objects into a single list that contains each
+            # calculated percentile, using `chain`.
+            # e.g. flatten `[<map object at 0x7fc1907dc280>, <map object at 0x7fc1907dcc70>]`
+            # into `[Column<'percentile_approx(A, 0.2, 10000)'>, Column<'percentile_approx(B, 0.2, 10000)'>,
+            # Column<'percentile_approx(A, 0.5, 10000)'>, Column<'percentile_approx(B, 0.5, 10000)'>]`
+            perc_exprs = chain(
+                *[
+                    map(F.percentile_approx, column_names, [percentile] * column_length)
+                    for percentile in percentiles
+                ]
+            )
+            max_exprs = map(F.max, column_names)
+            mean_exprs = []
+            for column_name, spark_data_type in zip(column_names, spark_data_types):
+                mean_exprs.append(F.mean(column_name).astype(spark_data_type))
+            exprs = [*count_exprs, *mean_exprs, *min_exprs, *perc_exprs, *max_exprs]
+
+            formatted_perc = ["{:.0%}".format(p) for p in sorted(percentiles)]
+            stats_names = ["count", "mean", "min", *formatted_perc, "max"]
+
+            # If not all columns are timestamp type,
+            # we also need to calculate the `std` for numeric columns
+            if has_numeric_type:
+                std_exprs = []
+                for label, spark_data_type in zip(column_labels, spark_data_types):
+                    column_name = label[0]
+                    if isinstance(spark_data_type, (TimestampType, TimestampNTZType)):
+                        std_exprs.append(
+                            F.lit(str(pd.NaT)).alias("stddev_samp({})".format(column_name))
+                        )
+                    else:
+                        std_exprs.append(F.stddev(column_name))
+                exprs.extend(std_exprs)
+                stats_names.append("std")
+
+            # Select stats for all columns at once.
+            sdf = internal.spark_frame.select(exprs)
+            stat_values = sdf.first()
+
+            num_stats = int(len(exprs) / column_length)
+            # `column_name_stats_kv` is a key-value store with the column name as key and
+            # the stats as values,
+            # e.g. {"A": [{count_value}, {min_value}, ...], "B": [{count_value}, {min_value}, ...]}
+            column_name_stats_kv: Dict[str, List[str]] = defaultdict(list)
+            for i, column_name in enumerate(column_names):
+                for first_stat_idx in range(num_stats):
+                    column_name_stats_kv[column_name].append(
+                        stat_values[(first_stat_idx * column_length) + i]
+                    )
+
+            # For timestamp type columns, we should cast the stat values to string.
+            for key, spark_data_type in zip(column_name_stats_kv, spark_data_types):
+                if isinstance(spark_data_type, (TimestampType, TimestampNTZType)):
+                    column_name_stats_kv[key] = [str(value) for value in column_name_stats_kv[key]]
+
+            result: DataFrame = DataFrame(  # type: ignore[no-redef]
+                data=column_name_stats_kv,
+                index=stats_names,
+                columns=column_names,
+            )
+        elif has_null_columns:
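
For reference, a minimal runnable sketch of the `chain`-based flattening of the `percentile_approx` expressions described in the diff comment above. The SparkSession, the columns "A"/"B", the sample data, and the percentiles are made up for illustration and are not part of the patch:

```python
from itertools import chain

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["A", "B"])

column_names = ["A", "B"]
column_length = len(column_names)
percentiles = [0.25, 0.5, 0.75]

# Each inner `map` lazily yields one percentile_approx Column per input column;
# `chain` then flattens the per-percentile map objects into one flat sequence:
# percentile_approx(A, 0.25), percentile_approx(B, 0.25),
# percentile_approx(A, 0.5),  percentile_approx(B, 0.5), ...
perc_exprs = chain(
    *[
        map(F.percentile_approx, column_names, [percentile] * column_length)
        for percentile in percentiles
    ]
)

# All percentiles for all columns come back as a single row from one job.
print(sdf.select(*perc_exprs).first())
```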
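
Similarly, a tiny plain-Python sketch of the row-slicing scheme used for `column_name_stats_kv`: the selected row is laid out stat by stat, with each stat repeated once per column, so the value of stat `s` for column `i` sits at index `s * column_length + i`. The stat values below are made up:

```python
from collections import defaultdict
from typing import Dict, List

column_names = ["A", "B"]
column_length = len(column_names)

# Pretend this is the row returned by sdf.first() for the expressions
# [count(A), count(B), mean(A), mean(B), min(A), min(B)].
stat_values = [3, 3, 2.0, 20.0, 1, 10]
num_stats = len(stat_values) // column_length

column_name_stats_kv: Dict[str, List] = defaultdict(list)
for i, column_name in enumerate(column_names):
    for stat_idx in range(num_stats):
        column_name_stats_kv[column_name].append(
            stat_values[(stat_idx * column_length) + i]
        )

# {'A': [3, 2.0, 1], 'B': [3, 20.0, 10]}
print(dict(column_name_stats_kv))
```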

Review comment:
       How does it work if other columns have explicit `None`? e.g.)
   
   ```python
   >>> pdf = pd.DataFrame({'a': [1, 2, 3], 'b': [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)], 'c': [None, None, None]})
   >>> pdf.dtypes
   a             int64
   b    datetime64[ns]
   c            object
   dtype: object
   ```
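
For completeness, a hedged reproduction of this scenario. The block below only runs the pandas side and leaves the pandas-on-Spark call as a comment, since its result is exactly what is being asked:

```python
import pandas as pd

# The frame from the question: a numeric column, a timestamp column, and an
# object column holding only None.
pdf = pd.DataFrame(
    {
        "a": [1, 2, 3],
        "b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)],
        "c": [None, None, None],
    }
)

# pandas baseline; which columns are described depends on the pandas version
# (datetime columns are only included by default in newer pandas releases).
print(pdf.describe())

# The behaviour in question would then be compared with:
#   import pyspark.pandas as ps
#   ps.from_pandas(pdf).describe()
```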

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


