itholic commented on a change in pull request #36006:
URL: https://github.com/apache/spark/pull/36006#discussion_r837986152
##########
File path: python/pyspark/pandas/tests/indexes/test_base.py
##########
@@ -368,10 +368,19 @@ def test_drop_duplicates(self):
pidx = pd.Index([4, 2, 4, 1, 4, 3])
psidx = ps.from_pandas(pidx)
- self.assert_eq(psidx.drop_duplicates().sort_values(),
pidx.drop_duplicates().sort_values())
- self.assert_eq(
- (psidx + 1).drop_duplicates().sort_values(), (pidx +
1).drop_duplicates().sort_values()
- )
+ self.assert_eq(psidx.drop_duplicates(), pidx.drop_duplicates())
+ self.assert_eq((psidx + 1).drop_duplicates(), (pidx +
1).drop_duplicates())
+
+ self.assert_eq(psidx.drop_duplicates(keep="first"),
pidx.drop_duplicates(keep="first"))
+ self.assert_eq(psidx.drop_duplicates(keep="last"),
pidx.drop_duplicates(keep="last"))
+ self.assert_eq(psidx.drop_duplicates(keep=False),
pidx.drop_duplicates(keep=False))
+
+ arrays = [[1, 2, 3, 1, 2], ["red", "blue", "black", "red", "blue"]]
+ pmidx = pd.MultiIndex.from_arrays(arrays, names=("number", "color"))
+ psmidx = ps.from_pandas(pmidx)
+ self.assert_eq(psmidx.drop_duplicates(keep="first"),
pmidx.drop_duplicates(keep="first"))
+ self.assert_eq(psmidx.drop_duplicates(keep="last"),
pmidx.drop_duplicates(keep="last"))
+ self.assert_eq(psmidx.drop_duplicates(keep=False),
pmidx.drop_duplicates(keep=False))
Review comment:
Shall we also have a test for the default case for `MultiIndex`?
```python
self.assert_eq(psmidx.drop_duplicates(), pmidx.drop_duplicates())
```
##########
File path: python/pyspark/pandas/indexes/base.py
##########
@@ -898,25 +905,19 @@ def drop_duplicates(self) -> "Index":
Examples
--------
- Generate an pandas.Index with duplicate values.
+ Generate an Index with duplicate values.
>>> idx = ps.Index(['lama', 'cow', 'lama', 'beetle', 'lama', 'hippo'])
>>> idx.drop_duplicates().sort_values()
Index(['beetle', 'cow', 'hippo', 'lama'], dtype='object')
"""
- sdf = self._internal.spark_frame.select(
- self._internal.index_spark_columns
- ).drop_duplicates()
- internal = InternalFrame(
- spark_frame=sdf,
- index_spark_columns=[
- scol_for(sdf, col) for col in
self._internal.index_spark_column_names
- ],
- index_names=self._internal.index_names,
- index_fields=self._internal.index_fields,
- )
- return DataFrame(internal).index
+ with ps.option_context("compute.default_index_type", "distributed"):
+ # The attached index caused by `reset_index` below is used for
sorting only,
+ # and it will be dropped soon,
+ # so we enforce “distributed” default index type
+ psser = self.to_series().reset_index(drop=True)
+ return Index(psser.drop_duplicates(keep=keep).sort_index())
Review comment:
I think we should handle the `keep` parameter manually in a single pass,
as we already do for `DataFrame.drop_duplicates` (see below), if possible:
https://github.com/apache/spark/blob/master/python/pyspark/pandas/frame.py#L4330-L4347
https://github.com/apache/spark/blob/42a9114f4c509f1f369bfb1499ea99c187518c48/python/pyspark/pandas/frame.py#L4330-L4347
I'm just concerned that chaining APIs such as `to_series()`,
`reset_index()`, `drop_duplicates()` and `sort_index()` may trigger
multiple Spark jobs, which is potentially very expensive.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]