Hi,

I'm a little confused about how to use watermarks in Structured Streaming
(SS). According to the programming guide, when we use window-based grouping,
SS handles late events automatically, and we should use a watermark to limit
the state, like this (specify the watermark before groupBy):

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()


And when I use dropDuplicates, I also need a watermark to limit the state,
like this (specify the watermark before dropDuplicates):

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")

So if I use dropDuplicates before a window-based grouping (like below),
should I use two watermarks (one for dropDuplicates and the other for the
window)?

val results = events
  .select(
    window($"timestamp", "1 day"),
    $"timestamp",
    $"uuid")
  .dropDuplicates("uuid", "window")
  .groupBy($"window")
  .count()
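
For concreteness, a single-watermark variant of the query above might look
like the sketch below. It assumes events is a streaming DataFrame with
columns timestamp (Timestamp) and uuid (String); the "1 day" delay and the
function name dailyUniqueCounts are only illustrative. Whether this one
watermark is enough to bound the state of both dropDuplicates and the
windowed count is exactly what I am unsure about.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// Sketch only: `events` is assumed to have columns
// timestamp: Timestamp and uuid: String. The "1 day" watermark
// delay is an illustrative value, not a recommendation.
def dailyUniqueCounts(events: DataFrame): DataFrame =
  events
    // A single watermark, declared once on the event-time column
    .withWatermark("timestamp", "1 day")
    .select(
      window(col("timestamp"), "1 day").as("window"),
      col("timestamp"),
      col("uuid"))
    // Deduplicate per uuid within each one-day window
    .dropDuplicates("uuid", "window")
    // Count the distinct uuids per window
    .groupBy(col("window"))
    .count()
```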



