Hello Mike,

The code that Aljoscha mentioned is here:
https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

This allows you to specify a trigger like:

    EventTimeTriggerWithEarlyAndLateFiring trigger = EventTimeTriggerWithEarlyAndLateFiring.create()
        .withEarlyFiringEvery(Time.minutes(10))
        .withLateFiringEvery(Time.minutes(5))
        .withAllowedLateness(Time.minutes(20))
        .accumulating();

This means that it will fire every 10 minutes (in processing time) until the end of the window (event time), and then every 5 minutes (processing time) for late elements arriving up to 20 minutes late. In addition, because of accumulating(), previously received elements are kept rather than discarded, so each firing sees the full window contents so far.

Hope this helps,
Kostas

> On Mar 2, 2016, at 11:02 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> I did some initial work on extending the EventTimeTrigger a bit to allow more complex behavior. Specifically, this allows setting an “allowed lateness” after which elements should no longer lead to windows being emitted. Also, it allows specifying that an emitted window be kept in memory, so that when a late element arrives the whole window is emitted again.
>
> The code I have is here:
> https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
>
> Kostas Kloudas worked on extending it, so maybe he could share his version of the trigger as well.
>
> Cheers,
> Aljoscha
>
>> On 01 Mar 2016, at 18:35, Michael Radford <mub...@gmail.com> wrote:
>>
>> I'm evaluating Flink for a reporting application that will keep various aggregates updated in a database. It will be consuming from Kafka queues that are replicated from remote data centers, so in case there is a long outage in replication, I need to decide what to do about windowing and late data.
>>
>> If I use Flink's built-in windows and watermarks, any late data will come in 1-element windows, which could overwhelm the database if a large batch of late data arrives and each element is mapped to an individual database update.
>>
>> As far as I can tell, I have two options:
>>
>> 1. Ignore late data, by marking it as late in an AssignerWithPunctuatedWatermarks function and then discarding it in a flatMap operator. In this scenario, I would rely on a batch process to fill in the missing data later, in the lambda-architecture style.
>>
>> 2. Implement my own watermark logic to allow full windows of late data. It seems like I could, for example, emit a "tick" message that is replicated to all partitions every n messages, and a custom Trigger could then decide when to purge each window based on the ticks and a timeout duration. The system would never emit a real Watermark.
>>
>> My questions are:
>> - Am I mistaken about either of these, or are there any other options I'm not seeing for avoiding 1-element windows?
>> - For option 2, are there any problems with not emitting actual watermarks, as long as the windows are eventually purged by a trigger?
>>
>> Thanks,
>> Mike
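
To make the above a bit more concrete, this is roughly how such a trigger would be plugged into a keyed event-time window. It is only a sketch: "events", MyEvent, the key selector, the 1-hour window, and MyWindowFunction are placeholders made up for illustration, not something from the repository above.

    // Sketch: assuming "events" is a DataStream<MyEvent> with event-time
    // timestamps and watermarks already assigned. MyEvent, getUserId(), the
    // 1-hour window, and MyWindowFunction are placeholders.
    // (TumblingEventTimeWindows and Time are from
    // org.apache.flink.streaming.api.windowing.assigners / .time.)
    events
        .keyBy(e -> e.getUserId())
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
            .withEarlyFiringEvery(Time.minutes(10))
            .withLateFiringEvery(Time.minutes(5))
            .withAllowedLateness(Time.minutes(20))
            .accumulating())
        .apply(new MyWindowFunction()); // called on every early, on-time, and late firing

Because of accumulating(), every firing passes the full window contents gathered so far to the window function, so late elements lead to the existing window being re-emitted as a whole rather than showing up as 1-element windows, and you can simply overwrite the corresponding aggregate in the database on each firing.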