It might be more complicated if you want to take into account events coming in 
out of order. For example you limit length of window to 5 and you get the 
following events:

1 2 3 4 6 7 8 5

Do you want to emit windows:

[1 2 3 4 5] (length limit exceeded) + [6 7 8] ?

Or are you fine with interleaving windows in case of out of order:

[1 2 3 4 6] + [5 7 8] 

If the latter one, some custom Trigger should be enough for you. If not, you 
would need to implement hypothetical MergingAndSplitableWindowAssigner, that 
after encountering late event “5” could split previously created windows. 
Unfortunately such feature is not supported by a WindowOperator, so you would 
have to implement your own operator for this.

Regardless of your option remember to write some integration tests:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing>

Piotrek

> On 8 Nov 2017, at 21:43, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> I am implementing a bounded session window but I require to short circuit the 
> session if the session length ( in count of events or time ) go beyond a 
> configured limit , a very reasonable scenario ( bot etc ) . I am using the 
> approach as listed. I am not sure though if the Window itself is being 
> terminated and if that is even feasible. Any other approach or advise ?  
> 
> public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>     long maxSessionTime;
> 
>     ValueState<Boolean> doneState;
>     private final ValueStateDescriptor<Boolean> cleanupStateDescriptor =
>             new ValueStateDescriptor<>("done", Boolean.class );
> 
>     private BoundedEventTimeTrigger(long maxSessionTime) {
>         this.maxSessionTime = maxSessionTime;
>     }
> 
>     /**
>      * Creates an event-time trigger that fires once the watermark passes the 
> end of the window.
>      * <p>
>      * <p>Once the trigger fires all elements are discarded. Elements that 
> arrive late immediately
>      * trigger window evaluation with just this one element.
>      */
>     public static BoundedEventTimeTrigger create(long maxSessionLengh) {
>         return new BoundedEventTimeTrigger(maxSessionLengh);
>     }
> 
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow 
> window, TriggerContext ctx) throws Exception {
>         if(cleanupState!=null && cleanupState.value()!=null && 
> cleanupState.value()) {
>             return TriggerResult.CONTINUE;
>         }
>         if(timestamp - window.getStart() > maxSessionTime){
>             System.out.println(new Date(timestamp) + "\t" + new 
> Date(window.getStart()));
>             try {
>                 doneState = ctx.getPartitionedState(cleanupStateDescriptor);
>                 doneState.update(true);
>                 return TriggerResult.FIRE_AND_PURGE;
>             } catch (IOException e) {
>                 throw new RuntimeException("Failed to update state", e);
>             }
>         }
> 
>         if (window.maxTimestamp() <= ctx.getCurrentWatermark() ) {
>             // if the watermark is already past the window fire immediately
>             return TriggerResult.FIRE;
>         } else {
>             ctx.registerEventTimeTimer(window.maxTimestamp());
>             return TriggerResult.CONTINUE;
>         }
>     }
> 
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, 
> TriggerContext ctx) {
>         return time == window.maxTimestamp() ?
>                 TriggerResult.FIRE :
>                 TriggerResult.CONTINUE;
>     }
> 
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
> 
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception 
> {
>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>     }
> 
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
> 
>     @Override
>     public void onMerge(TimeWindow window,
>                         OnMergeContext ctx) {
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>     }
> 
>     @Override
>     public String toString() {
>         return "EventTimeTrigger()";
>     }
> }
> 

Reply via email to