Hello Mike,

The code that Aljoscha mentioned is here:

https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
 

This allows you to specify a trigger like:

EventTimeTriggerWithEarlyAndLateFiring trigger =
    EventTimeTriggerWithEarlyAndLateFiring.create()
        .withEarlyFiringEvery(Time.minutes(10))
        .withLateFiringEvery(Time.minutes(5))
        .withAllowedLateness(Time.minutes(20))
        .accumulating();

This means that the trigger will fire every 10 minutes (in processing time)
until the end of the window (in event time), and then every 5 minutes (in
processing time) for late elements that arrive up to 20 minutes late. In
addition, since the trigger is accumulating, elements from previous firings
are not discarded.
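
To make the schedule concrete, here is a small plain-Java sketch of the three
phases described above. It has no Flink dependency and is not taken from the
linked trigger: the class FiringPhaseSketch, its methods, and the exact
boundary checks (watermark compared against the window end) are assumptions
made purely for illustration.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of Flink or the linked trigger. */
final class FiringPhaseSketch {

    enum Phase { EARLY, LATE, EXPIRED }

    /**
     * Decides which phase a window is in, given the current watermark.
     * Both timestamps are event-time milliseconds. The boundary handling
     * here is a simplifying assumption.
     */
    static Phase phaseOf(long watermark, long windowEnd, Duration allowedLateness) {
        if (watermark < windowEnd) {
            return Phase.EARLY;   // window not yet complete in event time
        }
        if (watermark < windowEnd + allowedLateness.toMillis()) {
            return Phase.LATE;    // complete, but late elements are still accepted
        }
        return Phase.EXPIRED;     // past the allowed lateness
    }

    /** Processing-time interval between firings, or null once the window has expired. */
    static Duration firingInterval(Phase phase, Duration early, Duration late) {
        switch (phase) {
            case EARLY:
                return early;
            case LATE:
                return late;
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        Duration early = Duration.ofMinutes(10);
        Duration late = Duration.ofMinutes(5);
        Duration lateness = Duration.ofMinutes(20);
        long windowEnd = Duration.ofMinutes(60).toMillis();

        // Walk the watermark through the window's lifetime and print the phase.
        for (long minutes : new long[] {30, 70, 80}) {
            long watermark = Duration.ofMinutes(minutes).toMillis();
            Phase phase = phaseOf(watermark, windowEnd, lateness);
            System.out.println(minutes + " min: " + phase
                    + ", interval = " + firingInterval(phase, early, late));
        }
    }
}
```

With the values from the trigger above, a window ending at minute 60 would be
in the early phase at minute 30, in the late phase at minute 70, and expired
at minute 80 under these assumed boundaries.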

Hope this helps,
Kostas

> On Mar 2, 2016, at 11:02 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> I did some initial work on extending the EventTimeTrigger a bit to allow more 
> complex behavior. Specifically, this allows setting an “allowed lateness” 
> after which elements should no longer lead to windows being emitted. It also 
> lets you keep an emitted window in memory and, when a late element arrives, 
> emit the whole window again.
> 
> The code I have is here: 
> https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
> 
> Kostas Kloudas worked on extending it, so maybe he could share his version of 
> the trigger as well.
> 
> Cheers,
> Aljoscha
>> On 01 Mar 2016, at 18:35, Michael Radford <mub...@gmail.com> wrote:
>> 
>> I'm evaluating Flink for a reporting application that will keep
>> various aggregates updated in a database. It will be consuming from
>> Kafka queues that are replicated from remote data centers, so in case
>> there is a long outage in replication, I need to decide what to do
>> about windowing and late data.
>> 
>> If I use Flink's built-in windows and watermarks, any late data will
>> come in 1-element windows, which could overwhelm the database if a
>> large batch of late data comes in and they are each mapped to
>> individual database updates.
>> 
>> As far as I can tell, I have two options:
>> 
>> 1. Ignore late data, by marking it as late in an
>> AssignerWithPunctuatedWatermarks function, and then discarding it in a
>> flatMap operator. In this scenario, I would rely on a batch process to
>> fill in the missing data later, in the lambda architecture style.
>> 
>> 2. Implement my own watermark logic to allow full windows of late
>> data. It seems like I could, for example, emit a "tick" message that
>> is replicated to all partitions every n messages, and then a custom
>> Trigger could decide when to purge each window based on the ticks and
>> a timeout duration. The system would never emit a real Watermark.
>> 
>> My questions are:
>> - Am I mistaken about either of these, or are there any other options
>> I'm not seeing for avoiding 1-element windows?
>> - For option 2, are there any problems with not emitting actual
>> watermarks, as long as the windows are eventually purged by a trigger?
>> 
>> Thanks,
>> Mike
> 
