Hi,

Let me also add that you should override the clear() method in order to 
clear your state and delete the pending timers.
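
As a rough illustration of the timer bookkeeping (one initial 60-second 
timer set at the first element, a 20-second re-registration on each firing, 
and clear() dropping whatever is still pending), here is a plain-Java model. 
It does not use the Flink API: the class TimerScheduleSketch, its TreeSet of 
timers, the firstSeen flag and advanceWatermark() are hypothetical stand-ins 
for the Trigger, the registered event-time timers, the partitioned state and 
watermark progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of the timer logic only; NOT the Flink Trigger API.
// All names here are hypothetical and chosen for illustration.
class TimerScheduleSketch {
    static final long FIRST_DELAY_MS = 60_000L;  // first firing: 1 minute
    static final long REPEAT_DELAY_MS = 20_000L; // later firings: every 20 seconds

    // Stands in for the event-time timers registered through the context.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Stands in for a per-window state flag ("has the first timer been set?").
    private boolean firstSeen = false;
    // Records the timestamps at which the window would have fired.
    private final List<Long> fired = new ArrayList<>();

    // Only the first element registers a timer; later elements do nothing.
    void onElement(long timestamp) {
        if (!firstSeen) {
            firstSeen = true;
            timers.add(timestamp + FIRST_DELAY_MS);
        }
    }

    // Each firing registers exactly one follow-up timer.
    void onEventTime(long timerTimestamp) {
        fired.add(timerTimestamp);
        timers.add(timerTimestamp + REPEAT_DELAY_MS);
    }

    // Fires every timer whose timestamp is at or before the watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onEventTime(timers.pollFirst());
        }
    }

    // Mirrors the advice above: reset the state and delete pending timers.
    void clear() {
        timers.clear();
        firstSeen = false;
    }

    List<Long> firedAt() {
        return fired;
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerScheduleSketch sketch = new TimerScheduleSketch();
        sketch.onElement(1_000L);
        sketch.onElement(5_000L);  // no additional timer is registered
        sketch.advanceWatermark(100_000L);
        System.out.println("fired at: " + sketch.firedAt());
        System.out.println("pending timers: " + sketch.pendingTimers());
        sketch.clear();
        System.out.println("pending after clear: " + sketch.pendingTimers());
    }
}
```

However many elements arrive, at most one timer is pending at any time in 
this model, which is the property the real trigger should have as well.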

Kostas

> On Apr 25, 2016, at 11:52 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Piyush,
> 
> In the onElement function, you register a timer every time you receive an 
> element. 
> 
> When the next watermark arrives, in the flag==false case, this will lead to 
> every element adding a timer for its timestamp+60000ms. The same holds for 
> the flag==true case, with a 20000ms interval.
> 
> What you can try is to set the initial 60 sec timer only once, at the first 
> element, and then set all the rest in onEventTime with a 20 sec interval.
> 
> For an example of a custom trigger, you can have a look here:
> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
> 
> I hope this helped.
> Let me know if you need any help.
> 
> Kostas
> 
>> On Apr 25, 2016, at 11:22 AM, Piyush Shrivastava <piyush...@yahoo.co.in> 
>> wrote:
>> 
>> Hi all,
>> I want to implement a custom Trigger which fires a GlobalWindow after 1 
>> minute for the first time and every 20 seconds after that.
>> I do not think I have this logic right in the implementation of my custom 
>> Trigger. Please help me with this.
>> 
>> Here is the code of my custom Trigger:
>> 
>> public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
>> 
>>     /**
>>      * 
>>      */
>>     private static final long serialVersionUID = 1L;
>>       
>>     private TradeTrigger() {
>>     }
>>     
>>     @Override
>>     public TriggerResult onElement(
>>             Object element,
>>             long timestamp,
>>             W window,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>>             throws Exception {
>>         
>>         ctx.registerEventTimeTimer(timestamp);
>>         return TriggerResult.CONTINUE;
>>         
>>     }
>> 
>>     @Override
>>     public TriggerResult onEventTime(
>>             long timestamp,
>>             W window,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>>             throws Exception {
>>     
>>         ValueState<Boolean> state = ctx.getPartitionedState(
>>                 new ValueStateDescriptor<Boolean>("flag", Boolean.TYPE, false));
>>         
>>         if(state.value()==false){
>>             ctx.registerEventTimeTimer(timestamp+60000);
>>             state.update(true);
>>             return TriggerResult.FIRE;
>>         }else{
>>             System.out.println(""+state.value());
>>             ctx.registerEventTimeTimer(timestamp+20000);
>>             return TriggerResult.FIRE;
>>         }
>>     }
>> 
>>     @Override
>>     public TriggerResult onProcessingTime(
>>             long arg0,
>>             W arg1,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
>>             throws Exception {
>>         // TODO Auto-generated method stub
>>         return TriggerResult.CONTINUE;
>>     }
>>     
>>      
>>     public static <W extends Window> TradeTrigger<W> of() {
>>         return new TradeTrigger<>();
>>     }
>> 
>>     
>> }
>>  
>> Thanks and Regards,
>> Piyush Shrivastava <mailto:piy...@webograffiti.com>
>> 
>> http://webograffiti.com
> 
