itholic commented on code in PR #37845:
URL: https://github.com/apache/spark/pull/37845#discussion_r969079834
##########
python/pyspark/pandas/tests/test_stats.py:
##########
@@ -257,6 +257,32 @@ def test_skew_kurt_numerical_stability(self):
self.assert_eq(psdf.skew(), pdf.skew(), almost=True)
self.assert_eq(psdf.kurt(), pdf.kurt(), almost=True)
+ def test_dataframe_corr(self):
+ pdf = makeMissingDataframe(0.3, 42)
+ psdf = ps.from_pandas(pdf)
+
+ with self.assertRaisesRegex(ValueError, "Invalid method"):
+ psdf.corr("std")
+ with self.assertRaisesRegex(NotImplementedError, "kendall for now"):
+ psdf.corr("kendall")
+ with self.assertRaisesRegex(TypeError, "Invalid min_periods type"):
+ psdf.corr(min_periods="3")
+ with self.assertRaisesRegex(NotImplementedError, "spearman for now"):
+ psdf.corr(method="spearman", min_periods=3)
+
+ self.assert_eq(psdf.corr(), pdf.corr(), check_exact=False)
+ self.assert_eq(psdf.corr(min_periods=1), pdf.corr(min_periods=1),
check_exact=False)
+ self.assert_eq(psdf.corr(min_periods=3), pdf.corr(min_periods=3),
check_exact=False)
+
+ # multi-index columns
+ columns = pd.MultiIndex.from_tuples([("X", "A"), ("X", "B"), ("Y",
"C"), ("Z", "D")])
+ pdf.columns = columns
+ psdf.columns = columns
+
+ self.assert_eq(psdf.corr(), pdf.corr(), check_exact=False)
+ self.assert_eq(psdf.corr(min_periods=1), pdf.corr(min_periods=1),
check_exact=False)
+ self.assert_eq(psdf.corr(min_periods=3), pdf.corr(min_periods=3),
check_exact=False)
Review Comment:
Can we also test for chained operations?
e.g.
```python
self.assert_eq((psdf + 1).corr(), (pdf + 1).corr(), check_exact=False)
```
##########
python/pyspark/pandas/frame.py:
##########
@@ -1454,11 +1462,196 @@ def corr(self, method: str = "pearson") -> "DataFrame":
There are behavior differences between pandas-on-Spark and pandas.
* the `method` argument only accepts 'pearson', 'spearman'
- * the data should not contain NaNs. pandas-on-Spark will return an
error.
- * pandas-on-Spark doesn't support the following argument(s).
+ * if the `method` is `spearman`, the data should not contain NaNs.
+ * if the `method` is `spearman`, `min_periods` argument is not
supported.
+ """
+ if method not in ["pearson", "spearman", "kendall"]:
+ raise ValueError(f"Invalid method {method}")
+ if method == "kendall":
+ raise NotImplementedError("method doesn't support kendall for now")
+ if min_periods is not None and not isinstance(min_periods, int):
+ raise TypeError(f"Invalid min_periods type
{type(min_periods).__name__}")
+ if min_periods is not None and method == "spearman":
+ raise NotImplementedError("min_periods doesn't support spearman
for now")
+
+ if method == "pearson":
+ min_periods = 1 if min_periods is None else min_periods
+ internal = self._internal.resolved_copy
+ numeric_labels = [
+ label
+ for label in internal.column_labels
+ if isinstance(internal.spark_type_for(label), (NumericType,
BooleanType))
+ ]
+ numeric_scols: List[Column] = [
+ internal.spark_column_for(label).cast("double") for label in
numeric_labels
+ ]
+ numeric_col_names: List[str] = [name_like_string(label) for label
in numeric_labels]
+ num_scols = len(numeric_scols)
+
+ sdf = internal.spark_frame
+ tmp_index_1_col = verify_temp_column_name(sdf,
"__tmp_index_1_col__")
+ tmp_index_2_col = verify_temp_column_name(sdf,
"__tmp_index_2_col__")
+ tmp_value_1_col = verify_temp_column_name(sdf,
"__tmp_value_1_col__")
+ tmp_value_2_col = verify_temp_column_name(sdf,
"__tmp_value_2_col__")
+
+ # simple dataset
+ # +---+---+----+
+ # | A| B| C|
+ # +---+---+----+
+ # | 1| 2| 3.0|
+ # | 4| 1|null|
+ # +---+---+----+
+
+ pair_scols: List[Column] = []
+ for i in range(0, num_scols):
Review Comment:
nit: we can omit the `0` since it's the default?
```diff
- for i in range(0, num_scols):
+ for i in range(num_scols):
```
Either looks okay, though.
##########
python/pyspark/pandas/frame.py:
##########
@@ -1454,11 +1462,196 @@ def corr(self, method: str = "pearson") -> "DataFrame":
There are behavior differences between pandas-on-Spark and pandas.
* the `method` argument only accepts 'pearson', 'spearman'
- * the data should not contain NaNs. pandas-on-Spark will return an
error.
- * pandas-on-Spark doesn't support the following argument(s).
+ * if the `method` is `spearman`, the data should not contain NaNs.
+ * if the `method` is `spearman`, `min_periods` argument is not
supported.
+ """
+ if method not in ["pearson", "spearman", "kendall"]:
+ raise ValueError(f"Invalid method {method}")
+ if method == "kendall":
+ raise NotImplementedError("method doesn't support kendall for now")
+ if min_periods is not None and not isinstance(min_periods, int):
+ raise TypeError(f"Invalid min_periods type
{type(min_periods).__name__}")
+ if min_periods is not None and method == "spearman":
+ raise NotImplementedError("min_periods doesn't support spearman
for now")
+
+ if method == "pearson":
+ min_periods = 1 if min_periods is None else min_periods
+ internal = self._internal.resolved_copy
+ numeric_labels = [
+ label
+ for label in internal.column_labels
+ if isinstance(internal.spark_type_for(label), (NumericType,
BooleanType))
+ ]
+ numeric_scols: List[Column] = [
+ internal.spark_column_for(label).cast("double") for label in
numeric_labels
+ ]
+ numeric_col_names: List[str] = [name_like_string(label) for label
in numeric_labels]
+ num_scols = len(numeric_scols)
+
+ sdf = internal.spark_frame
+ tmp_index_1_col = verify_temp_column_name(sdf,
"__tmp_index_1_col__")
+ tmp_index_2_col = verify_temp_column_name(sdf,
"__tmp_index_2_col__")
+ tmp_value_1_col = verify_temp_column_name(sdf,
"__tmp_value_1_col__")
+ tmp_value_2_col = verify_temp_column_name(sdf,
"__tmp_value_2_col__")
Review Comment:
nit: how about `tmp_index_x_col_name` instead of `tmp_index_x_col` to
explicitly indicate it's the name of the column rather than the column itself?
##########
python/pyspark/pandas/frame.py:
##########
@@ -1454,11 +1462,196 @@ def corr(self, method: str = "pearson") -> "DataFrame":
There are behavior differences between pandas-on-Spark and pandas.
* the `method` argument only accepts 'pearson', 'spearman'
- * the data should not contain NaNs. pandas-on-Spark will return an
error.
- * pandas-on-Spark doesn't support the following argument(s).
+ * if the `method` is `spearman`, the data should not contain NaNs.
+ * if the `method` is `spearman`, `min_periods` argument is not
supported.
+ """
+ if method not in ["pearson", "spearman", "kendall"]:
+ raise ValueError(f"Invalid method {method}")
+ if method == "kendall":
+ raise NotImplementedError("method doesn't support kendall for now")
+ if min_periods is not None and not isinstance(min_periods, int):
+ raise TypeError(f"Invalid min_periods type
{type(min_periods).__name__}")
+ if min_periods is not None and method == "spearman":
+ raise NotImplementedError("min_periods doesn't support spearman
for now")
+
+ if method == "pearson":
+ min_periods = 1 if min_periods is None else min_periods
+ internal = self._internal.resolved_copy
+ numeric_labels = [
+ label
+ for label in internal.column_labels
+ if isinstance(internal.spark_type_for(label), (NumericType,
BooleanType))
+ ]
+ numeric_scols: List[Column] = [
+ internal.spark_column_for(label).cast("double") for label in
numeric_labels
+ ]
+ numeric_col_names: List[str] = [name_like_string(label) for label
in numeric_labels]
+ num_scols = len(numeric_scols)
+
+ sdf = internal.spark_frame
+ tmp_index_1_col = verify_temp_column_name(sdf,
"__tmp_index_1_col__")
+ tmp_index_2_col = verify_temp_column_name(sdf,
"__tmp_index_2_col__")
+ tmp_value_1_col = verify_temp_column_name(sdf,
"__tmp_value_1_col__")
+ tmp_value_2_col = verify_temp_column_name(sdf,
"__tmp_value_2_col__")
+
+ # simple dataset
+ # +---+---+----+
+ # | A| B| C|
+ # +---+---+----+
+ # | 1| 2| 3.0|
+ # | 4| 1|null|
+ # +---+---+----+
+
+ pair_scols: List[Column] = []
+ for i in range(0, num_scols):
+ for j in range(i, num_scols):
+ pair_scols.append(
+ F.struct(
+ F.lit(i).alias(tmp_index_1_col),
+ F.lit(j).alias(tmp_index_2_col),
+ numeric_scols[i].alias(tmp_value_1_col),
+ numeric_scols[j].alias(tmp_value_2_col),
+ )
+ )
+
+ #
+-------------------+-------------------+-------------------+-------------------+
+ #
|__tmp_index_1_col__|__tmp_index_2_col__|__tmp_value_1_col__|__tmp_value_2_col__|
+ #
+-------------------+-------------------+-------------------+-------------------+
+ # | 0| 0| 1.0|
1.0|
+ # | 0| 1| 1.0|
2.0|
+ # | 0| 2| 1.0|
3.0|
+ # | 1| 1| 2.0|
2.0|
+ # | 1| 2| 2.0|
3.0|
+ # | 2| 2| 3.0|
3.0|
+ # | 0| 0| 4.0|
4.0|
+ # | 0| 1| 4.0|
1.0|
+ # | 0| 2| 4.0|
null|
+ # | 1| 1| 1.0|
1.0|
+ # | 1| 2| 1.0|
null|
+ # | 2| 2| null|
null|
+ #
+-------------------+-------------------+-------------------+-------------------+
+ tmp_tuple_col = verify_temp_column_name(sdf, "__tmp_tuple_col__")
+ sdf =
sdf.select(F.explode(F.array(*pair_scols)).alias(tmp_tuple_col)).select(
+
F.col(f"{tmp_tuple_col}.{tmp_index_1_col}").alias(tmp_index_1_col),
+
F.col(f"{tmp_tuple_col}.{tmp_index_2_col}").alias(tmp_index_2_col),
+
F.col(f"{tmp_tuple_col}.{tmp_value_1_col}").alias(tmp_value_1_col),
+
F.col(f"{tmp_tuple_col}.{tmp_value_2_col}").alias(tmp_value_2_col),
+ )
+
+ #
+-------------------+-------------------+------------------------+-----------------+
+ #
|__tmp_index_1_col__|__tmp_index_2_col__|__tmp_pearson_corr_col__|__tmp_count_col__|
+ #
+-------------------+-------------------+------------------------+-----------------+
+ # | 2| 2|
null| 1|
+ # | 1| 2|
null| 1|
+ # | 1| 1|
1.0| 2|
+ # | 0| 0|
1.0| 2|
+ # | 0| 1|
-1.0| 2|
+ # | 0| 2|
null| 1|
+ #
+-------------------+-------------------+------------------------+-----------------+
+ tmp_corr_col = verify_temp_column_name(sdf,
"__tmp_pearson_corr_col__")
+ tmp_count_col = verify_temp_column_name(sdf, "__tmp_count_col__")
+ sdf = sdf.groupby(tmp_index_1_col, tmp_index_2_col).agg(
+ F.corr(tmp_value_1_col, tmp_value_2_col).alias(tmp_corr_col),
+ F.count(
+ F.when(
+ F.col(tmp_value_1_col).isNotNull() &
F.col(tmp_value_2_col).isNotNull(), 1
+ )
+ ).alias(tmp_count_col),
+ )
+
+ #
+-------------------+-------------------+------------------------+
+ #
|__tmp_index_1_col__|__tmp_index_2_col__|__tmp_pearson_corr_col__|
+ #
+-------------------+-------------------+------------------------+
+ # | 2| 2|
null|
+ # | 1| 2|
null|
+ # | 2| 1|
null|
+ # | 1| 1|
1.0|
+ # | 0| 0|
1.0|
+ # | 0| 1|
-1.0|
+ # | 1| 0|
-1.0|
+ # | 0| 2|
null|
+ # | 2| 0|
null|
+ #
+-------------------+-------------------+------------------------+
+ sdf = (
+ sdf.withColumn(
+ tmp_corr_col,
+ F.when(F.col(tmp_count_col) >= min_periods,
F.col(tmp_corr_col)).otherwise(
+ F.lit(None)
+ ),
+ )
+ .withColumn(
+ tmp_tuple_col,
+ F.explode(
+ F.when(
+ F.col(tmp_index_1_col) == F.col(tmp_index_2_col),
+ F.lit([0]),
+ ).otherwise(F.lit([0, 1]))
+ ),
+ )
+ .select(
+ F.when(F.col(tmp_tuple_col) == 0, F.col(tmp_index_1_col))
+ .otherwise(F.col(tmp_index_2_col))
+ .alias(tmp_index_1_col),
+ F.when(F.col(tmp_tuple_col) == 0, F.col(tmp_index_2_col))
+ .otherwise(F.col(tmp_index_1_col))
+ .alias(tmp_index_2_col),
+ F.col(tmp_corr_col),
+ )
+ )
+
+ # +-------------------+--------------------+
+ # |__tmp_index_1_col__| __tmp_array_col__|
+ # +-------------------+--------------------+
+ # | 0|[{0, 1.0}, {1, -1...|
+ # | 1|[{0, -1.0}, {1, 1...|
+ # | 2|[{0, null}, {1, n...|
+ # +-------------------+--------------------+
+ tmp_array_col = verify_temp_column_name(sdf, "__tmp_array_col__")
Review Comment:
ditto
##########
python/pyspark/pandas/frame.py:
##########
@@ -1454,11 +1462,196 @@ def corr(self, method: str = "pearson") -> "DataFrame":
There are behavior differences between pandas-on-Spark and pandas.
* the `method` argument only accepts 'pearson', 'spearman'
- * the data should not contain NaNs. pandas-on-Spark will return an
error.
- * pandas-on-Spark doesn't support the following argument(s).
+ * if the `method` is `spearman`, the data should not contain NaNs.
+ * if the `method` is `spearman`, `min_periods` argument is not
supported.
+ """
+ if method not in ["pearson", "spearman", "kendall"]:
+ raise ValueError(f"Invalid method {method}")
+ if method == "kendall":
+ raise NotImplementedError("method doesn't support kendall for now")
+ if min_periods is not None and not isinstance(min_periods, int):
+ raise TypeError(f"Invalid min_periods type
{type(min_periods).__name__}")
+ if min_periods is not None and method == "spearman":
+ raise NotImplementedError("min_periods doesn't support spearman
for now")
+
+ if method == "pearson":
+ min_periods = 1 if min_periods is None else min_periods
+ internal = self._internal.resolved_copy
+ numeric_labels = [
+ label
+ for label in internal.column_labels
+ if isinstance(internal.spark_type_for(label), (NumericType,
BooleanType))
+ ]
+ numeric_scols: List[Column] = [
+ internal.spark_column_for(label).cast("double") for label in
numeric_labels
+ ]
+ numeric_col_names: List[str] = [name_like_string(label) for label
in numeric_labels]
+ num_scols = len(numeric_scols)
+
+ sdf = internal.spark_frame
+ tmp_index_1_col = verify_temp_column_name(sdf,
"__tmp_index_1_col__")
+ tmp_index_2_col = verify_temp_column_name(sdf,
"__tmp_index_2_col__")
+ tmp_value_1_col = verify_temp_column_name(sdf,
"__tmp_value_1_col__")
+ tmp_value_2_col = verify_temp_column_name(sdf,
"__tmp_value_2_col__")
+
+ # simple dataset
+ # +---+---+----+
+ # | A| B| C|
+ # +---+---+----+
+ # | 1| 2| 3.0|
+ # | 4| 1|null|
+ # +---+---+----+
+
+ pair_scols: List[Column] = []
+ for i in range(0, num_scols):
+ for j in range(i, num_scols):
+ pair_scols.append(
+ F.struct(
+ F.lit(i).alias(tmp_index_1_col),
+ F.lit(j).alias(tmp_index_2_col),
+ numeric_scols[i].alias(tmp_value_1_col),
+ numeric_scols[j].alias(tmp_value_2_col),
+ )
+ )
+
+ #
+-------------------+-------------------+-------------------+-------------------+
+ #
|__tmp_index_1_col__|__tmp_index_2_col__|__tmp_value_1_col__|__tmp_value_2_col__|
+ #
+-------------------+-------------------+-------------------+-------------------+
+ # | 0| 0| 1.0|
1.0|
+ # | 0| 1| 1.0|
2.0|
+ # | 0| 2| 1.0|
3.0|
+ # | 1| 1| 2.0|
2.0|
+ # | 1| 2| 2.0|
3.0|
+ # | 2| 2| 3.0|
3.0|
+ # | 0| 0| 4.0|
4.0|
+ # | 0| 1| 4.0|
1.0|
+ # | 0| 2| 4.0|
null|
+ # | 1| 1| 1.0|
1.0|
+ # | 1| 2| 1.0|
null|
+ # | 2| 2| null|
null|
+ #
+-------------------+-------------------+-------------------+-------------------+
+ tmp_tuple_col = verify_temp_column_name(sdf, "__tmp_tuple_col__")
+ sdf =
sdf.select(F.explode(F.array(*pair_scols)).alias(tmp_tuple_col)).select(
+
F.col(f"{tmp_tuple_col}.{tmp_index_1_col}").alias(tmp_index_1_col),
+
F.col(f"{tmp_tuple_col}.{tmp_index_2_col}").alias(tmp_index_2_col),
+
F.col(f"{tmp_tuple_col}.{tmp_value_1_col}").alias(tmp_value_1_col),
+
F.col(f"{tmp_tuple_col}.{tmp_value_2_col}").alias(tmp_value_2_col),
+ )
+
+ #
+-------------------+-------------------+------------------------+-----------------+
+ #
|__tmp_index_1_col__|__tmp_index_2_col__|__tmp_pearson_corr_col__|__tmp_count_col__|
+ #
+-------------------+-------------------+------------------------+-----------------+
+ # | 2| 2|
null| 1|
+ # | 1| 2|
null| 1|
+ # | 1| 1|
1.0| 2|
+ # | 0| 0|
1.0| 2|
+ # | 0| 1|
-1.0| 2|
+ # | 0| 2|
null| 1|
+ #
+-------------------+-------------------+------------------------+-----------------+
+ tmp_corr_col = verify_temp_column_name(sdf,
"__tmp_pearson_corr_col__")
+ tmp_count_col = verify_temp_column_name(sdf, "__tmp_count_col__")
Review Comment:
ditto
##########
python/pyspark/pandas/frame.py:
##########
@@ -1454,11 +1462,196 @@ def corr(self, method: str = "pearson") -> "DataFrame":
There are behavior differences between pandas-on-Spark and pandas.
* the `method` argument only accepts 'pearson', 'spearman'
- * the data should not contain NaNs. pandas-on-Spark will return an
error.
- * pandas-on-Spark doesn't support the following argument(s).
+ * if the `method` is `spearman`, the data should not contain NaNs.
+ * if the `method` is `spearman`, `min_periods` argument is not
supported.
+ """
+ if method not in ["pearson", "spearman", "kendall"]:
+ raise ValueError(f"Invalid method {method}")
+ if method == "kendall":
+ raise NotImplementedError("method doesn't support kendall for now")
+ if min_periods is not None and not isinstance(min_periods, int):
+ raise TypeError(f"Invalid min_periods type
{type(min_periods).__name__}")
+ if min_periods is not None and method == "spearman":
+ raise NotImplementedError("min_periods doesn't support spearman
for now")
+
+ if method == "pearson":
+ min_periods = 1 if min_periods is None else min_periods
+ internal = self._internal.resolved_copy
+ numeric_labels = [
+ label
+ for label in internal.column_labels
+ if isinstance(internal.spark_type_for(label), (NumericType,
BooleanType))
+ ]
+ numeric_scols: List[Column] = [
+ internal.spark_column_for(label).cast("double") for label in
numeric_labels
+ ]
+ numeric_col_names: List[str] = [name_like_string(label) for label
in numeric_labels]
+ num_scols = len(numeric_scols)
+
+ sdf = internal.spark_frame
+ tmp_index_1_col = verify_temp_column_name(sdf,
"__tmp_index_1_col__")
+ tmp_index_2_col = verify_temp_column_name(sdf,
"__tmp_index_2_col__")
+ tmp_value_1_col = verify_temp_column_name(sdf,
"__tmp_value_1_col__")
+ tmp_value_2_col = verify_temp_column_name(sdf,
"__tmp_value_2_col__")
+
+ # simple dataset
+ # +---+---+----+
+ # | A| B| C|
+ # +---+---+----+
+ # | 1| 2| 3.0|
+ # | 4| 1|null|
+ # +---+---+----+
+
+ pair_scols: List[Column] = []
+ for i in range(0, num_scols):
+ for j in range(i, num_scols):
+ pair_scols.append(
+ F.struct(
+ F.lit(i).alias(tmp_index_1_col),
+ F.lit(j).alias(tmp_index_2_col),
+ numeric_scols[i].alias(tmp_value_1_col),
+ numeric_scols[j].alias(tmp_value_2_col),
+ )
+ )
+
+ #
+-------------------+-------------------+-------------------+-------------------+
+ #
|__tmp_index_1_col__|__tmp_index_2_col__|__tmp_value_1_col__|__tmp_value_2_col__|
+ #
+-------------------+-------------------+-------------------+-------------------+
+ # | 0| 0| 1.0|
1.0|
+ # | 0| 1| 1.0|
2.0|
+ # | 0| 2| 1.0|
3.0|
+ # | 1| 1| 2.0|
2.0|
+ # | 1| 2| 2.0|
3.0|
+ # | 2| 2| 3.0|
3.0|
+ # | 0| 0| 4.0|
4.0|
+ # | 0| 1| 4.0|
1.0|
+ # | 0| 2| 4.0|
null|
+ # | 1| 1| 1.0|
1.0|
+ # | 1| 2| 1.0|
null|
+ # | 2| 2| null|
null|
+ #
+-------------------+-------------------+-------------------+-------------------+
+ tmp_tuple_col = verify_temp_column_name(sdf, "__tmp_tuple_col__")
Review Comment:
ditto? I think `tmp_tuple_col_name` is a bit more explicit.
##########
python/pyspark/pandas/frame.py:
##########
@@ -1454,11 +1462,196 @@ def corr(self, method: str = "pearson") -> "DataFrame":
There are behavior differences between pandas-on-Spark and pandas.
* the `method` argument only accepts 'pearson', 'spearman'
- * the data should not contain NaNs. pandas-on-Spark will return an
error.
- * pandas-on-Spark doesn't support the following argument(s).
+ * if the `method` is `spearman`, the data should not contain NaNs.
+ * if the `method` is `spearman`, `min_periods` argument is not
supported.
+ """
+ if method not in ["pearson", "spearman", "kendall"]:
+ raise ValueError(f"Invalid method {method}")
+ if method == "kendall":
+ raise NotImplementedError("method doesn't support kendall for now")
+ if min_periods is not None and not isinstance(min_periods, int):
+ raise TypeError(f"Invalid min_periods type
{type(min_periods).__name__}")
Review Comment:
Seems like pandas allows float:
```python
>>> pdf.corr('pearson', min_periods=1.4)
dogs cats
dogs 1.000000 -0.851064
cats -0.851064 1.000000
```
But I'm not sure if it's intended behavior or not, since they raise
`TypeError: an integer is required` when the type is `str` as below:
```python
>>> pdf.corr('pearson', min_periods='a')
Traceback (most recent call last):
...
TypeError: an integer is required
>>>
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]