HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1159194573


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2132,6 +2132,61 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+</div>
+
+</div>
+
+Specifically for streaming, you can deduplicate records in data streams using a unique identifier in the events, within the time range of the watermark.
+For example, if you set the delay threshold of the watermark to "1 hour", duplicate events that occur within 1 hour of each other can be correctly deduplicated.
+(For more details, please refer to the API doc of [dropDuplicatesWithinWatermark](/api/scala/org/apache/spark/sql/Dataset.html#dropDuplicatesWithinWatermark():org.apache.spark.sql.Dataset[T]).)
+
+This can be used to handle the case where the event time column cannot be part of the unique identifier,
+mostly because event times differ for otherwise identical records (e.g. a non-idempotent writer that assigns the event time at write time).
+
+Users are encouraged to set the delay threshold of the watermark longer than the maximum timestamp difference among duplicate events.
+
+This feature requires a watermark with a delay threshold to be set on the streaming DataFrame/Dataset.
+
+<div class="codetabs">
+
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+streamingDf = spark.readStream. ...
+
+# deduplicate using guid column with watermark based on eventTime column
+streamingDf \
+  .withWatermark("eventTime", "10 seconds") \
+  .dropDuplicatesWithinWatermark("guid")
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...
+
+// deduplicate using guid column with watermark based on eventTime column
+streamingDf
+  .withWatermark("eventTime", "10 seconds")

Review Comment:
   Agreed for this API. Will fix. Thanks for the suggestion!
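To make the semantics described in the diff concrete, here is a minimal pure-Python simulation. It is not Spark code and not how Spark implements `dropDuplicatesWithinWatermark`; it assumes a simplified model in which the watermark is the largest event time seen so far minus the delay threshold and advances after every event (Spark advances it per micro-batch), each key's dedup state expires once the watermark passes that key's first event time plus the delay, and events older than the watermark are dropped as late. The helper `dedup_within_watermark` and the sample `guid` values are hypothetical.

```python
from datetime import datetime, timedelta


def dedup_within_watermark(events, delay):
    """Simulate deduplication on a key within the watermark delay.

    Hypothetical helper, not Spark code. `events` is a list of
    (guid, event_time) pairs in arrival order; `delay` is a timedelta.
    Assumption: the watermark advances after every event, whereas Spark
    advances it once per micro-batch.
    """
    max_event_time = None  # largest event time seen so far
    expires_at = {}        # guid -> first event time for that guid + delay
    kept = []
    for guid, event_time in events:
        if max_event_time is not None:
            # Watermark derived from the events seen before this one.
            watermark = max_event_time - delay
            # Evict dedup state whose expiry time the watermark has passed.
            for key in [k for k, exp in expires_at.items() if exp < watermark]:
                del expires_at[key]
            # Events older than the watermark are treated as late and dropped.
            if event_time < watermark:
                continue
        if guid not in expires_at:
            # First occurrence (or state already evicted): emit and remember.
            expires_at[guid] = event_time + delay
            kept.append((guid, event_time))
        # Otherwise: a duplicate within the delay window, silently dropped.
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return kept


t0 = datetime(2023, 4, 1, 10, 0, 0)
events = [
    ("a", t0),                          # first "a"
    ("a", t0 + timedelta(seconds=3)),   # same record, different event time
    ("b", t0 + timedelta(seconds=30)),
    ("a", t0 + timedelta(seconds=40)),  # duplicate arriving 40 s later
]

short = dedup_within_watermark(events, timedelta(seconds=10))
long_ = dedup_within_watermark(events, timedelta(minutes=1))
print([g for g, _ in short])  # ['a', 'b', 'a']
print([g for g, _ in long_])  # ['a', 'b']
```

With a 10-second delay, the second `a` (3 seconds after the first, with a different event time) is dropped, but the `a` arriving 40 seconds later is emitted again because its state was already evicted. With a 1-minute delay it is dropped as well, which is why the delay threshold should exceed the maximum timestamp difference among duplicates.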



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

