itholic commented on code in PR #36063:
URL: https://github.com/apache/spark/pull/36063#discussion_r845680041


##########
python/pyspark/pandas/tests/test_ewm.py:
##########
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import numpy as np
+import pandas as pd
+
+import pyspark.pandas as ps
+from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
+from pyspark.pandas.window import ExponentialMoving
+
+
+class EWMTest(PandasOnSparkTestCase, TestUtils):
+    def test_ewm_error(self):
+        with self.assertRaisesRegex(
+            TypeError, "psdf_or_psser must be a series or dataframe; however, 
got:.*int"
+        ):
+            ExponentialMoving(1, 2)
+
+        with self.assertRaisesRegex(ValueError, "min_periods must be >= 0"):
+            ps.range(10).ewm(min_periods=-1, alpha=0.5)

Review Comment:
   Can we create one DataFrame and reuse it here and there instead of creating 
same DataFrame for every test?
   
   e.g.
   
   ```python
   ...
           psdf = ps.range(10)
   
           with self.assertRaisesRegex(ValueError, "min_periods must be >= 0"):
               psdf.ewm(min_periods=-1, alpha=0.5)
   
           with self.assertRaisesRegex(ValueError, "com must be >= 0"):
               psdf.ewm(com=-0.1)
   
           with self.assertRaisesRegex(ValueError, "span must be >= 1"):
               psdf.ewm(span=0.7)
   ...
   ```



##########
python/pyspark/pandas/generic.py:
##########
@@ -2619,6 +2619,56 @@ def expanding(self: FrameLike, min_periods: int = 1) -> 
"Expanding[FrameLike]":
 
         return Expanding(self, min_periods=min_periods)
 
+    # TODO: 'adjust', 'ignore_na', 'axis', 'method' parameter should be 
implemented.
+    def ewm(
+        self: FrameLike,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ) -> "ExponentialMoving[FrameLike]":
+        """
+        Provide exponentially weighted window transformations.
+
+        .. note:: 'min_periods' in pandas-on-Spark works as a fixed window 
size unlike pandas.

Review Comment:
   nit: It's not a big deal, just FYI, we usually use pandas-on-Spark when it's 
used before a noun.
   
   e.g. `pandas-on-Spark DataFrame`, `pandas-on-Spark docs` ...
   
   otherwise use full-name pandas API on Spark.



##########
python/pyspark/pandas/window.py:
##########
@@ -1749,6 +1752,177 @@ def var(self) -> FrameLike:
         return super().var()
 
 
+class ExponentialMovingLike(Generic[FrameLike], metaclass=ABCMeta):
+    def __init__(
+        self,
+        window: WindowSpec,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ):
+        if (min_periods is not None) and (min_periods < 0):
+            raise ValueError("min_periods must be >= 0")
+        if min_periods is None:
+            min_periods = 0
+
+        self._window = window
+        # This unbounded Window is later used to handle 'min_periods' for now.
+        self._unbounded_window = 
Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
+            Window.unboundedPreceding, Window.currentRow
+        )
+        self._min_periods = min_periods
+
+        opt_count = 0
+
+        if com is not None:
+            if com < 0:
+                raise ValueError("com must be >= 0")
+            self._alpha = 1.0 / (1 + com)
+            opt_count += 1
+
+        if span is not None:
+            if span < 1:
+                raise ValueError("span must be >= 1")
+            self._alpha = 2.0 / (1 + span)
+            opt_count += 1
+
+        if halflife is not None:
+            if halflife <= 0:
+                raise ValueError("halflife must be > 0")
+            self._alpha = 1.0 - np.exp(-np.log(2) / halflife)
+            opt_count += 1
+
+        if alpha is not None:
+            if alpha <= 0 or alpha > 1:

Review Comment:
   nit: Maybe `if 0 < alpha <= 1:` for better readability ? Not strong felling, 
though



##########
python/pyspark/pandas/generic.py:
##########
@@ -2619,6 +2619,56 @@ def expanding(self: FrameLike, min_periods: int = 1) -> 
"Expanding[FrameLike]":
 
         return Expanding(self, min_periods=min_periods)
 
+    # TODO: 'adjust', 'ignore_na', 'axis', 'method' parameter should be 
implemented.
+    def ewm(
+        self: FrameLike,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ) -> "ExponentialMoving[FrameLike]":
+        """
+        Provide exponentially weighted window transformations.
+
+        .. note:: 'min_periods' in pandas-on-Spark works as a fixed window 
size unlike pandas.
+            Unlike pandas, NA is also counted as the period. This might be 
changed
+            in the near future.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        com : float, optional
+            Specify decay in terms of center of mass.
+            alpha = 1 / (1 + com), for com >= 0.
+
+        span : float, optional
+            Specify decay in terms of span.
+            alpha = 2 / (span + 1), for span >= 1.
+
+        halflife : float, optional
+            Specify decay in terms of half-life.
+            alpha = 1 - exp(-ln(2) / halflife), for halflife > 0.
+
+        alpha : float, optional
+            Specify smoothing factor alpha directly.
+            0 < alpha <= 1.
+
+        min_periods : int, default 0
+            Minimum number of observations in window required to have a value
+            (otherwise result is NA).

Review Comment:
   Does it mean that we return `pd.NA` instead of `np.nan` ? (I think pandas 
returns `np.nan`)



##########
python/pyspark/pandas/window.py:
##########
@@ -1749,6 +1752,177 @@ def var(self) -> FrameLike:
         return super().var()
 
 
+class ExponentialMovingLike(Generic[FrameLike], metaclass=ABCMeta):
+    def __init__(
+        self,
+        window: WindowSpec,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ):
+        if (min_periods is not None) and (min_periods < 0):
+            raise ValueError("min_periods must be >= 0")
+        if min_periods is None:
+            min_periods = 0
+
+        self._window = window
+        # This unbounded Window is later used to handle 'min_periods' for now.
+        self._unbounded_window = 
Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
+            Window.unboundedPreceding, Window.currentRow
+        )
+        self._min_periods = min_periods
+
+        opt_count = 0
+
+        if com is not None:
+            if com < 0:
+                raise ValueError("com must be >= 0")
+            self._alpha = 1.0 / (1 + com)
+            opt_count += 1
+
+        if span is not None:
+            if span < 1:
+                raise ValueError("span must be >= 1")
+            self._alpha = 2.0 / (1 + span)
+            opt_count += 1
+
+        if halflife is not None:
+            if halflife <= 0:
+                raise ValueError("halflife must be > 0")
+            self._alpha = 1.0 - np.exp(-np.log(2) / halflife)
+            opt_count += 1
+
+        if alpha is not None:
+            if alpha <= 0 or alpha > 1:
+                raise ValueError("alpha must be in (0, 1]")
+            self._alpha = alpha
+            opt_count += 1
+
+        if opt_count == 0:
+            raise ValueError("Must pass one of comass, span, halflife, or 
alpha")
+
+        if opt_count != 1:
+            raise ValueError("comass, span, halflife, and alpha are mutually 
exclusive")
+
+    @abstractmethod
+    def _apply_as_series_or_frame(self, func: Callable[[Column], Column]) -> 
FrameLike:
+        """
+        Wraps a function that handles Spark column in order
+        to support it in both pandas-on-Spark Series and DataFrame.
+        Note that the given `func` name should be same as the API's method 
name.
+        """
+        pass
+
+    def mean(self) -> FrameLike:
+        def mean(scol: Column) -> Column:
+            jf = SparkContext._active_spark_context._jvm.PythonSQLUtils.ewm
+            return F.when(
+                F.row_number().over(self._unbounded_window) >= 
self._min_periods,
+                Column(jf(scol._jc, self._alpha)).over(self._window),
+            ).otherwise(SF.lit(None))
+
+        return self._apply_as_series_or_frame(mean)
+
+
+class ExponentialMoving(ExponentialMovingLike[FrameLike]):
+    def __init__(
+        self,
+        psdf_or_psser: FrameLike,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ):
+        from pyspark.pandas.frame import DataFrame
+        from pyspark.pandas.series import Series
+
+        if not isinstance(psdf_or_psser, (DataFrame, Series)):
+            raise TypeError(
+                "psdf_or_psser must be a series or dataframe; however, got: %s"
+                % type(psdf_or_psser)
+            )
+        self._psdf_or_psser = psdf_or_psser
+
+        window_spec = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
+            Window.unboundedPreceding, Window.currentRow
+        )
+
+        super().__init__(window_spec, com, span, halflife, alpha, min_periods)
+
+    def __getattr__(self, item: str) -> Any:
+        if hasattr(MissingPandasLikeExponentialMoving, item):
+            property_or_func = getattr(MissingPandasLikeExponentialMoving, 
item)
+            if isinstance(property_or_func, property):
+                return property_or_func.fget(self)
+            else:
+                return partial(property_or_func, self)
+        raise AttributeError(item)
+
+    _apply_as_series_or_frame = Rolling._apply_as_series_or_frame
+
+    def mean(self) -> FrameLike:
+        """
+        Calculate an online exponentially weighted mean.
+
+        Notes
+        -----
+        There are behavior differences between pandas-on-Spark and pandas.
+
+        * the data should not contain NaNs. pandas-on-Spark will return an 
error.
+        * the current implementation of this API uses Spark's Window without
+          specifying partition specification. This leads to move all data into
+          single partition in single machine and could cause serious
+          performance degradation. Avoid this method against very large 
dataset.
+
+        Returns
+        -------
+        Series or DataFrame
+            Returned object type is determined by the caller of the 
exponentially
+            calculation.
+
+        See Also
+        --------
+        Series.expanding : Calling object with Series data.
+        DataFrame.expanding : Calling object with DataFrames.
+        Series.mean : Equivalent method for Series.
+        DataFrame.mean : Equivalent method for DataFrame.
+
+        Examples
+        --------
+        The below examples will show expanding mean calculations with window 
sizes of
+        two and three, respectively.

Review Comment:
   Maybe those are mistakenly from `expanding.mean` ?



##########
python/pyspark/pandas/window.py:
##########
@@ -1749,6 +1752,177 @@ def var(self) -> FrameLike:
         return super().var()
 
 
+class ExponentialMovingLike(Generic[FrameLike], metaclass=ABCMeta):
+    def __init__(
+        self,
+        window: WindowSpec,
+        com: Optional[float] = None,
+        span: Optional[float] = None,
+        halflife: Optional[float] = None,
+        alpha: Optional[float] = None,
+        min_periods: Optional[int] = None,
+    ):
+        if (min_periods is not None) and (min_periods < 0):
+            raise ValueError("min_periods must be >= 0")
+        if min_periods is None:
+            min_periods = 0
+
+        self._window = window
+        # This unbounded Window is later used to handle 'min_periods' for now.
+        self._unbounded_window = 
Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
+            Window.unboundedPreceding, Window.currentRow
+        )
+        self._min_periods = min_periods
+
+        opt_count = 0
+
+        if com is not None:
+            if com < 0:
+                raise ValueError("com must be >= 0")
+            self._alpha = 1.0 / (1 + com)
+            opt_count += 1
+
+        if span is not None:
+            if span < 1:
+                raise ValueError("span must be >= 1")
+            self._alpha = 2.0 / (1 + span)
+            opt_count += 1
+
+        if halflife is not None:
+            if halflife <= 0:
+                raise ValueError("halflife must be > 0")
+            self._alpha = 1.0 - np.exp(-np.log(2) / halflife)
+            opt_count += 1
+
+        if alpha is not None:
+            if alpha <= 0 or alpha > 1:
+                raise ValueError("alpha must be in (0, 1]")
+            self._alpha = alpha
+            opt_count += 1
+
+        if opt_count == 0:
+            raise ValueError("Must pass one of comass, span, halflife, or 
alpha")

Review Comment:
   Maybe `comass` -> `com` ?? I think it's a typo in pandas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to