Yikun commented on a change in pull request #34213:
URL: https://github.com/apache/spark/pull/34213#discussion_r733270913
##########
File path: python/pyspark/pandas/frame.py
##########
@@ -8201,6 +8202,185 @@ def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True)
         internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
         self._update_internal_frame(internal, requires_same_anchor=False)
+    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
Review comment:
There is a `ddof` arg in [pandas' `def cov`](https://github.com/pandas-dev/pandas/blob/master/pandas/core/frame.py#L9581-L9585), so better to add a TODO here.
BTW, for `min_periods`, I guess it's just to keep the interface consistent with pandas, but I think:
1. it could be changed to `def cov(self, min_periods: Optional[int] = 1) -> "DataFrame":`, removing L8279
2. a note on `min_periods` (like `min_periods will be set to 1 if min_periods is None.`) could be added to explain exactly what we do.
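Something like this (just a sketch, not the final patch; the TODO and docstring wording here are illustrative):

```python
from typing import Optional

# Sketch of suggestion 1: default to 1 so the explicit
# `min_periods = 1 if min_periods is None else min_periods` line can go away.
# TODO: support the `ddof` argument as pandas.DataFrame.cov does.
def cov(self, min_periods: Optional[int] = 1) -> "DataFrame":
    """
    Compute pairwise covariance of columns, excluding NA/null values.

    Parameters
    ----------
    min_periods : int, default 1
        Minimum number of observations required per pair of columns
        to have a valid result (suggestion 2: note here that a ``None``
        value is treated as 1).
    """
    ...
```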
##########
File path: python/pyspark/pandas/tests/test_dataframe.py
##########
@@ -6025,6 +6025,64 @@ def test_multi_index_dtypes(self):
         )
         self.assert_eq(psmidx.dtypes, expected)
+    def test_cov(self):
+        # SPARK-36396: Implement DataFrame.cov
+
+        # int
+        pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["a", "b"])
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+        self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5), almost=True)
+
+        # bool
+        pdf = pd.DataFrame(
+            {
+                "a": [1, np.nan, 3, 4],
+                "b": [True, False, False, True],
+                "c": [True, True, False, True],
+            }
+        )
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+        self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5), almost=True)
Review comment:
ditto
##########
File path: python/pyspark/pandas/tests/test_dataframe.py
##########
@@ -6025,6 +6025,64 @@ def test_multi_index_dtypes(self):
         )
         self.assert_eq(psmidx.dtypes, expected)
+    def test_cov(self):
+        # SPARK-36396: Implement DataFrame.cov
+
+        # int
+        pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["a", "b"])
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+        self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5), almost=True)
Review comment:
nit: I guess you could remove the `almost=True` here (min_periods=5). The results are all NaN, so it's better to do an exact match test.
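For example (a sketch of the suggested assertion; with min_periods=5 on four rows, every entry is NaN, so an exact comparison should pass):

```python
# Exact match: pdf.cov(min_periods=5) on 4 rows is all-NaN, and assert_eq
# treats NaNs in the same positions as equal, so no tolerance is needed.
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
```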
##########
File path: python/pyspark/pandas/frame.py
##########
@@ -8201,6 +8202,185 @@ def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True)
         internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
         self._update_internal_frame(internal, requires_same_anchor=False)
+    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+        """
+        Compute pairwise covariance of columns, excluding NA/null values.
+
+        Compute the pairwise covariance among the series of a DataFrame.
+        The returned data frame is the `covariance matrix
+        <https://en.wikipedia.org/wiki/Covariance_matrix>`__ of the columns
+        of the DataFrame.
+
+        Both NA and null values are automatically excluded from the
+        calculation. A threshold can be set for the minimum number of
+        observations for each value created. Comparisons with observations
+        below this threshold will be returned as ``NaN``.
+
+        This method is generally used for the analysis of time series data to
+        understand the relationship between different measures
+        across time.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        min_periods : int, optional
+            Minimum number of observations required per pair of columns
+            to have a valid result.
+
+        Returns
+        -------
+        DataFrame
+            The covariance matrix of the series of the DataFrame.
+
+        See Also
+        --------
+        Series.cov : Compute covariance with another Series.
+
+        Examples
+        --------
+        >>> df = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
+        ...                   columns=['dogs', 'cats'])
+        >>> df.cov()
+                  dogs      cats
+        dogs  0.666667 -1.000000
+        cats -1.000000  1.666667
+
+        >>> np.random.seed(42)
+        >>> df = ps.DataFrame(np.random.randn(1000, 5),
+        ...                   columns=['a', 'b', 'c', 'd', 'e'])
+        >>> df.cov()
+                  a         b         c         d         e
+        a  0.998438 -0.020161  0.059277 -0.008943  0.014144
+        b -0.020161  1.059352 -0.008543 -0.024738  0.009826
+        c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
+        d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
+        e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+
+        **Minimum number of periods**
+
+        This method also supports an optional ``min_periods`` keyword
+        that specifies the required minimum number of non-NA observations for
+        each column pair in order to have a valid result:
+
+        >>> np.random.seed(42)
+        >>> df = pd.DataFrame(np.random.randn(20, 3),
+        ...                   columns=['a', 'b', 'c'])
+        >>> df.loc[df.index[:5], 'a'] = np.nan
+        >>> df.loc[df.index[5:10], 'b'] = np.nan
+        >>> sdf = ps.from_pandas(df)
+        >>> sdf.cov(min_periods=12)
+                  a         b         c
+        a  0.316741       NaN -0.150812
+        b       NaN  1.248003  0.191417
+        c -0.150812  0.191417  0.895202
+        """
+        min_periods = 1 if min_periods is None else min_periods
+
+        # Only compute covariance for Boolean and Numeric columns, excluding Decimal
+        psdf = self[
+            [
+                col
+                for col in self.columns
+                if isinstance(self[col].spark.data_type, BooleanType)
+                or (
+                    isinstance(self[col].spark.data_type, NumericType)
+                    and not isinstance(self[col].spark.data_type, DecimalType)
+                )
+            ]
+        ]
+
+        num_cols = len(psdf.columns)
+        data_cols = psdf._internal.data_spark_column_names
+        cov_scols = []
+        count_not_null_scols = []
+
+        # Count the number of rows where both columns are non-null
+        # Example:
+        #      a    b    c
+        # 0    1    1    1
+        # 1    NaN  2    2
+        # 2    3    NaN  3
+        # 3    4    4    4
+        #
+        #    a            b            c
+        # a  count(a, a)  count(a, b)  count(a, c)
+        # b               count(b, b)  count(b, c)
+        # c                            count(c, c)
+        #
+        # count_not_null_scols =
+        # [F.count(a, a), F.count(a, b), F.count(a, c), F.count(b, b), F.count(b, c), F.count(c, c)]
+        for r in range(0, num_cols):
+            for c in range(r, num_cols):
+                count_not_null_scols.append(
+                    F.count(
+                        F.when(F.col(data_cols[r]).isNotNull() & F.col(data_cols[c]).isNotNull(), 1)
+                    )
+                )
+
+        count_not_null = (
+            psdf._internal.spark_frame.replace(float("nan"), None)
+            .select(*count_not_null_scols)
+            .head(1)[0]
+        )
+
+        # Calculate the covariance between each pair of columns
+        # Example:
+        # with min_periods = 3
+        #      a    b    c
+        # 0    1    1    1
+        # 1    NaN  2    2
+        # 2    3    NaN  3
+        # 3    4    4    4
+        #
+        #    a          b          c
+        # a  cov(a, a)  None       cov(a, c)
+        # b             cov(b, b)  cov(b, c)
+        # c                        cov(c, c)
+        #
+        # cov_scols = [F.cov(a, a), None, F.cov(a, c), F.cov(b, b), F.cov(b, c), F.cov(c, c)]
+        step = 0
+        for r in range(0, num_cols):
+            step += r
+            for c in range(r, num_cols):
+                cov_scols.append(
+                    F.covar_samp(
+                        F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double")
+                    )
+                    if count_not_null[r * num_cols + c - step] >= min_periods
+                    else F.lit(None)
+                )
+
+        pair_cov = psdf._internal.spark_frame.select(*cov_scols).head(1)[0]
+
+        # Convert the result row to a 2D array
+        # Example:
+        # pair_cov = [cov(a, a), None, cov(a, c), cov(b, b), cov(b, c), cov(c, c)]
+        #
+        # cov =
+        #
+        #    a          b          c
+        # a  cov(a, a)  None       cov(a, c)
+        # b             cov(b, b)  cov(b, c)
+        # c                        cov(c, c)
+        cov = np.zeros([num_cols, num_cols])
Review comment:
note for myself: this looks like a reshape operation; we could extract it into utils or somewhere else in the future.
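If it were extracted, it might look roughly like this (hypothetical helper name; it mirrors the fill loop that follows `cov = np.zeros(...)` in this hunk):

```python
import numpy as np
from typing import List, Optional

def unpack_upper_triangular(values: List[Optional[float]], n: int) -> np.ndarray:
    """Expand a condensed upper-triangular row such as
    [x(0,0), x(0,1), ..., x(0,n-1), x(1,1), ...] into a symmetric
    n x n matrix, mapping None entries to NaN."""
    out = np.full((n, n), np.nan)
    idx = 0
    for r in range(n):
        for c in range(r, n):
            v = np.nan if values[idx] is None else values[idx]
            out[r, c] = v
            out[c, r] = v
            idx += 1
    return out

# Using the layout from the comment above:
# pair_cov = [cov(a, a), None, cov(a, c), cov(b, b), cov(b, c), cov(c, c)]
print(unpack_upper_triangular([1.0, None, 0.5, 2.0, 0.3, 3.0], 3))
```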
##########
File path: python/pyspark/pandas/frame.py
##########
@@ -8201,6 +8202,185 @@ def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True)
         internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
         self._update_internal_frame(internal, requires_same_anchor=False)
+    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+        """
+        Compute pairwise covariance of columns, excluding NA/null values.
+
+        Compute the pairwise covariance among the series of a DataFrame.
+        The returned data frame is the `covariance matrix
+        <https://en.wikipedia.org/wiki/Covariance_matrix>`__ of the columns
+        of the DataFrame.
+
+        Both NA and null values are automatically excluded from the
+        calculation. A threshold can be set for the minimum number of
+        observations for each value created. Comparisons with observations
+        below this threshold will be returned as ``NaN``.
+
+        This method is generally used for the analysis of time series data to
+        understand the relationship between different measures
+        across time.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        min_periods : int, optional
+            Minimum number of observations required per pair of columns
+            to have a valid result.
+
+        Returns
+        -------
+        DataFrame
+            The covariance matrix of the series of the DataFrame.
+
+        See Also
+        --------
+        Series.cov : Compute covariance with another Series.
+
+        Examples
+        --------
+        >>> df = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
+        ...                   columns=['dogs', 'cats'])
+        >>> df.cov()
+                  dogs      cats
+        dogs  0.666667 -1.000000
+        cats -1.000000  1.666667
+
+        >>> np.random.seed(42)
+        >>> df = ps.DataFrame(np.random.randn(1000, 5),
+        ...                   columns=['a', 'b', 'c', 'd', 'e'])
+        >>> df.cov()
+                  a         b         c         d         e
+        a  0.998438 -0.020161  0.059277 -0.008943  0.014144
+        b -0.020161  1.059352 -0.008543 -0.024738  0.009826
+        c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
+        d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
+        e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+
+        **Minimum number of periods**
+
+        This method also supports an optional ``min_periods`` keyword
+        that specifies the required minimum number of non-NA observations for
+        each column pair in order to have a valid result:
+
+        >>> np.random.seed(42)
+        >>> df = pd.DataFrame(np.random.randn(20, 3),
+        ...                   columns=['a', 'b', 'c'])
+        >>> df.loc[df.index[:5], 'a'] = np.nan
+        >>> df.loc[df.index[5:10], 'b'] = np.nan
+        >>> sdf = ps.from_pandas(df)
+        >>> sdf.cov(min_periods=12)
+                  a         b         c
+        a  0.316741       NaN -0.150812
+        b       NaN  1.248003  0.191417
+        c -0.150812  0.191417  0.895202
+        """
+        min_periods = 1 if min_periods is None else min_periods
+
+        # Only compute covariance for Boolean and Numeric columns, excluding Decimal
+        psdf = self[
+            [
+                col
+                for col in self.columns
+                if isinstance(self[col].spark.data_type, BooleanType)
+                or (
+                    isinstance(self[col].spark.data_type, NumericType)
+                    and not isinstance(self[col].spark.data_type, DecimalType)
+                )
+            ]
+        ]
+
+        num_cols = len(psdf.columns)
Review comment:
nit: Looks like we could add a quick return to speed up the case when `min_periods > num_cols`: fill the df with NaN and return it directly, so we don't need to set up the Spark job, like [pandas does](https://github.com/pandas-dev/pandas/blob/f3f90c33966e3ba334c459ad89d607f820caa1f8/pandas/core/frame.py#L9663).
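A sketch of such a fast path (hypothetical; note the linked pandas code compares `min_periods` against the row count, and `len(psdf)` in pandas-on-Spark still triggers a count job, so what gets skipped are the pairwise count and covariance jobs below):

```python
# Hypothetical early return, placed right after `num_cols = len(psdf.columns)`;
# assumes the surrounding frame.py context (psdf, num_cols, np, DataFrame).
# At this point min_periods has already been normalized to an int.
if min_periods > len(psdf):
    nan_cov = np.full((num_cols, num_cols), np.nan)
    return DataFrame(nan_cov, index=psdf.columns, columns=psdf.columns)
```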