zsxwing commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1159264554
##########
python/pyspark/sql/dataframe.py:
##########
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
Review Comment:
@HyukjinKwon what's our current policy for adding Python methods? Does it have to be a pythonic name such as `drop_duplicates_within_watermark`, or do we still follow the Scala method name?
##########
python/pyspark/sql/dataframe.py:
##########
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
+        """Return a new :class:`DataFrame` with duplicate rows removed,
+        optionally only considering certain columns, within watermark.
+
+        For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming
+        :class:`DataFrame`, this will keep all data across triggers as intermediate state to drop
+        duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated
+        as long as the time distance of earliest and latest events are smaller than the delay
+        threshold of watermark." The watermark for the input :class:`DataFrame` must be set via
+        :func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer
+        than max timestamp differences among duplicated events. In addition, too late data older
+        than watermark will be dropped.
+
+        .. versionadded:: 3.5.0
+
+        Parameters
+        ----------
+        subset : List of column names, optional
+            List of columns to use for duplicate comparison (default All columns).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            DataFrame without duplicates.
+
+        Examples
+        --------
+        >>> from pyspark.sql import Row
+        >>> df = spark.createDataFrame([
+        ...     Row(name='Alice', age=5, height=80),
+        ...     Row(name='Alice', age=5, height=80),
+        ...     Row(name='Alice', age=10, height=80)
+        ... ])
+
+        Deduplicate the same rows.
+
+        >>> df.dropDuplicatesWithinWatermark().show()
+        +-----+---+------+
+        | name|age|height|
+        +-----+---+------+
+        |Alice|  5|    80|
+        |Alice| 10|    80|
+        +-----+---+------+
+
+        Deduplicate values on 'name' and 'height' columns.
+
+        >>> df.dropDuplicatesWithinWatermark(['name', 'height']).show()
Review Comment:
ditto
##########
python/pyspark/sql/dataframe.py:
##########
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
+        """Return a new :class:`DataFrame` with duplicate rows removed,
+        optionally only considering certain columns, within watermark.
+
+        For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming
+        :class:`DataFrame`, this will keep all data across triggers as intermediate state to drop
+        duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated
+        as long as the time distance of earliest and latest events are smaller than the delay
+        threshold of watermark." The watermark for the input :class:`DataFrame` must be set via
+        :func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer
+        than max timestamp differences among duplicated events. In addition, too late data older
+        than watermark will be dropped.
+
+        .. versionadded:: 3.5.0
+
+        Parameters
+        ----------
+        subset : List of column names, optional
+            List of columns to use for duplicate comparison (default All columns).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            DataFrame without duplicates.
+
+        Examples
+        --------
+        >>> from pyspark.sql import Row
+        >>> df = spark.createDataFrame([
+        ...     Row(name='Alice', age=5, height=80),
+        ...     Row(name='Alice', age=5, height=80),
+        ...     Row(name='Alice', age=10, height=80)
+        ... ])
+
+        Deduplicate the same rows.
+
+        >>> df.dropDuplicatesWithinWatermark().show()
Review Comment:
Can we create examples using watermark?
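The guarantee the docstring describes ("events are deduplicated as long as the time distance of earliest and latest events is smaller than the delay threshold") can be illustrated outside Spark with a small simulation. This is a hypothetical toy model, not the PR's implementation or the pyspark API; it processes events in event-time order for simplicity, which real triggers do not require:

```python
from typing import Dict, List, Tuple


def dedup_within_watermark(
    events: List[Tuple[int, str]], delay: int
) -> List[Tuple[int, str]]:
    """Toy model of dedup-within-watermark: an event is dropped if its key
    is still present in state; a key's state is evicted once the watermark
    (max event time seen minus the delay threshold) passes the time at which
    the key was first recorded. Too-late events are dropped entirely.
    """
    state: Dict[str, int] = {}  # key -> event time of first occurrence
    max_event_time = 0
    out: List[Tuple[int, str]] = []
    for event_time, key in sorted(events):
        max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - delay
        # Evict state entries older than the watermark.
        state = {k: t for k, t in state.items() if t >= watermark}
        if event_time < watermark:
            continue  # data older than the watermark is dropped
        if key not in state:
            state[key] = event_time
            out.append((event_time, key))
    return out


# Duplicates of "a" arrive within the 10-unit delay, so only the first is
# kept; "b" reappears after its state was evicted, so it is emitted twice.
print(dedup_within_watermark([(0, "a"), (5, "a"), (1, "b"), (30, "b")], delay=10))
# → [(0, 'a'), (1, 'b'), (30, 'b')]
```

This also shows why the docstring encourages a delay threshold longer than the maximum timestamp gap among duplicates: with a shorter delay, the state for "a" could be evicted between its two occurrences and the duplicate would be emitted.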
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
        d.withNewChildren(Seq(simplifyUnion(u)))
      case d @ Deduplicate(_, u: Union) =>
        d.withNewChildren(Seq(simplifyUnion(u)))
+     case d @ DeduplicateWithinWatermark(_, u: Union) =>
Review Comment:
> Shall we extend the existing 'DropDuplicate' node to support this use case rather than introducing another logical node 'DropDuplicatesWithinWatermark'.

I don't have a strong opinion on this. Will let you two decide 😄
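For context on the rule being extended: `RemoveNoopUnion` simplifies a `Union` under a deduplicating node because duplicate branches cannot change a deduplicated result. A minimal model of that equivalence (plain Python, not Spark code; `deduplicate` is an illustrative helper, not a Spark API):

```python
def deduplicate(rows):
    """Keep the first occurrence of each row (order-preserving dedup)."""
    seen = set()
    out = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out


left = [("Alice", 5), ("Bob", 7)]

# A union of a relation with itself contributes no new rows once a
# deduplicating node sits on top, so the optimizer can drop the extra
# branch and deduplicate a single child instead.
assert deduplicate(left + left) == deduplicate(left)
```

Extending the pattern match to `DeduplicateWithinWatermark` applies the same reasoning to the new node.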
##########
python/pyspark/sql/dataframe.py:
##########
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
+        """Return a new :class:`DataFrame` with duplicate rows removed,
+        optionally only considering certain columns, within watermark.
+
+        For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming
+        :class:`DataFrame`, this will keep all data across triggers as intermediate state to drop
+        duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated
+        as long as the time distance of earliest and latest events are smaller than the delay
+        threshold of watermark." The watermark for the input :class:`DataFrame` must be set via
+        :func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer
Review Comment:
Does a batch DataFrame require a watermark? I think it's better to make it fail when there is no watermark, to keep it consistent with the streaming DataFrame.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]