Hi,

I use a bounded over-window aggregation in my application. However, some
input elements are sometimes "discarded" and do not generate any output. From
reading the source code of RowTimeBoundedRangeOver.scala, I realized that a
record is discarded if it arrives out of order, that is, if its timestamp is
not greater than the last triggering timestamp. Please see the quoted code
block below. Could you help me understand why we don't sort the records first?
Say we are using BoundedOutOfOrdernessTimestampExtractor. We could use the
watermark to select a portion of the elements to sort: when the watermark
advances, process the elements that are before the watermark and extend the
portion of elements being sorted.
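
To make the idea concrete, here is a minimal sketch of the buffering I have in
mind. It is independent of Flink, and all names in it (SortingBuffer, add,
advanceWatermark) are hypothetical and only for illustration. It assumes a
watermark W means that no further elements with timestamp <= W are expected.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of Flink: buffers (timestamp, value) pairs and
// releases them in timestamp order once the watermark has passed them.
final class SortingBuffer[T] {
  // Keys are kept sorted; values sharing a timestamp keep their arrival order.
  private val pending = mutable.TreeMap.empty[Long, Vector[T]]
  private var watermark = Long.MinValue

  // Buffers the element; returns false if it is at or behind the watermark.
  def add(ts: Long, value: T): Boolean =
    if (ts <= watermark) false
    else {
      pending.update(ts, pending.getOrElse(ts, Vector.empty[T]) :+ value)
      true
    }

  // Advances the watermark and returns every buffered element with
  // timestamp <= watermark, sorted by timestamp.
  def advanceWatermark(wm: Long): Vector[(Long, T)] = {
    watermark = math.max(watermark, wm)
    // Materialize the ready entries before removing them from the map.
    val ready = pending.iterator.takeWhile(_._1 <= watermark).toVector
    ready.foreach { case (ts, _) => pending.remove(ts) }
    ready.flatMap { case (ts, values) => values.map(v => (ts, v)) }
  }
}
```

In this sketch an element is rejected only when its timestamp is at or behind
the current watermark; anything newer is buffered and emitted in order once
the watermark catches up.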



Best

Yan



override def processElement(
    inputC: CRow,
    ctx: ProcessFunction[CRow, CRow]#Context,
    out: Collector[CRow]): Unit = {

  // unwrap the row carried by the CRow
  val input = inputC.row

  // triggering timestamp for trigger calculation
  val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]

  val lastTriggeringTs = lastTriggeringTsState.value

  // check if the data is expired; if not, save the data and register an
  // event-time timer
  if (triggeringTs > lastTriggeringTs) {
    // put in cache, and register timer to process/clean
    // ...
  } else {
    // DISCARD
  }
}
