Hi all, I'm using Flink 1.9.2 and I would like to ask about my use case and approach I've took to meet it.
The use case: I have a keyed stream, where I have to buffer messages with logic: 1. Buffering should start only when message arrives. 2. The max buffer time should not be longer than 3 seconds 3. Each new message should NOT prolong the buffer time. 4. If particular business condition will be meet, buffering should stop and all messages should be let through further processing. The business logic in point 4 is taking under the consideration data from previously buffered messages in this time buffer session. My setup for this is 1.keyedStream with ProcessingTimeSessionWindow (I dont need EventTime for this). 2. Custom Trigger The custom trigger: 1. keeps some data in its state under AggregatingStateDescriptor allowing me to override "merge" method from Trigger class. 2. In onElement method, for the first call I execute ctx.registerEventTimeTimer(window.maxTimestamp()); Additionally in this method I added the busioenss logic which returns TriggerResult.FIRE or TriggerResult.CONTINUE 3. The onProcessingTime methods returns TriggerResult.FIRE 3. all other methods are returning TriggerResult.CONTINUE As a result, I can observe that my window is fired two times. One from onElement method where the busienss condition is meet and second time from onProcessingTime method. What is the best way to prevent this? Regards, Krzysztof -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/