[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-28 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r794989958



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,190 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def multiply(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses the return type hint and does not try to infer the type.
+
+        This API does not support self combine for now.
+
+        >>> psser1 = ps.Series([1, 2, 3, 4])
+        >>> psser1.combine(psser1, max)  # doctest: +SKIP
+        ...
+        ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that a type hint for the return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+        >>> def max_with_return_type(x, y) -> float:
+        ...     return max(x, y)
+        ...
+        >>> s1.combine(s2, max_with_return_type)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so that the maximum value returned will be the value from one of the datasets.
+
+        >>> s1.combine(s2, max_with_return_type, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not callable(func):
+            raise TypeError("'%s' object is not callable" % type(func).__name__)
+
+        if pd.api.types.is_scalar(other):  # type: ignore[attr-defined]
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        else:
+            assert isinstance(other, Series)
+            if same_anchor(self, other):
+                if self._column_label == other._column_label:
+                    raise ValueError("Unsupported self combine")
+                combined = self._psdf[self._column_label, other._column_label]
+            elif fill_value is None:
+                combined = combine_frames(self.to_frame(), other.to_frame())
+            else:
+                combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, (UnknownType, DataFrameType)):
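The ``.. note::`` in the diff hinges on Python return annotations: pandas-on-Spark reads the annotation off `func` instead of executing it to sample an output type. A minimal sketch of what gets read, in plain Python with no Spark required (`multiply` matches the docstring's example):

```python
import typing

import numpy as np


def multiply(x, y) -> np.int32:
    return x * y


# The annotation is available without calling the function, which is
# what lets pandas-on-Spark skip the extra sampling execution.
print(typing.get_type_hints(multiply)["return"])  # <class 'numpy.int32'>
```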

[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-27 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r793351769



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses the return type hint and does not try to infer the type.
+
+        This API does not support self combine for now.
+
+        >>> psser1 = ps.Series([1, 2, 3, 4])
+        >>> psser1.combine(psser1, max)  # doctest: +SKIP
+        ...
+        ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that a type hint for the return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so that the maximum value returned will be the value from one of the datasets.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not callable(func):
+            raise TypeError("'%s' object is not callable" % type(func).__name__)
+
+        if pd.api.types.is_scalar(other):  # type: ignore[attr-defined]
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        else:
+            assert isinstance(other, Series)
+            if same_anchor(self, other):
+                if self._column_label == other._column_label:
+                    raise ValueError("Unsupported self combine")
+                combined = self._psdf[self._column_label, other._column_label]
+            elif fill_value is None:
+                combined = combine_frames(self.to_frame(), other.to_frame())
+            else:
+                combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, (UnknownType, DataFrameType)):
+                raise TypeError()
+            return_spark_type = sig_return.spark_type
+            return_dtype = sig_return.dtype
+
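The `fill_value` semantics quoted above mirror pandas, so they can be sanity-checked with plain pandas alone (a sketch assuming only pandas is installed; pandas-on-Spark is documented to produce the same values, modulo row order):

```python
import pandas as pd

s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})

# 'duck' is missing from s1, so func receives NaN for it and
# max(NaN, 30.0) propagates the NaN.
print(s1.combine(s2, max))

# With fill_value=0 the missing slot is treated as 0 before func
# runs, so the duck row keeps the value from s2.
print(s1.combine(s2, max, fill_value=0))
```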

[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-27 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r793220795



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses the return type hint and does not try to infer the type.
+
+        This API does not support self combine for now.
+
+        >>> psser1 = ps.Series([1, 2, 3, 4])
+        >>> psser1.combine(psser1, max)  # doctest: +SKIP
+        ...
+        ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that a type hint for the return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so that the maximum value returned will be the value from one of the datasets.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not callable(func):
+            raise TypeError("'%s' object is not callable" % type(func).__name__)
+
+        if pd.api.types.is_scalar(other):  # type: ignore[attr-defined]
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        else:
+            assert isinstance(other, Series)
+            if same_anchor(self, other):
+                if self._column_label == other._column_label:
+                    raise ValueError("Unsupported self combine")
+                combined = self._psdf[self._column_label, other._column_label]
+            elif fill_value is None:
+                combined = combine_frames(self.to_frame(), other.to_frame())
+            else:
+                combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, (UnknownType, DataFrameType)):
+                raise TypeError()
+            return_spark_type = sig_return.spark_type
+            return_dtype = sig_return.dtype
+
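The scalar branch of the dispatch (`pd.api.types.is_scalar(other)`) pairs every element of the Series with the same value. The observable behavior, sketched with plain pandas for brevity, since pandas' own `Series.combine` accepts a scalar `other` the same way:

```python
import pandas as pd

s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})

# A scalar `other` is combined elementwise against the whole Series.
print(s1.combine(300.0, max))
# falcon    330.0
# eagle     300.0
# dtype: float64
```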

[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-27 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r793348605



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,190 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def multiply(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses the return type hint and does not try to infer the type.
+
+        This API does not support self combine for now.
+
+        >>> psser1 = ps.Series([1, 2, 3, 4])
+        >>> psser1.combine(psser1, max)  # doctest: +SKIP
+        ...
+        ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that a type hint for the return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+        >>> def max_with_return_type(x, y) -> float:
+        ...     return max(x, y)
+        ...
+        >>> s1.combine(s2, max_with_return_type)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so that the maximum value returned will be the value from one of the datasets.
+
+        >>> s1.combine(s2, max_with_return_type, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not callable(func):
+            raise TypeError("'%s' object is not callable" % type(func).__name__)
+
+        if pd.api.types.is_scalar(other):  # type: ignore[attr-defined]
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        else:
+            assert isinstance(other, Series)
+            if same_anchor(self, other):
+                if self._column_label == other._column_label:
+                    raise ValueError("Unsupported self combine")
+                combined = self._psdf[self._column_label, other._column_label]
+            elif fill_value is None:
+                combined = combine_frames(self.to_frame(), other.to_frame())
+            else:
+                combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, (UnknownType, DataFrameType)):
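The `Unsupported self combine` branch fires when both operands share one anchor and the same column label, i.e. when a Series is combined with itself. A sketch of what a caller would see, assuming a build with this patch applied:

```python
import pyspark.pandas as ps

psser = ps.Series([1, 2, 3, 4])

try:
    psser.combine(psser, max)
except ValueError as err:
    print(err)  # Unsupported self combine
```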

[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-26 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r793225136



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:

Review comment:
   Or maybe just give a `max` example; it would read fluently when users see the doctest below.
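The `max` variant the comment hints at would look like the sketch below; `max_with_return_type` is the name the later revision of the diff adopts:

```python
# Reuse `max`, but with an explicit return annotation so that
# pandas-on-Spark reads the type instead of inferring it.
def max_with_return_type(x, y) -> float:
    return max(x, y)
```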






[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-26 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r793219057



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses the return type hint and does not try to infer the type.
+
+        This API does not support self combine for now.
+
+        >>> psser1 = ps.Series([1, 2, 3, 4])
+        >>> psser1.combine(psser1, max)  # doctest: +SKIP
+        ...
+        ValueError: Unsupported self combine
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that a type hint for the return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+        >>> s1.combine(s2, max)

Review comment:
   But this is not a best practice given the note above; let's give the right example, like:
   ```
   >>> def max_with_return_type(x, y) -> float:
   ...     return max(x, y)
   >>> s1.combine(s2, max_with_return_type)
   ```

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:

Review comment:
   nits:
   ```suggestion
            >>> def multiply(x, y) -> np.int32:
   ```

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4483,6 +4487,181 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: Union[Scalar, "Series"],
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using
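For the `try` block in the full diff, `infer_return_type` is the piece that turns the annotation into Spark and numpy types. A hedged sketch using the names as they appear in the diff (assuming the helpers come from `pyspark.pandas.typedef`, as in other pandas-on-Spark APIs that read return annotations):

```python
import numpy as np

from pyspark.pandas.typedef import infer_return_type


def multiply(x, y) -> np.int32:
    return x * y


sig_return = infer_return_type(multiply)
# The diff reads these two attributes as return_spark_type / return_dtype.
print(sig_return.spark_type, sig_return.dtype)
```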

[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-16 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r785691941



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4485,6 +4489,173 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses the return type hint and does not try to infer the type.
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that a type hint for the return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so that the maximum value returned will be the value from one of the datasets.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not pd.api.types.is_scalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        if not callable(func):
+            raise TypeError("%s object is not callable" % type(func).__name__)
+
+        if pd.api.types.is_scalar(other):
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]

Review comment:
   And if it's not very easy to solve, we'd better do some investigation and raise a JIRA for reference.
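`same_anchor` here asks whether two pandas-on-Spark objects are backed by the same underlying Spark frame; two columns of one DataFrame are, while independently created Series are not. A sketch of the distinction (assuming `same_anchor` from `pyspark.pandas.utils`, where the implementation this diff calls into lives):

```python
import pyspark.pandas as ps
from pyspark.pandas.utils import same_anchor

psdf = ps.DataFrame({"a": [1, 2], "b": [3, 4]})

print(same_anchor(psdf["a"], psdf["b"]))          # True: one Spark frame
print(same_anchor(psdf["a"], ps.Series([1, 2])))  # False: separate frames
```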







[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-16 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r785687385



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4485,6 +4489,173 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for the combined Series.
+        `fill_value` is assumed when a value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: This API executes the function once to infer the type, which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify the return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses the return type hint and does not try to infer the type.
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that a type hint for the return type is strongly recommended.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so that the maximum value returned will be the value from one of the datasets.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not pd.api.types.is_scalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        if not callable(func):
+            raise TypeError("%s object is not callable" % type(func).__name__)
+
+        if pd.api.types.is_scalar(other):
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:

Review comment:
   Thanks for clarifying `same_anchor with fill_value`; we'd better also change the test below to cover this specific case and see whether it works.
   
   Ah, I mean `when an index is missing from one Series or the other`, rather than `same_anchor with fill_value`, which was mentioned in the comments below.
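A sketch of the kind of test the comment asks for: index labels that only partially overlap, exercised with `fill_value` and checked against pandas (a hypothetical test body, assuming this PR's implementation; `compute.ops_on_diff_frames` must be enabled because the two Series have different anchors):

```python
import pandas as pd
import pyspark.pandas as ps
from pyspark.pandas.config import set_option, reset_option

set_option("compute.ops_on_diff_frames", True)

pser1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
pser2 = pd.Series({'falcon': 345.0, 'duck': 30.0})  # 'eagle' missing here

# Each side is missing a label the other has, so fill_value is
# consulted for both 'duck' and 'eagle'.
expected = pser1.combine(pser2, max, fill_value=0).sort_index()
actual = (
    ps.from_pandas(pser1)
    .combine(ps.from_pandas(pser2), max, fill_value=0)
    .to_pandas()
    .sort_index()
)
assert actual.equals(expected)

reset_option("compute.ops_on_diff_frames")
```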





[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-16 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r785687385



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,173 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",
+func: Callable,
+fill_value: Optional[Any] = None,
+) -> "Series":
+"""
+Combine the Series with a Series or scalar according to `func`.
+
+Combine the Series and `other` using `func` to perform elementwise
+selection for combined Series.
+`fill_value` is assumed when value is missing at some index
+from one of the two objects being combined.
+
+.. versionadded:: 3.3.0
+
+.. note:: this API executes the function once to infer the type which 
is
+ potentially expensive, for instance, when the dataset is created 
after
+ aggregations or sorting.
+
+ To avoid this, specify return type in ``func``, for instance, as 
below:
+
+ >>> def foo(x, y) -> np.int32:
+ ... return x * y
+
+ pandas-on-Spark uses return type hint and does not try to infer 
the type.
+
+Parameters
+--
+other : Series or scalar
+The value(s) to be combined with the `Series`.
+func : function
+Function that takes two scalars as inputs and returns an element.
+Note that type hint for return type is strongly recommended.
+fill_value : scalar, optional
+The value to assume when an index is missing from
+one Series or the other. The default specifies to use the
+appropriate NaN value for the underlying dtype of the Series.
+
+Returns
+---
+Series
+The result of combining the Series with the other object.
+
+See Also
+
+Series.combine_first : Combine Series values, choosing the calling
+Series' values first.
+
+Examples
+
+Consider 2 Datasets ``s1`` and ``s2`` containing
+highest clocked speeds of different birds.
+
+>>> from pyspark.pandas.config import set_option, reset_option
+>>> set_option("compute.ops_on_diff_frames", True)
+>>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+>>> s1
+falcon330.0
+eagle 160.0
+dtype: float64
+>>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+>>> s2
+falcon345.0
+eagle 200.0
+duck   30.0
+dtype: float64
+
+Now, to combine the two datasets and view the highest speeds
+of the birds across the two datasets
+
+>>> s1.combine(s2, max)
+duckNaN
+eagle 200.0
+falcon345.0
+dtype: float64
+
+In the previous example, the resulting value for duck is missing,
+because the maximum of a NaN and a float is a NaN.
+So, in the example, we set ``fill_value=0``,
+so the maximum value returned will be the value from some dataset.
+
+>>> s1.combine(s2, max, fill_value=0)
+duck   30.0
+eagle 200.0
+falcon345.0
+dtype: float64
+>>> reset_option("compute.ops_on_diff_frames")
+"""
+if not isinstance(other, Series) and not pd.api.types.is_scalar(other):
+raise TypeError("unsupported type: %s" % type(other))
+
+if not callable(func):
+raise TypeError("%s object is not callable" % type(func).__name__)
+
+if pd.api.types.is_scalar(other):
+tmp_other_col = 
verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+combined = self.to_frame()
+combined[tmp_other_col] = other
+combined = DataFrame(combined._internal.resolved_copy)
+elif same_anchor(self, other):
+combined = self._psdf[self._column_label, other._column_label]
+elif fill_value is None:

Review comment:
   Thanks for clarify for `same_anchor with fill_value`, so we'd better 
also change the below test to cover this specific case, see it work or not?
   
   Ah, I mean `when an index is missing from one Series or the other`, rather 
than `same_anchor with fill_value`, which is mentioned in below comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, 

[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-16 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r785687385



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,173 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",
+func: Callable,
+fill_value: Optional[Any] = None,
+) -> "Series":
+"""
+Combine the Series with a Series or scalar according to `func`.
+
+Combine the Series and `other` using `func` to perform elementwise
+selection for combined Series.
+`fill_value` is assumed when value is missing at some index
+from one of the two objects being combined.
+
+.. versionadded:: 3.3.0
+
+.. note:: this API executes the function once to infer the type which 
is
+ potentially expensive, for instance, when the dataset is created 
after
+ aggregations or sorting.
+
+ To avoid this, specify return type in ``func``, for instance, as 
below:
+
+ >>> def foo(x, y) -> np.int32:
+ ... return x * y
+
+ pandas-on-Spark uses return type hint and does not try to infer 
the type.
+
+Parameters
+--
+other : Series or scalar
+The value(s) to be combined with the `Series`.
+func : function
+Function that takes two scalars as inputs and returns an element.
+Note that type hint for return type is strongly recommended.
+fill_value : scalar, optional
+The value to assume when an index is missing from
+one Series or the other. The default specifies to use the
+appropriate NaN value for the underlying dtype of the Series.
+
+Returns
+---
+Series
+The result of combining the Series with the other object.
+
+See Also
+
+Series.combine_first : Combine Series values, choosing the calling
+Series' values first.
+
+Examples
+
+Consider 2 Datasets ``s1`` and ``s2`` containing
+highest clocked speeds of different birds.
+
+>>> from pyspark.pandas.config import set_option, reset_option
+>>> set_option("compute.ops_on_diff_frames", True)
+>>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+>>> s1
+falcon330.0
+eagle 160.0
+dtype: float64
+>>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+>>> s2
+falcon345.0
+eagle 200.0
+duck   30.0
+dtype: float64
+
+Now, to combine the two datasets and view the highest speeds
+of the birds across the two datasets
+
+>>> s1.combine(s2, max)
+duckNaN
+eagle 200.0
+falcon345.0
+dtype: float64
+
+In the previous example, the resulting value for duck is missing,
+because the maximum of a NaN and a float is a NaN.
+So, in the example, we set ``fill_value=0``,
+so the maximum value returned will be the value from some dataset.
+
+>>> s1.combine(s2, max, fill_value=0)
+duck   30.0
+eagle 200.0
+falcon345.0
+dtype: float64
+>>> reset_option("compute.ops_on_diff_frames")
+"""
+if not isinstance(other, Series) and not pd.api.types.is_scalar(other):
+raise TypeError("unsupported type: %s" % type(other))
+
+if not callable(func):
+raise TypeError("%s object is not callable" % type(func).__name__)
+
+if pd.api.types.is_scalar(other):
+tmp_other_col = 
verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+combined = self.to_frame()
+combined[tmp_other_col] = other
+combined = DataFrame(combined._internal.resolved_copy)
+elif same_anchor(self, other):
+combined = self._psdf[self._column_label, other._column_label]
+elif fill_value is None:

Review comment:
   Thanks for clarify for `same_anchor with fill_value`, so we'd better 
also change the below test to cover this specific case, see it work or not?
   
   Ah, I mean `when an index is missing from one Series or the other`, rather 
than `same_anchor with fill_value`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-16 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r785687385



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,173 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",
+func: Callable,
+fill_value: Optional[Any] = None,
+) -> "Series":
+"""
+Combine the Series with a Series or scalar according to `func`.
+
+Combine the Series and `other` using `func` to perform elementwise
+selection for combined Series.
+`fill_value` is assumed when value is missing at some index
+from one of the two objects being combined.
+
+.. versionadded:: 3.3.0
+
+.. note:: this API executes the function once to infer the type which 
is
+ potentially expensive, for instance, when the dataset is created 
after
+ aggregations or sorting.
+
+ To avoid this, specify return type in ``func``, for instance, as 
below:
+
+ >>> def foo(x, y) -> np.int32:
+ ... return x * y
+
+ pandas-on-Spark uses return type hint and does not try to infer 
the type.
+
+Parameters
+--
+other : Series or scalar
+The value(s) to be combined with the `Series`.
+func : function
+Function that takes two scalars as inputs and returns an element.
+Note that type hint for return type is strongly recommended.
+fill_value : scalar, optional
+The value to assume when an index is missing from
+one Series or the other. The default specifies to use the
+appropriate NaN value for the underlying dtype of the Series.
+
+Returns
+---
+Series
+The result of combining the Series with the other object.
+
+See Also
+
+Series.combine_first : Combine Series values, choosing the calling
+Series' values first.
+
+Examples
+
+Consider 2 Datasets ``s1`` and ``s2`` containing
+highest clocked speeds of different birds.
+
+>>> from pyspark.pandas.config import set_option, reset_option
+>>> set_option("compute.ops_on_diff_frames", True)
+>>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+>>> s1
+falcon330.0
+eagle 160.0
+dtype: float64
+>>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+>>> s2
+falcon345.0
+eagle 200.0
+duck   30.0
+dtype: float64
+
+Now, to combine the two datasets and view the highest speeds
+of the birds across the two datasets
+
+>>> s1.combine(s2, max)
+duckNaN
+eagle 200.0
+falcon345.0
+dtype: float64
+
+In the previous example, the resulting value for duck is missing,
+because the maximum of a NaN and a float is a NaN.
+So, in the example, we set ``fill_value=0``,
+so the maximum value returned will be the value from some dataset.
+
+>>> s1.combine(s2, max, fill_value=0)
+duck   30.0
+eagle 200.0
+falcon345.0
+dtype: float64
+>>> reset_option("compute.ops_on_diff_frames")
+"""
+if not isinstance(other, Series) and not pd.api.types.is_scalar(other):
+raise TypeError("unsupported type: %s" % type(other))
+
+if not callable(func):
+raise TypeError("%s object is not callable" % type(func).__name__)
+
+if pd.api.types.is_scalar(other):
+tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+combined = self.to_frame()
+combined[tmp_other_col] = other
+combined = DataFrame(combined._internal.resolved_copy)
+elif same_anchor(self, other):
+combined = self._psdf[self._column_label, other._column_label]
+elif fill_value is None:

Review comment:
   Thanks for clarifying; we'd better also change the test below to cover this specific case and see whether it works.
   
   Ah, I mean `when an index is missing from one Series or the other`, rather than `same_anchor with fill_value`.










[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-11 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r782652495



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,173 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",
+func: Callable,
+fill_value: Optional[Any] = None,
+) -> "Series":
+"""
+Combine the Series with a Series or scalar according to `func`.
+
+Combine the Series and `other` using `func` to perform elementwise
+selection for combined Series.
+`fill_value` is assumed when value is missing at some index
+from one of the two objects being combined.
+
+.. versionadded:: 3.3.0
+
+.. note:: this API executes the function once to infer the type which is
+ potentially expensive, for instance, when the dataset is created after
+ aggregations or sorting.
+
+ To avoid this, specify return type in ``func``, for instance, as below:
+
+ >>> def foo(x, y) -> np.int32:
+ ... return x * y
+
+ pandas-on-Spark uses return type hint and does not try to infer the type.
+
+Parameters
+--
+other : Series or scalar
+The value(s) to be combined with the `Series`.
+func : function
+Function that takes two scalars as inputs and returns an element.
+Note that type hint for return type is strongly recommended.
+fill_value : scalar, optional
+The value to assume when an index is missing from
+one Series or the other. The default specifies to use the
+appropriate NaN value for the underlying dtype of the Series.
+
+Returns
+---
+Series
+The result of combining the Series with the other object.
+
+See Also
+
+Series.combine_first : Combine Series values, choosing the calling
+Series' values first.
+
+Examples
+
+Consider 2 Datasets ``s1`` and ``s2`` containing
+highest clocked speeds of different birds.
+
+>>> from pyspark.pandas.config import set_option, reset_option
+>>> set_option("compute.ops_on_diff_frames", True)
+>>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+>>> s1
+falcon330.0
+eagle 160.0
+dtype: float64
+>>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+>>> s2
+falcon345.0
+eagle 200.0
+duck   30.0
+dtype: float64
+
+Now, to combine the two datasets and view the highest speeds
+of the birds across the two datasets
+
+>>> s1.combine(s2, max)
+duckNaN
+eagle 200.0
+falcon345.0
+dtype: float64
+
+In the previous example, the resulting value for duck is missing,
+because the maximum of a NaN and a float is a NaN.
+So, in the example, we set ``fill_value=0``,
+so the maximum value returned will be the value from some dataset.
+
+>>> s1.combine(s2, max, fill_value=0)
+duck   30.0
+eagle 200.0
+falcon345.0
+dtype: float64
+>>> reset_option("compute.ops_on_diff_frames")
+"""
+if not isinstance(other, Series) and not pd.api.types.is_scalar(other):

Review comment:
   nit: It's reasonable, but I guess it raises a different exception than pandas does. If we still need this, we should at least add a note to the doc saying the behavior differs from pandas (but that we do the right thing), or maybe the type hints alone are enough here.
   
   Also: "Unsupported".







[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-11 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r782649085



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,173 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",

Review comment:
   `other` can be a `Series` or a scalar here, so it should be `Union[Scalar, "Series"]`?
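   
   Roughly (a sketch of the suggested annotation; assumes the typing imports already used by `series.py`, with `Scalar` coming from `pyspark.pandas._typing`):
   ```
   def combine(
       self,
       other: Union[Scalar, "Series"],
       func: Callable,
       fill_value: Optional[Any] = None,
   ) -> "Series":
       ...
   ```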







[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-11 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r782669292



##
File path: python/pyspark/pandas/tests/test_series.py
##
@@ -3020,6 +3020,67 @@ def test_eq(self):
 with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
 psser == other
 
+def test_combine(self):
+pdf = pd.DataFrame(
+{"s1": [330.0, 160.0, np.nan], "s2": [345.0, 0.0, 30.0], "s3": 
[345.0, 0.0, 30.0]}
+)
+psdf = ps.from_pandas(pdf)
+
+self.assert_eq(
+pdf["s1"].combine(pdf["s2"], max),
+psdf["s1"].combine(psdf["s2"], max),
+)
+self.assert_eq(
+pdf["s1"].combine(pdf["s2"], max, fill_value=100),

Review comment:
   Question: does `fill_value` actually work in pandas here? Looks like we need to find a real case where the value gets filled, I guess.
   ```
   >>> pdf["s1"].combine(pdf["s2"], max, fill_value=100)
   0    345.0
   1    160.0
   2      NaN
   dtype: float64
   ```
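   
   For comparison (illustrative, plain pandas): `fill_value` only kicks in when an index label is missing from one side, not when a shared label holds NaN:
   ```
   >>> s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
   >>> s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
   >>> s1.combine(s2, max, fill_value=0)
   duck       30.0
   eagle     200.0
   falcon    345.0
   dtype: float64
   ```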







[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-11 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r782668819



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,173 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",
+func: Callable,
+fill_value: Optional[Any] = None,
+) -> "Series":
+"""
+Combine the Series with a Series or scalar according to `func`.
+
+Combine the Series and `other` using `func` to perform elementwise
+selection for combined Series.
+`fill_value` is assumed when value is missing at some index
+from one of the two objects being combined.
+
+.. versionadded:: 3.3.0
+
+.. note:: this API executes the function once to infer the type which is
+ potentially expensive, for instance, when the dataset is created after
+ aggregations or sorting.
+
+ To avoid this, specify return type in ``func``, for instance, as below:
+
+ >>> def foo(x, y) -> np.int32:
+ ... return x * y
+
+ pandas-on-Spark uses return type hint and does not try to infer the type.
+
+Parameters
+--
+other : Series or scalar
+The value(s) to be combined with the `Series`.
+func : function
+Function that takes two scalars as inputs and returns an element.
+Note that type hint for return type is strongly recommended.
+fill_value : scalar, optional
+The value to assume when an index is missing from
+one Series or the other. The default specifies to use the
+appropriate NaN value for the underlying dtype of the Series.
+
+Returns
+---
+Series
+The result of combining the Series with the other object.
+
+See Also
+
+Series.combine_first : Combine Series values, choosing the calling
+Series' values first.
+
+Examples
+
+Consider 2 Datasets ``s1`` and ``s2`` containing
+highest clocked speeds of different birds.
+
+>>> from pyspark.pandas.config import set_option, reset_option
+>>> set_option("compute.ops_on_diff_frames", True)
+>>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+>>> s1
+falcon330.0
+eagle 160.0
+dtype: float64
+>>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+>>> s2
+falcon345.0
+eagle 200.0
+duck   30.0
+dtype: float64
+
+Now, to combine the two datasets and view the highest speeds
+of the birds across the two datasets
+
+>>> s1.combine(s2, max)
+duckNaN
+eagle 200.0
+falcon345.0
+dtype: float64
+
+In the previous example, the resulting value for duck is missing,
+because the maximum of a NaN and a float is a NaN.
+So, in the example, we set ``fill_value=0``,
+so the maximum value returned will be the value from some dataset.
+
+>>> s1.combine(s2, max, fill_value=0)
+duck   30.0
+eagle 200.0
+falcon345.0
+dtype: float64
+>>> reset_option("compute.ops_on_diff_frames")
+"""
+if not isinstance(other, Series) and not pd.api.types.is_scalar(other):
+raise TypeError("unsupported type: %s" % type(other))
+
+if not callable(func):
+raise TypeError("%s object is not callable" % type(func).__name__)
+
+if pd.api.types.is_scalar(other):
+tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+combined = self.to_frame()
+combined[tmp_other_col] = other
+combined = DataFrame(combined._internal.resolved_copy)
+elif same_anchor(self, other):
+combined = self._psdf[self._column_label, other._column_label]
+elif fill_value is None:

Review comment:
   Quick question: what happens when `same_anchor` is true but `fill_value` is set? See also my comment about `fill_value` on the test below.
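   
   Concretely (illustrative; assumes the `ps`/`np` imports): when both series share the same anchor, their indexes coincide, so no label is ever missing and `fill_value` has nothing to fill; a NaN at a shared label is passed to `func` as-is:
   ```
   # s1 and s2 live in the same DataFrame (same anchor), so the
   # fill_value=100 below is effectively a no-op.
   psdf = ps.DataFrame({"s1": [330.0, 160.0, np.nan], "s2": [345.0, 0.0, 30.0]})
   psdf["s1"].combine(psdf["s2"], max, fill_value=100)
   ```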










[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-11 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r782672473



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,170 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",
+func: Callable,
+fill_value: Optional[Any] = None,
+) -> "Series":
+"""
+Combine the Series with a Series or scalar according to `func`.
+
+Combine the Series and `other` using `func` to perform elementwise
+selection for combined Series.
+`fill_value` is assumed when value is missing at some index
+from one of the two objects being combined.
+
+.. versionadded:: 3.3.0
+
+.. note:: this API executes the function once to infer the type which is
+ potentially expensive, for instance, when the dataset is created after
+ aggregations or sorting.
+
+ To avoid this, specify return type in ``func``, for instance, as below:
+
+ >>> def foo(x, y) -> np.int32:
+ ... return x * y
+
+ pandas-on-Spark uses return type hint and does not try to infer the type.
+
+Parameters
+--
+other : Series or scalar
+The value(s) to be combined with the `Series`.
+func : function
+Function that takes two scalars as inputs and returns an element.
+Note that type hint for return type is required.

Review comment:
   Oh, sorry, it already says this is about the type hint; just ignore my comment.







[GitHub] [spark] Yikun commented on a change in pull request #34212: [SPARK-36402][PYTHON] Implement Series.combine

2022-01-11 Thread GitBox


Yikun commented on a change in pull request #34212:
URL: https://github.com/apache/spark/pull/34212#discussion_r782672043



##
File path: python/pyspark/pandas/series.py
##
@@ -4485,6 +4489,170 @@ def replace(
 
 return self._with_new_scol(current)  # TODO: dtype?
 
+def combine(
+self,
+other: "Series",
+func: Callable,
+fill_value: Optional[Any] = None,
+) -> "Series":
+"""
+Combine the Series with a Series or scalar according to `func`.
+
+Combine the Series and `other` using `func` to perform elementwise
+selection for combined Series.
+`fill_value` is assumed when value is missing at some index
+from one of the two objects being combined.
+
+.. versionadded:: 3.3.0
+
+.. note:: this API executes the function once to infer the type which is
+ potentially expensive, for instance, when the dataset is created after
+ aggregations or sorting.
+
+ To avoid this, specify return type in ``func``, for instance, as below:
+
+ >>> def foo(x, y) -> np.int32:
+ ... return x * y
+
+ pandas-on-Spark uses return type hint and does not try to infer the type.
+
+Parameters
+--
+other : Series or scalar
+The value(s) to be combined with the `Series`.
+func : function
+Function that takes two scalars as inputs and returns an element.
+Note that type hint for return type is required.

Review comment:
   No, I think it should be required.
   ```
   >>> pdf["s1"].combine(pdf["s2"])
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
   TypeError: combine() missing 1 required positional argument: 'func'
   ```









