Hi,

I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger: the timer is not set correctly. You can try this fixed version, which I will also apply to the Flink code: https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930

One more thing: the ContinuousProcessingTimeTrigger will never remove the window. The default EventTimeTrigger fires a window and purges its contents, while the ContinuousProcessingTimeTrigger only ever fires for a window. This means that, over time, a lot of windows will accumulate in your state and they will never be cleaned up.

For now, if you require continuous firing on a TimeWindow, I would suggest writing a custom Trigger based on EventTimeTrigger (or ProcessingTimeTrigger) that fires and purges when the window's time is reached and additionally fires continuously at earlier times.

Let us know if you need more information about this. Kostas Kloudas also recently looked into writing custom Triggers, so maybe he has some material he could give you.

Cheers,
Aljoscha

> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>
> Hello,
>
> I have a question about TumblingProcessingTimeWindow and
> ContinuousProcessingTimeTrigger.
>
> The code I tried is below. Output the distinct count of the words,
> counts are printed every 5 seconds and window is reset every 1 minute.
>
> ---
> val input =
>   env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>     .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>     .fold(Set[String]()){(r,i) => { r + i}}
>     .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>
> input print
> ---
>
> I wrote data to the input file with some interval.
>
> ---
> echo "aaa" >> input.txt
> echo "aaa" >> input.txt
> sleep 10
> echo "bbb" >> input.txt
> sleep 60
> echo "ccc" >> input.txt
> ---
>
> The result I got was just 1 record. The expected output was 1 -> (10+
> sec later) 2 -> (60+ sec later) 1 .
> ---
> (2016-03-18 13:08:59.288,2)
> ---
>
> Even after several minutes, I never got additional record.
> In my understanding, with
> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
> operator (fold, map) in the code above will be evaluated every 5
> seconds.
> Am I mis-understand something?
>
> Regards,
> Hironori
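
To illustrate the custom-trigger suggestion in the reply above, here is a minimal sketch in plain Scala of the decision such a trigger might make each time a timer goes off: fire early at every interval tick, then fire and purge once the end of the window is reached. It deliberately does not use Flink's Trigger API. The names `Decision`, `ContinuousPurgingSketch`, `onTimer`, `nextTimer`, and `simulate` are hypothetical, and the alignment of ticks to multiples of the interval is an assumption, not necessarily what the fixed trigger in the gist does.

```scala
// Hypothetical model of the fire / fire-and-purge logic; NOT Flink's Trigger API.
sealed trait Decision
case object Fire extends Decision          // emit the current window contents, keep state
case object FireAndPurge extends Decision  // emit the contents and drop the window state

object ContinuousPurgingSketch {

  /** Decision for a timer going off at `now` for a window ending at `windowEnd` (ms). */
  def onTimer(now: Long, windowEnd: Long): Decision =
    if (now >= windowEnd) FireAndPurge else Fire

  /** Next timer: the next multiple of `intervalMs` after `now`, capped at `windowEnd`.
    * Assumes non-negative timestamps. */
  def nextTimer(now: Long, intervalMs: Long, windowEnd: Long): Long = {
    val nextTick = now - (now % intervalMs) + intervalMs
    math.min(nextTick, windowEnd)
  }

  /** Replays all timers of one window and returns (timestamp, decision) pairs in order. */
  def simulate(windowStart: Long, windowEnd: Long, intervalMs: Long): List[(Long, Decision)] = {
    @annotation.tailrec
    def loop(t: Long, acc: List[(Long, Decision)]): List[(Long, Decision)] = {
      val decision = onTimer(t, windowEnd)
      val updated = (t, decision) :: acc
      if (decision == FireAndPurge) updated.reverse
      else loop(nextTimer(t, intervalMs, windowEnd), updated)
    }
    loop(nextTimer(windowStart, intervalMs, windowEnd), Nil)
  }
}
```

For the 60-second window with a 5-second interval from the quoted program, `ContinuousPurgingSketch.simulate(0L, 60000L, 5000L)` yields eleven `Fire` decisions (at 5000 through 55000) followed by one `FireAndPurge` at 60000, after which the window's state would be gone. In a real Trigger, the same two decisions would be returned from the timer callback, and the next timer would be registered through the trigger context.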