itholic commented on a change in pull request #33752:
URL: https://github.com/apache/spark/pull/33752#discussion_r691700514



##########
File path: python/pyspark/pandas/series.py
##########
@@ -944,6 +944,57 @@ def between(self, left: Any, right: Any, inclusive: bool = 
True) -> "Series":
 
         return lmask & rmask
 
+    def cov(self, other: "Series", min_periods: int = 1) -> float:
+        """
+        Compute covariance with Series, excluding missing values.
+        Parameters
+        ----------
+        other : Series
+            Series with which to compute the covariance.
+        min_periods : int, default 1
+            Minimum number of observations needed to have a valid result. None 
= 1.
+
+        Returns
+        -------
+        float
+            Covariance between Series and other
+
+        Examples
+        --------
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series([0.90010907, 0.13484424, 0.62036035])
+        >>> s2 = ps.Series([0.12528585, 0.26962463, 0.51111198])
+        >>> s1.cov(s2)
+        -0.016857626527158744
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+
+        if min_periods is None:
+            min_periods = 1
+
+        if same_anchor(self, other):
+            self_column_label = verify_temp_column_name(other.to_frame(), 
"__self_column__")
+            other_column_label = verify_temp_column_name(self.to_frame(), 
"__other_column__")
+            combined = DataFrame(
+                self._internal.with_new_columns(
+                    [self.rename(self_column_label), 
other.rename(other_column_label)]
+                )
+            )

Review comment:
       How about something like:
   
   ```python
           min_periods = 1 if min_periods is None else min_periods
   
           if same_anchor(self, other):
               sdf = self._internal.spark_frame.select(self.spark.column, 
other.spark.column)
           else:
               combined = combine_frames(self.to_frame(), other.to_frame())
               sdf = 
combined._internal.spark_frame.select(*combined._internal.data_spark_columns)
   
           sdf = sdf.dropna()
   
           if len(sdf.head(min_periods)) < min_periods:
               return np.nan
           else:
               return sdf.select(F.covar_samp(*sdf.columns)).head(1)[0][0]
   ```
   
   We'd better to leverage PySpark API (DataFrame, Column,,,) as much as 
possible.
   
   And we should avoid compute entire length of pandas-on-Spark object as much 
as possible, too.
   
   In this case, I think we can use `head` with `min_periods` instead of `len`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to