Hi,
I use a bounded OVER window aggregation in my application. However, some input elements are sometimes "discarded" and do not generate any output. Reading the source code of RowTimeBoundedRangeOver.scala, I realized that a record is actually discarded if it is out of order. Please see the quoted code block below.

Could you help me understand why we don't sort the records first? Say we are using BoundedOutOfOrdernessTimestampExtractor. We could use the watermark to select a portion of the elements to sort. When the watermark advances, we process the elements that are before the watermark and extend the portion of elements to be sorted.

Best,
Yan

```scala
override def processElement(
    inputC: CRow,
    ctx: ProcessFunction[CRow, CRow]#Context,
    out: Collector[CRow]): Unit = {

  // unwrap the Row carried by the CRow
  val input = inputC.row

  // triggering timestamp for trigger calculation
  val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]

  val lastTriggeringTs = lastTriggeringTsState.value

  // check if the data is expired, if not, save the data and register event time timer
  if (triggeringTs > lastTriggeringTs) {
    // put in cache, and register timer to process/clean
    // ...
  } else {
    // DISCARD
  }
}
```
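To make the proposal concrete, here is a minimal, framework-free Scala sketch of the watermark-driven sorting described above. It is only an illustration under stated assumptions: `Event`, `WatermarkSorter`, `add` and `advanceWatermark` are hypothetical names, not Flink APIs; the buffer is an in-memory `TreeMap` rather than Flink keyed state; and an element counts as late when its timestamp is at or below the current watermark.

```scala
import scala.collection.mutable

// Hypothetical record type: an event-time timestamp plus a payload.
final case class Event(ts: Long, value: String)

// Hypothetical helper (not a Flink API): buffers out-of-order events and
// releases them in timestamp order once the watermark has passed them.
final class WatermarkSorter {
  // timestamp -> events with that timestamp; TreeMap keeps keys in ascending order
  private val buffer = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[Event]]
  private var currentWatermark = Long.MinValue

  /** Buffers the event and returns true, or returns false if it is already
    * late (ts <= current watermark), which corresponds to the "discard" branch. */
  def add(e: Event): Boolean =
    if (e.ts <= currentWatermark) false
    else {
      buffer.getOrElseUpdate(e.ts, mutable.ArrayBuffer.empty[Event]) += e
      true
    }

  /** Advances the watermark and returns every buffered event with ts <= wm,
    * sorted by timestamp; those events are removed from the buffer. */
  def advanceWatermark(wm: Long): Seq[Event] =
    if (wm <= currentWatermark) Seq.empty
    else {
      currentWatermark = wm
      // materialize the ready keys first so we do not mutate while iterating
      val readyKeys = buffer.keysIterator.takeWhile(_ <= wm).toList
      readyKeys.flatMap(k => buffer.remove(k).get)
    }
}
```

For example, adding events with timestamps 5, 1 and 3 and then advancing the watermark to 3 releases the events at 1 and 3 in that order, while the event at 5 stays buffered until a later watermark. Anything arriving with a timestamp at or below the watermark is still rejected, so sorting only helps for elements that arrive within the bounded out-of-orderness.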