zhengruifeng commented on code in PR #36063:
URL: https://github.com/apache/spark/pull/36063#discussion_r845759363


##########
python/pyspark/pandas/window.py:
##########
@@ -1749,6 +1752,177 @@ def var(self) -> FrameLike:
         return super().var()
 
 
+class ExponentialMovingLike(Generic[FrameLike], metaclass=ABCMeta):
+    def __init__(
+        self,
+        window: WindowSpec,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ):
+        if (min_periods is not None) and (min_periods < 0):
+            raise ValueError("min_periods must be >= 0")
+        if min_periods is None:
+            min_periods = 0
+
+        self._window = window
+        # This unbounded Window is later used to handle 'min_periods' for now.
+        self._unbounded_window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
+            Window.unboundedPreceding, Window.currentRow
+        )
+        self._min_periods = min_periods
+
+        opt_count = 0
+
+        if com is not None:
+            if com < 0:
+                raise ValueError("com must be >= 0")
+            self._alpha = 1.0 / (1 + com)
+            opt_count += 1
+
+        if span is not None:
+            if span < 1:
+                raise ValueError("span must be >= 1")
+            self._alpha = 2.0 / (1 + span)
+            opt_count += 1
+
+        if halflife is not None:
+            if halflife <= 0:
+                raise ValueError("halflife must be > 0")
+            self._alpha = 1.0 - np.exp(-np.log(2) / halflife)
+            opt_count += 1
+
+        if alpha is not None:
+            if alpha <= 0 or alpha > 1:
+                raise ValueError("alpha must be in (0, 1]")
+            self._alpha = alpha
+            opt_count += 1
+
+        if opt_count == 0:
+            raise ValueError("Must pass one of comass, span, halflife, or alpha")
+
+        if opt_count != 1:
+            raise ValueError("comass, span, halflife, and alpha are mutually exclusive")
+
+    @abstractmethod
+    def _apply_as_series_or_frame(self, func: Callable[[Column], Column]) -> FrameLike:
+        """
+        Wraps a function that handles Spark column in order
+        to support it in both pandas-on-Spark Series and DataFrame.
+        Note that the given `func` name should be same as the API's method name.
+        """
+        pass
+
+    def mean(self) -> FrameLike:
+        def mean(scol: Column) -> Column:
+            jf = SparkContext._active_spark_context._jvm.PythonSQLUtils.ewm
+            return F.when(
+                F.row_number().over(self._unbounded_window) >= self._min_periods,
+                Column(jf(scol._jc, self._alpha)).over(self._window),
+            ).otherwise(SF.lit(None))
+
+        return self._apply_as_series_or_frame(mean)
+
+
+class ExponentialMoving(ExponentialMovingLike[FrameLike]):
+    def __init__(
+        self,
+        psdf_or_psser: FrameLike,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ):
+        from pyspark.pandas.frame import DataFrame
+        from pyspark.pandas.series import Series
+
+        if not isinstance(psdf_or_psser, (DataFrame, Series)):
+            raise TypeError(
+                "psdf_or_psser must be a series or dataframe; however, got: %s"
+                % type(psdf_or_psser)
+            )
+        self._psdf_or_psser = psdf_or_psser
+
+        window_spec = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
+            Window.unboundedPreceding, Window.currentRow
+        )
+
+        super().__init__(window_spec, com, span, halflife, alpha, min_periods)
+
+    def __getattr__(self, item: str) -> Any:
+        if hasattr(MissingPandasLikeExponentialMoving, item):
+            property_or_func = getattr(MissingPandasLikeExponentialMoving, item)
+            if isinstance(property_or_func, property):
+                return property_or_func.fget(self)
+            else:
+                return partial(property_or_func, self)
+        raise AttributeError(item)
+
+    _apply_as_series_or_frame = Rolling._apply_as_series_or_frame
+
+    def mean(self) -> FrameLike:
+        """
+        Calculate an online exponentially weighted mean.
+
+        Notes
+        -----
+        There are behavior differences between pandas-on-Spark and pandas.
+
+        * the data should not contain NaNs. pandas-on-Spark will return an error.
+        * the current implementation of this API uses Spark's Window without
+          specifying partition specification. This leads to move all data into
+          single partition in single machine and could cause serious
+          performance degradation. Avoid this method against very large dataset.
+
+        Returns
+        -------
+        Series or DataFrame
+            Returned object type is determined by the caller of the exponentially
+            calculation.
+
+        See Also
+        --------
+        Series.expanding : Calling object with Series data.
+        DataFrame.expanding : Calling object with DataFrames.
+        Series.mean : Equivalent method for Series.
+        DataFrame.mean : Equivalent method for DataFrame.
+
+        Examples
+        --------
+        The below examples will show expanding mean calculations with window sizes of
+        two and three, respectively.

Review Comment:
   Yes, good catch. Will update those docs soon. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to