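In case this helps, here is a Scala helper I use to filter out late data
on a KeyedStream. The timestamp of the last accepted element is kept in keyed
state, so each key is tracked independently: an element is dropped (and passed
to an out-of-order handler) unless its timestamp is strictly greater than the
last accepted timestamp for its key.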
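```scala
import org.apache.flink.streaming.api.scala._

// Implicit classes cannot be top-level in Scala 2, so the helper lives in an
// object (the name LateDataFilters is arbitrary); bring it into scope with
// `import LateDataFilters._`.
object LateDataFilters {

  implicit class StrictlyAscendingByTime[T, K](stream: KeyedStream[T, K]) {

    // Keeps an element only if its timestamp is strictly greater than the last
    // accepted timestamp for the same key. That timestamp is held in keyed
    // state by filterWithState, so every key is tracked independently.
    // Rejected elements are handed to outOfOrderHandler together with the
    // last accepted timestamp for their key.
    def filterStrictlyAscendingTime(timestampExtractor: T => Long)(
        outOfOrderHandler: (T, Long) => Unit): DataStream[T] =
      stream.filterWithState[Long] { (currentElement: T, prevElementTimestamp: Option[Long]) =>
        val currentElementTimestamp = timestampExtractor(currentElement)
        prevElementTimestamp match {
          case None =>
            // First element for this key: keep it and remember its timestamp.
            (true, Some(currentElementTimestamp))
          case Some(t) if currentElementTimestamp > t =>
            // Newer than anything accepted so far: keep it and advance the state.
            (true, Some(currentElementTimestamp))
          case Some(t) =>
            // Late (or same timestamp): report it, drop it, keep the old state.
            outOfOrderHandler(currentElement, t)
            (false, Some(t))
        }
      }

    // Same filter, but late elements are silently discarded.
    def ignoreLateArrivals(timestampExtractor: T => Long): DataStream[T] =
      filterStrictlyAscendingTime(timestampExtractor) { (_, _) =>
        () // FLINK-2870 should provide a more idiomatic way to ignore late arrivals
      }
  }
}
```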
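To make the per-key behaviour concrete without a Flink cluster, here is a small plain-Scala model of the same rule. It is only an illustration, not Flink code: an immutable `Map` stands in for Flink's keyed state, and the names `StrictlyAscendingModel` and `filterStrictlyAscending` are hypothetical.

```scala
// Hypothetical plain-Scala model (no Flink) of the rule used by
// filterStrictlyAscendingTime: a Map from key to last accepted timestamp
// plays the role of Flink's keyed state.
object StrictlyAscendingModel {

  // Returns the accepted elements in input order.
  def filterStrictlyAscending[T, K](events: Seq[T])(key: T => K, timestamp: T => Long): Seq[T] = {
    val (accepted, _) = events.foldLeft((Vector.empty[T], Map.empty[K, Long])) {
      case ((kept, lastSeen), e) =>
        val ts = timestamp(e)
        lastSeen.get(key(e)) match {
          // Not strictly newer than the last accepted timestamp for this key:
          // drop the element and leave the state unchanged.
          case Some(prev) if ts <= prev => (kept, lastSeen)
          // First element for this key, or a newer one: keep it and advance the state.
          case _ => (kept :+ e, lastSeen.updated(key(e), ts))
        }
    }
    accepted
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(("a", 1L), ("b", 5L), ("a", 3L), ("a", 2L), ("b", 4L), ("b", 6L), ("a", 3L))
    println(filterStrictlyAscending(events)(_._1, _._2))
    // prints Vector((a,1), (b,5), (a,3), (b,6))
  }
}
```

Note that `("a", 3)` is accepted right after `("b", 5)` because each key has its own last timestamp, while the second `("a", 3)` is dropped because an equal timestamp does not count as strictly ascending.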
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-watermarks-and-late-data-tp5239p5291.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.