HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1159299810


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3025,107 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicate rows removed, within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the semantics: "Events are deduplicated as long as the time
+   * distance between the earliest and latest events is smaller than the delay threshold of the
+   * watermark." The watermark for the input [[Dataset]] must be set via [[withWatermark]].
+   * Users are encouraged to set the delay threshold of the watermark longer than the max
+   * timestamp difference among duplicated events. In addition, too-late data older than the
+   * watermark will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicate rows removed, considering only the subset of columns,
+   * within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the semantics: "Events are deduplicated as long as the time
+   * distance between the earliest and latest events is smaller than the delay threshold of the
+   * watermark." The watermark for the input [[Dataset]] must be set via [[withWatermark]].
+   * Users are encouraged to set the delay threshold of the watermark longer than the max
+   * timestamp difference among duplicated events. In addition, too-late data older than the
+   * watermark will be dropped.

Review Comment:
   I might be misunderstanding your point. Technically it does not need to be the same as the existing API - we could, say, emit a late event as-is without deduplication, but that would give us more complicated scenarios to deal with. For example, what if two duplicated events are both ingested late, but are within the watermark delay threshold of each other? Should/can we deduplicate them?
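   To make the semantics under discussion concrete, here is a minimal, self-contained sketch in plain Scala (NOT Spark's actual implementation; `Event`, `dedupWithinWatermark`, and the timestamps are hypothetical): per-key state is evicted once it falls behind the watermark (`maxSeenTs - delay`), events behind the watermark are dropped, and an event is suppressed as a duplicate only while its key is still in state.

   ```scala
   import scala.collection.mutable

   case class Event(key: String, ts: Long)

   // Hypothetical sketch of dedup-within-watermark semantics:
   //  - track the max event time seen; watermark = maxTs - delayMs
   //  - evict per-key state whose timestamp has fallen behind the watermark
   //  - drop events older than the watermark; emit an event only if its key
   //    is not currently in state
   def dedupWithinWatermark(events: Seq[Event], delayMs: Long): Seq[Event] = {
     var maxTs = Long.MinValue                    // highest event time seen so far
     val seen  = mutable.Map.empty[String, Long]  // key -> timestamp of first occurrence
     val out   = mutable.ArrayBuffer.empty[Event]
     for (e <- events) {
       maxTs = math.max(maxTs, e.ts)
       val watermark = maxTs - delayMs
       // evict state that the watermark has passed
       seen.keys.filter(k => seen(k) < watermark).toList.foreach(seen.remove)
       if (e.ts >= watermark) {                   // too-late events are dropped
         if (!seen.contains(e.key)) {             // first occurrence within watermark: emit
           seen(e.key) = e.ts
           out += e
         }
       }
     }
     out.toSeq
   }
   ```

   Note that under these semantics the same key can be emitted again once the watermark has evicted its earlier state, which is exactly why the thread suggests setting the delay threshold longer than the max timestamp gap among duplicates.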



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

