Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/17539#discussion_r109960948 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -871,6 +871,65 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat </div> </div> +### Streaming Deduplication +You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking. + +- *With watermark* - If there is a upper bound on how late a duplicate record may arrive, then you can define a watermark on a event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of the state the query has to maintain. + +- *Without watermark* - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state. + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> + +{% highlight scala %} +val streamingDf = spark.readStream. ... // columns: guid, eventTime, ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid") + +// With watermark using guid and eventTime columns +streamingDf + .withWatermark("eventTime", "10 seconds") + .dropDuplicates("guid", "eventTime") +{% endhighlight %} + +</div> +<div data-lang="java" markdown="1"> + +{% highlight java %} +Dataset<Row> streamingDf = spark.readStream. ...; // columns: guid, eventTime, ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid"); + +// With watermark using guid and eventTime columns +streamingDf + .withWatermark("eventTime", "10 seconds") + .dropDuplicates("guid", "eventTime"); +{% endhighlight %} + + +</div> +<div data-lang="python" markdown="1"> + +{% highlight python %} +streamingDf = spark.readStream. ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid") + +// With watermark using guid and eventTime columns +streamingDf + .withWatermark("eventTime", "10 seconds") --- End diff -- nit: missing ` \` at end of line
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org