Hi Mike,

RichWindowFunction not being supported in the Scala API is an oversight on our side. We’re working to fix it.
For ReduceFunction and FoldFunction it’s a bit trickier: right now they are not proper operator functions. They are only used inside the internal window state to incrementally combine the elements.

Cheers,
Aljoscha

> On 04 Mar 2016, at 03:20, shikhar <[email protected]> wrote:
>
> In case this helps, this is a Scala helper I am using to filter out late data
> on a KeyedStream. The last timestamp state is maintained at the key-level.
>
> ```
> implicit class StrictlyAscendingByTime[T, K](stream: KeyedStream[T, K]) {
>
>   def filterStrictlyAscendingTime(timestampExtractor: T => Long)(outOfOrderHandler: (T, Long) => Unit): DataStream[T] = {
>     stream.filterWithState((currentElement: T, prevElementTimestamp: Option[Long]) => {
>       val currentElementTimestamp = timestampExtractor(currentElement)
>       prevElementTimestamp match {
>         case None =>
>           (true, Some(currentElementTimestamp))
>         case Some(t) =>
>           if (currentElementTimestamp > t) {
>             (true, Some(currentElementTimestamp))
>           } else {
>             outOfOrderHandler(currentElement, t)
>             (false, Some(t))
>           }
>       }
>     })
>   }
>
>   def ignoreLateArrivals(timestampExtractor: T => Long): DataStream[T] = {
>     stream.filterStrictlyAscendingTime(timestampExtractor) {
>       (element, timestamp) => {
>         // FLINK-2870 should provide a more idiomatic way to ignore late arrivals
>       }
>     }
>   }
>
> }
> ```
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-watermarks-and-late-data-tp5239p5291.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at
> Nabble.com.
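For anyone who wants to check the filtering rule in the quoted helper without a Flink job, here is a minimal, Flink-free sketch in plain Scala. It is only an illustration under assumptions: `LateDataFilter`, `accept` and `LateDataFilterDemo` are hypothetical names, and a mutable map stands in for Flink's keyed state. The rule is the same as in the helper: an element passes only if its timestamp is strictly greater than the last accepted timestamp for its key.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the quoted helper; NOT part of the Flink API.
// The mutable map plays the role of the per-key state that Flink would manage.
final class LateDataFilter[T, K](keyOf: T => K, timestampOf: T => Long) {
  private val lastTimestamp = mutable.Map.empty[K, Long]

  // Returns true if the element is strictly newer than the last accepted
  // element for the same key; only accepted elements update the state.
  def accept(element: T): Boolean = {
    val key = keyOf(element)
    val ts = timestampOf(element)
    lastTimestamp.get(key) match {
      case Some(prev) if ts <= prev =>
        false // late or duplicate timestamp: drop, keep the old state
      case _ =>
        lastTimestamp(key) = ts
        true
    }
  }
}

object LateDataFilterDemo {
  def main(args: Array[String]): Unit = {
    // (key, timestamp) pairs; ("a", 2) and the second ("a", 3) arrive late.
    val events = List(("a", 1L), ("a", 3L), ("b", 2L), ("a", 2L), ("a", 3L), ("b", 5L))
    val filter = new LateDataFilter[(String, Long), String](_._1, _._2)
    val kept = events.filter(filter.accept)
    println(kept)
  }
}
```

Note that an element whose timestamp equals the last accepted one is dropped as well, since the comparison is strict.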
