itholic commented on code in PR #37801:
URL: https://github.com/apache/spark/pull/37801#discussion_r964508929
##########
python/pyspark/pandas/groupby.py:
##########
@@ -895,6 +895,89 @@ def sem(col: Column) -> Column:
bool_to_numeric=True,
)
+ # TODO: 1, 'n' accepts list and slice; 2, implement 'dropna' parameter
+ def nth(self, n: int) -> FrameLike:
+ """
+ Take the nth row from each group.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ n : int
+ A single nth value for the row
+
+ Examples
+ --------
+
+ >>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
+ ... 'B': [np.nan, 2, 3, 4, 5]}, columns=['A', 'B'])
+ >>> g = df.groupby('A')
+ >>> g.nth(0)
+ B
+ A
+ 1 NaN
+ 2 3.0
+ >>> g.nth(1)
+ B
+ A
+ 1 2.0
+ 2 5.0
+ >>> g.nth(-1)
+ B
+ A
+ 1 4.0
+ 2 5.0
+
+ See Also
+ --------
+ pyspark.pandas.Series.groupby
+ pyspark.pandas.DataFrame.groupby
+ """
+ groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
+ internal, agg_columns, sdf = self._prepare_reduce(
+ groupkey_names=groupkey_names,
+ accepted_spark_types=None,
+ bool_to_numeric=False,
+ )
+ psdf: DataFrame = DataFrame(internal)
+
+ if len(psdf._internal.column_labels) > 0:
+ window1 = Window.partitionBy(*groupkey_names).orderBy(NATURAL_ORDER_COLUMN_NAME)
+ tmp_row_number_col = "__tmp_row_number_col__"
+ if n >= 0:
+ sdf = (
+ psdf._internal.spark_frame.withColumn(
+ tmp_row_number_col, F.row_number().over(window1)
+ )
+ .where(F.col(tmp_row_number_col) == n + 1)
+ .drop(tmp_row_number_col)
+ )
+ else:
+ window2 = Window.partitionBy(*groupkey_names).rowsBetween(
+ Window.unboundedPreceding, Window.unboundedFollowing
+ )
+ tmp_group_size_col = "__tmp_group_size_col__"
+ sdf = (
+ psdf._internal.spark_frame.withColumn(
+ tmp_group_size_col, F.count(F.lit(0)).over(window2)
+ )
+ .withColumn(tmp_row_number_col, F.row_number().over(window1))
+ .where(F.col(tmp_row_number_col) == F.col(tmp_group_size_col) + 1 + n)
+ .drop(tmp_group_size_col, tmp_row_number_col)
+ )
+ else:
+ sdf = sdf.select(*groupkey_names).distinct()
Review Comment:
Oh... I just noticed that we're following the pandas behavior even though
there is a bug in pandas.
When there is a bug in pandas, we usually do something like this:
- we don't follow the behavior of pandas; instead, we assume it works properly
and implement it that way.
- add a comment on the test linking the related [pandas
issue](https://github.com/pandas-dev/pandas/issues) from the pandas
repository (`https://github.com/pandas-dev/pandas/issues/...`), as below:
-
https://github.com/apache/spark/blob/0830575100c1bcbc89d98a6859fe3b8d46ca2e6e/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py#L510-L517
-
https://github.com/apache/spark/blob/0830575100c1bcbc89d98a6859fe3b8d46ca2e6e/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py#L478-L483
- If it's not clear that it's a bug (i.e., it has not been officially discussed
as a bug in the pandas community), we can just follow the pandas behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]