Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r793219057



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type which 
is
+            potentially expensive, for instance, when the dataset is created 
after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as 
below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses return type hint and does not try to infer 
the type.
+
+            This API does not support self combine for now.
+
+            >>> psser1 = ps.Series([1, 2, 3, 4])
+            >>> psser1.combine(psser1, max)  # doctest: +SKIP
+            ...
+            ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that type hint for return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max)

Review comment:
       but this is not a best practice for `the good note`, let's give the 
right example like:
   ```
           >>> def max_with_return_type(x, y) -> float:
           ...     return max(x, y)
           >>> s1.combine(s2, max)
   ```

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type which 
is
+            potentially expensive, for instance, when the dataset is created 
after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as 
below:
+
+            >>> def foo(x, y) -> np.int32:

Review comment:
       nits:
   ```suggestion
               >>> def multiply(x, y) -> np.int32:
   ```

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type which 
is
+            potentially expensive, for instance, when the dataset is created 
after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as 
below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses return type hint and does not try to infer 
the type.
+
+            This API does not support self combine for now.
+
+            >>> psser1 = ps.Series([1, 2, 3, 4])
+            >>> psser1.combine(psser1, max)  # doctest: +SKIP
+            ...
+            ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that type hint for return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0)

Review comment:
       ditto

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type which 
is
+            potentially expensive, for instance, when the dataset is created 
after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as 
below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses return type hint and does not try to infer 
the type.
+
+            This API does not support self combine for now.
+
+            >>> psser1 = ps.Series([1, 2, 3, 4])
+            >>> psser1.combine(psser1, max)  # doctest: +SKIP
+            ...
+            ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that type hint for return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not callable(func):
+            raise TypeError("'%s' object is not callable" % 
type(func).__name__)
+
+        if pd.api.types.is_scalar(other):  # type: ignore[attr-defined]
+            tmp_other_col = 
verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        else:
+            assert isinstance(other, Series)
+            if same_anchor(self, other):
+                if self._column_label == other._column_label:
+                    raise ValueError("Unsupported self combine")
+                combined = self._psdf[self._column_label, other._column_label]
+            elif fill_value is None:
+                combined = combine_frames(self.to_frame(), other.to_frame())
+            else:
+                combined = self._combine_frame_with_fill_value(other, 
fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, (UnknownType, DataFrameType)):
+                raise TypeError()
+            return_spark_type = sig_return.spark_type
+            return_dtype = sig_return.dtype
+        except (ValueError, TypeError):

Review comment:
       Let's add note and log_advice in here, like:
   ```suggestion
           except (ValueError, TypeError):
               # Here we execute with the first 1000 to get the return type.
               # If the records were less than 1000, it uses pandas API 
directly for a shortcut.
               log_advice(
                   "If the type hints is not specified the `func` parameter for 
`series.combine`, "
                   "it is expensive to infer the data type internally."
               )
   ```

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type which 
is
+            potentially expensive, for instance, when the dataset is created 
after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as 
below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y

Review comment:
       Yep, this is a good note

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type which 
is
+            potentially expensive, for instance, when the dataset is created 
after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as 
below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses return type hint and does not try to infer 
the type.
+
+            This API does not support self combine for now.
+
+            >>> psser1 = ps.Series([1, 2, 3, 4])
+            >>> psser1.combine(psser1, max)  # doctest: +SKIP
+            ...
+            ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that type hint for return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not callable(func):
+            raise TypeError("'%s' object is not callable" % 
type(func).__name__)
+
+        if pd.api.types.is_scalar(other):  # type: ignore[attr-defined]
+            tmp_other_col = 
verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        else:
+            assert isinstance(other, Series)
+            if same_anchor(self, other):
+                if self._column_label == other._column_label:
+                    raise ValueError("Unsupported self combine")
+                combined = self._psdf[self._column_label, other._column_label]
+            elif fill_value is None:
+                combined = combine_frames(self.to_frame(), other.to_frame())
+            else:
+                combined = self._combine_frame_with_fill_value(other, 
fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, (UnknownType, DataFrameType)):
+                raise TypeError()
+            return_spark_type = sig_return.spark_type

Review comment:
       nits: no regresson test on this, but if you update the doctest as my 
suggestion, It will have.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to