Hi,

I read the following in the Flink documentation: "We can explicitly specify a Trigger to overwrite the default Trigger provided by the WindowAssigner. Note that specifying a triggers does not add an additional trigger condition but replaces the current trigger." So I tested the code below with a count trigger. As I understand it, this overrides the default watermark-based trigger.

    val testStream = env.fromCollection(List(
        ("2016-04-07 13:11:59", 157428, 4),
        ("2016-04-07 13:11:59", 157428, 4),
        ("2016-04-07 13:11:59", 111283, 23),
        ("2016-04-07 13:11:57", 108042, 23),
        ("2016-04-07 13:12:00", 161374, 9),
        ("2016-04-07 13:12:00", 161374, 9),
        ("2016-04-07 13:11:59", 136505, 4)
      ))
      .assignAscendingTimestamps(b => f.parse(b._1).getTime())
      .map(b => (b._3, b._2))

    testStream.print

    val countStream = testStream
      .keyBy(_._1)
      .timeWindow(Time.seconds(20))
      .trigger(CountTrigger.of(3))
      .fold((0, List[Int]())) { case ((k, r), i) => (i._1, r ++ List(i._2)) }

    countStream.print

The output I saw confirms the documented behavior: processing is triggered only once a key has 3 elements.

How do I force the leftover records to be processed once the watermark passes the end of the window? That is, I want to use the trigger to start processing early, but still finalize the window based on the watermark.

The output below shows that the records for keys 23 and 9 were never processed by the window:

    (4,157428)
    (4,157428)
    (23,111283)
    (23,108042)
    (9,161374)
    (9,161374)
    (4,136505)
    (4,List(157428, 157428, 136505))

Thanks,
Srikanth
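
P.S. To make the semantics I am after concrete, here is a small plain-Scala model of the two behaviors. It is not Flink code and uses no Flink API. The names `CountTriggerSketch`, `simulate` and `fireAtWindowEnd` are made up for illustration, and it assumes that all records of a key fall into the same 20-second window, which holds for the sample data. With `fireAtWindowEnd = false` it mimics what I observe with `CountTrigger.of(3)`. With `fireAtWindowEnd = true` it additionally emits, at the end of the window, every key that still has records not covered by a count-based firing, which is roughly what I would like the watermark to do.

```scala
object CountTriggerSketch {
  // (key, value) pairs in arrival order, i.e. the stream after the map step.
  val records: List[(Int, Int)] = List(
    (4, 157428), (4, 157428), (23, 111283), (23, 108042),
    (9, 161374), (9, 161374), (4, 136505)
  )

  // Plain-Scala model of the window behavior (an illustration, not Flink code).
  // Assumption: one window per key, so window state is tracked per key only.
  def simulate(input: List[(Int, Int)], n: Int,
               fireAtWindowEnd: Boolean): List[(Int, List[Int])] = {
    // Per-key accumulated values, kept in first-arrival order of the keys.
    val state = scala.collection.mutable.LinkedHashMap.empty[Int, List[Int]]
    val out = List.newBuilder[(Int, List[Int])]

    for ((k, v) <- input) {
      val acc = state.getOrElse(k, Nil) :+ v
      state(k) = acc
      // Count-based firing: emit the accumulated list on every n-th element.
      if (acc.size % n == 0) out += ((k, acc))
    }

    // Desired extra step: when the window ends, also emit keys whose latest
    // records were not covered by a count-based firing.
    if (fireAtWindowEnd)
      for ((k, acc) <- state if acc.size % n != 0) out += ((k, acc))

    out.result()
  }

  def main(args: Array[String]): Unit = {
    println("count trigger only:")
    simulate(records, 3, fireAtWindowEnd = false).foreach(println)
    println("count trigger plus firing at window end:")
    simulate(records, 3, fireAtWindowEnd = true).foreach(println)
  }
}
```

In this model the first call emits only the result for key 4, matching the output above, while the second call also emits the accumulated lists for keys 23 and 9. Whether the final firing should repeat keys that already fired on count (key 4 here) is a design choice; the sketch skips them.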