Hi all,

I want to implement a custom Trigger that fires a GlobalWindow after 1 minute for the first time and every 20 seconds after that. I don't think I have this logic right in my custom Trigger implementation. Please help me with this. Here is the code of my custom Trigger:

public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    /**
     *
     */
    private static final long serialVersionUID = 1L;

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object element,
            long timestamp,
            W window,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
            throws Exception {
        ctx.registerEventTimeTimer(timestamp);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(
            long timestamp,
            W window,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
            throws Exception {
        ValueState<Boolean> state = ctx.getPartitionedState(
                new ValueStateDescriptor<Boolean>("flag", Boolean.TYPE, false));
        if (state.value() == false) {
            ctx.registerEventTimeTimer(timestamp + 60000);
            state.update(true);
            return TriggerResult.FIRE;
        } else {
            System.out.println("" + state.value());
            ctx.registerEventTimeTimer(timestamp + 20000);
            return TriggerResult.FIRE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }

    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }
}

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com
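
P.S. To make the intended firing schedule concrete, here is a minimal sketch in plain Java, independent of Flink. It assumes the 1 minute is counted from the timestamp of the first element (an assumption; other starting points are possible). The class name FireSchedule, the method fireTimes, and both constants are hypothetical names used only for illustration, not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the timestamps at which the window is expected to fire.
class FireSchedule {
    // Delay before the first firing (1 minute), in milliseconds.
    static final long FIRST_DELAY_MS = 60_000L;
    // Interval between all later firings (20 seconds), in milliseconds.
    static final long REPEAT_INTERVAL_MS = 20_000L;

    /**
     * Returns the first `count` expected firing timestamps, assuming the
     * schedule starts at the timestamp of the first element.
     */
    static List<Long> fireTimes(long firstElementTimestamp, int count) {
        List<Long> times = new ArrayList<>();
        long next = firstElementTimestamp + FIRST_DELAY_MS;
        for (int i = 0; i < count; i++) {
            times.add(next);
            next += REPEAT_INTERVAL_MS;
        }
        return times;
    }

    public static void main(String[] args) {
        // First element at t = 0 ms.
        System.out.println(fireTimes(0L, 4)); // prints [60000, 80000, 100000, 120000]
    }
}
```

In other words, with a first element at t = 0 the window should fire at 60 s, 80 s, 100 s, and so on, and not on the first element's own timestamp.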