xinrong-meng commented on code in PR #47897:
URL: https://github.com/apache/spark/pull/47897#discussion_r1733770464
##########
python/pyspark/pandas/plot/core.py:
##########
@@ -285,209 +285,76 @@ def binary_search_for_buckets(value):
class BoxPlotBase:
@staticmethod
- def compute_multicol_stats(data, colnames, whis, precision):
- # Computes mean, median, Q1 and Q3 with approx_percentile and precision
- scol = []
- for colname in colnames:
- scol.append(
- F.percentile_approx(
- "`%s`" % colname, [0.25, 0.50, 0.75], int(1.0 / precision)
- ).alias("{}_percentiles%".format(colname))
- )
- scol.append(F.mean("`%s`" % colname).alias("{}_mean".format(colname)))
-
- # a_percentiles a_mean b_percentiles b_mean
- # 0 [3.0, 3.2, 3.2] 3.18 [5.1, 5.9, 6.4] 5.86
- pdf = data._internal.resolved_copy.spark_frame.select(*scol).toPandas()
-
- i = 0
- multicol_stats = {}
- for colname in colnames:
- q1, med, q3 = pdf.iloc[0, i]
+ def compute_box(sdf, colnames, whis, precision, showfliers):
+ assert len(colnames) > 0
+ formatted_colnames = ["`{}`".format(colname) for colname in colnames]
+
+ stats_scols = []
+ for i, colname in enumerate(formatted_colnames):
+ percentiles = F.percentile_approx(colname, [0.25, 0.50, 0.75], int(1.0 / precision))
+ q1 = F.get(percentiles, 0)
+ med = F.get(percentiles, 1)
+ q3 = F.get(percentiles, 2)
iqr = q3 - q1
- lfence = q1 - whis * iqr
- ufence = q3 + whis * iqr
- i += 1
-
- mean = pdf.iloc[0, i]
- i += 1
-
- multicol_stats[colname] = {
- "mean": mean,
- "med": med,
- "q1": q1,
- "q3": q3,
- "lfence": lfence,
- "ufence": ufence,
- }
-
- return multicol_stats
-
- @staticmethod
- def compute_stats(data, colname, whis, precision):
- # Computes mean, median, Q1 and Q3 with approx_percentile and precision
- pdf = data._psdf._internal.resolved_copy.spark_frame.agg(
- *[
- F.expr(
- "approx_percentile(`{}`, {}, {})".format(colname, q, int(1.0 / precision))
- ).alias("{}_{}%".format(colname, int(q * 100)))
- for q in [0.25, 0.50, 0.75]
- ],
- F.mean("`%s`" % colname).alias("{}_mean".format(colname)),
- ).toPandas()
-
- # Computes IQR and Tukey's fences
- iqr = "{}_iqr".format(colname)
- p75 = "{}_75%".format(colname)
- p25 = "{}_25%".format(colname)
- pdf.loc[:, iqr] = pdf.loc[:, p75] - pdf.loc[:, p25]
- pdf.loc[:, "{}_lfence".format(colname)] = pdf.loc[:, p25] - whis * pdf.loc[:, iqr]
- pdf.loc[:, "{}_ufence".format(colname)] = pdf.loc[:, p75] + whis * pdf.loc[:, iqr]
-
- qnames = ["25%", "50%", "75%", "mean", "lfence", "ufence"]
- col_summ = pdf[["{}_{}".format(colname, q) for q in qnames]]
- col_summ.columns = qnames
- lfence, ufence = col_summ["lfence"], col_summ["ufence"]
-
- stats = {
- "mean": col_summ["mean"].values[0],
- "med": col_summ["50%"].values[0],
- "q1": col_summ["25%"].values[0],
- "q3": col_summ["75%"].values[0],
- }
-
- return stats, (lfence.values[0], ufence.values[0])
-
- @staticmethod
- def multicol_outliers(data, multicol_stats):
- scols = {}
- for colname, stats in multicol_stats.items():
- scols["__{}_outlier".format(colname)] = ~F.col("`%s`" % colname).between(
- stats["lfence"], stats["ufence"]
- )
- return data._internal.resolved_copy.spark_frame.withColumns(scols)
-
- @staticmethod
- def outliers(data, colname, lfence, ufence):
- # Builds expression to identify outliers
- expression = F.col("`%s`" % colname).between(lfence, ufence)
- # Creates a column to flag rows as outliers or not
- return data._psdf._internal.resolved_copy.spark_frame.withColumn(
- "__{}_outlier".format(colname), ~expression
- )
-
- @staticmethod
- def calc_multicol_whiskers(colnames, multicol_outliers):
- # Computes min and max values of non-outliers - the whiskers
- scols = []
- for colname in colnames:
- outlier_colname = "__{}_outlier".format(colname)
- scols.append(
- F.min(F.when(~F.col(outlier_colname), F.col(colname)).otherwise(F.lit(None))).alias(
- "__{}_min".format(colname)
- )
- )
- scols.append(
- F.max(F.when(~F.col(outlier_colname), F.col(colname)).otherwise(F.lit(None))).alias(
- "__{}_max".format(colname)
- )
- )
-
- pdf = multicol_outliers.select(*scols).toPandas()
-
- i = 0
- whiskers = {}
- for colname in colnames:
- min = pdf.iloc[0, i]
- i += 1
- max = pdf.iloc[0, i]
- i += 1
- whiskers[colname] = {
- "min": min,
- "max": max,
- }
-
- return whiskers
-
- @staticmethod
- def calc_whiskers(colname, outliers):
- # Computes min and max values of non-outliers - the whiskers
- minmax = (
- outliers.filter("not `__{}_outlier`".format(colname))
- .agg(F.min("`%s`" % colname).alias("min"), F.max(colname).alias("max"))
- .toPandas()
- )
- return minmax.iloc[0][["min", "max"]].values
-
- @staticmethod
- def get_fliers(colname, outliers, lfence, ufence):
- # Filters only the outliers, should "showfliers" be True
- fliers_df = outliers.filter("`__{}_outlier`".format(colname))
-
- # If it shows fliers, take the top 1k with highest absolute values
- # Here we normalize the values by subtracting the fences.
- formated_colname = "`{}`".format(colname)
- order_col = (
- F.when(
- F.col(formated_colname) > F.lit(ufence),
- F.col(formated_colname) - F.lit(ufence),
- )
- .when(
- F.col(formated_colname) < F.lit(lfence),
- F.lit(lfence) - F.col(formated_colname),
- )
- .otherwise(F.lit(None))
- )
- fliers = (
- fliers_df.select(F.col("`{}`".format(colname)))
- .orderBy(order_col)
- .limit(1001)
- .toPandas()[colname]
- .values
- )
-
- return fliers
-
- @staticmethod
- def get_multicol_fliers(colnames, multicol_outliers, multicol_stats):
- scols = []
- for i, colname in enumerate(colnames):
- formated_colname = "`{}`".format(colname)
- outlier_colname = "__{}_outlier".format(colname)
- lfence, ufence = multicol_stats[colname]["lfence"], multicol_stats[colname]["ufence"]
- order_col = (
- F.when(
- F.col(formated_colname) > F.lit(ufence),
- F.col(formated_colname) - F.lit(ufence),
- )
- .when(
- F.col(formated_colname) < F.lit(lfence),
- F.lit(lfence) - F.col(formated_colname),
- )
- .otherwise(F.lit(None))
+ lfence = q1 - F.lit(whis) * iqr
+ ufence = q3 + F.lit(whis) * iqr
+
+ stats_scols.append(
+ F.struct(
+ F.mean(colname).alias("mean"),
+ med.alias("med"),
+ q1.alias("q1"),
+ q3.alias("q3"),
+ lfence.alias("lfence"),
+ ufence.alias("ufence"),
+ ).alias(f"_box_plot_stats_{i}")
)
- pair_col = F.struct(
- order_col.alias("ord"),
- F.col(formated_colname).alias("val"),
- )
- scols.append(
- SF.collect_top_k(
- F.when(F.col(outlier_colname), pair_col)
- .otherwise(F.lit(None))
- .alias(f"pair_{i}"),
- 1001,
- False,
- ).alias(f"top_{i}")["val"]
+ sdf_stats = sdf.select(*stats_scols)
+
+ result_scols = []
+ for i, colname in enumerate(formatted_colnames):
+ value = F.col(colname)
+
+ lfence = F.col(f"_box_plot_stats_{i}.lfence")
+ ufence = F.col(f"_box_plot_stats_{i}.ufence")
+ mean = F.col(f"_box_plot_stats_{i}.mean")
+ med = F.col(f"_box_plot_stats_{i}.med")
+ q1 = F.col(f"_box_plot_stats_{i}.q1")
+ q3 = F.col(f"_box_plot_stats_{i}.q3")
+
+ outlier = ~value.between(lfence, ufence)
+
+ # Computes min and max values of non-outliers - the whiskers
+ upper_whisker = F.max(F.when(~outlier, value).otherwise(F.lit(None)))
+ lower_whisker = F.min(F.when(~outlier, value).otherwise(F.lit(None)))
+
+ # If it shows fliers, take the top 1k with the highest absolute values
+ # Here we normalize the values by subtracting the median.
+ if showfliers:
+ pair = F.when(
+ outlier,
+ F.struct(F.abs(value - med), value.alias("val")),
+ ).otherwise(F.lit(None))
+ topk = SF.collect_top_k(pair, 1001, False)
+ fliers = F.when(F.size(topk) > 0, topk["val"]).otherwise(F.lit(None))
+ else:
+ fliers = F.lit(None)
+
+ result_scols.append(
+ F.struct(
+ F.first(mean).alias("mean"),
+ F.first(med).alias("med"),
+ F.first(q1).alias("q1"),
+ F.first(q3).alias("q3"),
+ upper_whisker.alias("upper_whisker"),
+ lower_whisker.alias("lower_whisker"),
+ fliers.alias("fliers"),
+ ).alias(f"_box_plot_results_{i}")
)
- results = multicol_outliers.select(scols).first()
-
- fliers = {}
- for i, colname in enumerate(colnames):
- fliers[colname] = results[i]
-
- return fliers
+ sdf_result = sdf.join(sdf_stats.hint("broadcast")).select(*result_scols)
Review Comment:
Nice!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]