Hi, I'm a little confused about how to use watermarks in Structured Streaming (SS). According to the programming guide, when we use window-based grouping, SS handles late events automatically, and we should specify a watermark before groupBy to limit the state, like this:

    val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

    // Group the data by window and word and compute the count of each group
    val windowedCounts = words
      .withWatermark("timestamp", "10 minutes")
      .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
      .count()

When I use dropDuplicates, I also need a watermark to limit the state, specified before dropDuplicates, like this:

    val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...

    // Without watermark using guid column
    streamingDf.dropDuplicates("guid")

    // With watermark using guid and eventTime columns
    streamingDf
      .withWatermark("eventTime", "10 seconds")
      .dropDuplicates("guid", "eventTime")

So if I use dropDuplicates before a window-based grouping (like below), should I use two watermarks, one for dropDuplicates and the other for the window?

    val results = events
      .select(
        window($"timestamp", "1 day"),
        $"timestamp",
        $"uuid"
      )
      .dropDuplicates("uuid", "window")
      .groupBy($"window")
      .count()
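
To make the question concrete, here is a minimal sketch of the single-watermark variant: one withWatermark call placed before both dropDuplicates and the window-based groupBy. The object name, the helper countDistinctPerDay, the "rate" test source, the fake uuid column, and the 10-minute delay are all assumptions for illustration. Whether this one watermark is enough to bound the state of both operators is exactly what is being asked; the sketch only shows where it would go.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}

object SingleWatermarkSketch {

  // Hypothetical helper: expects a streaming DataFrame with
  // columns `timestamp` (Timestamp) and `uuid` (String).
  def countDistinctPerDay(events: DataFrame): DataFrame =
    events
      // Assumption: a single watermark, declared once, before both stateful operators.
      .withWatermark("timestamp", "10 minutes")
      .select(
        window(col("timestamp"), "1 day").as("window"),
        col("timestamp"),
        col("uuid"))
      // Deduplicate per (uuid, daily window), as in the query above.
      .dropDuplicates("uuid", "window")
      .groupBy(col("window"))
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("single-watermark-sketch")
      .getOrCreate()

    // Stand-in source: the built-in "rate" source emits (timestamp, value);
    // `value` is cast to a string to play the role of a uuid.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(col("timestamp"), col("value").cast("string").as("uuid"))

    // Print updated counts to the console; blocks until the query is stopped.
    countDistinctPerDay(events)
      .writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

The second variant would call withWatermark a second time between dropDuplicates and groupBy; it is not shown because it is unclear whether that is needed or even has any effect.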