Hi, I am stuck with a simple program regarding checkpointing. The Flink version I am using is 1.10.0.
*Here I have created DummySource for testing* *DummySource*
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Dummy test source that emits the fixed record {@code (100000L, "AMQSource")}
 * after every 30-second pause until {@link #cancel()} is called.
 */
public class BeaconSource implements SourceFunction<Tuple2<Long, String>> {

    private static final long serialVersionUID = 1L;

    /** Pause before each emitted record, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 30000L;

    /**
     * Loop flag for {@link #run}. It is volatile because Flink's SourceFunction
     * contract has cancel() called from a different thread than the one
     * executing run(); without volatile the loop is not guaranteed to observe
     * the update.
     */
    private volatile boolean isRunning = true;

    public BeaconSource() {
        super();
    }

    /** Asks the emit loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        this.isRunning = false;
    }

    /**
     * Sleeps for {@link #EMIT_INTERVAL_MS} and then emits one fixed record,
     * repeating until the source is cancelled.
     *
     * @param arg0 source context used to emit records
     * @throws Exception if the sleep is interrupted or emitting fails
     */
    @Override
    public void run(SourceContext<Tuple2<Long, String>> arg0) throws Exception {
        while (isRunning) {
            Thread.sleep(EMIT_INTERVAL_MS);
            arg0.collect(new Tuple2<Long, String>(100000L, "AMQSource"));
        }
    }
}
---------------------------------------------------------------------------------------
*KeyedProcessFunction (to register the timer and update the status to true so that only one-time trigger should)*
package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import
org.apache.flink.util.Collector;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import scala.collection.mutable.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

/**
 * Keyed function that registers a single processing-time timer per key.
 *
 * <p>The first element seen for a key (keyed state still {@code null}) sets the
 * per-key flag to {@code true} and registers a timer one minute after the
 * current processing time. Later elements for the same key only print a
 * message. When the timer fires, the key is printed.
 */
public class TimeProcessTrigger extends KeyedProcessFunction<Tuple, Tuple2<Long, String>, String> {

    private static final long serialVersionUID = 1L;

    /** Delay between the first element of a key and its timer, in milliseconds. */
    private static final long ONE_MINUTE = 60000L;

    /** Per-key flag: {@code null} until a timer has been registered, then {@code true}. */
    private transient ValueState<Boolean> contacthistory;

    /**
     * Obtains the keyed {@code "contact-history"} state handle from the runtime context.
     *
     * @param parameters configuration passed to the parent {@code open}
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.contacthistory = getRuntimeContext().getState(
                new ValueStateDescriptor<Boolean>("contact-history", Boolean.class));
    }

    /**
     * Prints the current flag, then either registers the one-minute timer
     * (first element of the key) or reports that one was already registered.
     *
     * @param input   incoming record (its content is not inspected)
     * @param ctx     context giving access to the timer service
     * @param collect output collector (unused; nothing is emitted)
     */
    @Override
    public void processElement(
            Tuple2<Long, String> input,
            KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
            Collector<String> collect) throws Exception {
        Boolean alreadyRegistered = this.contacthistory.value();
        System.out.println(alreadyRegistered);

        if (alreadyRegistered != null) {
            System.out.println("Timer has already register for this key");
            return;
        }

        long fireAt = ctx.timerService().currentProcessingTime() + ONE_MINUTE;
        System.out.println("Updating the flag and registering the timer @:" + fireAt);
        this.contacthistory.update(true);
        ctx.timerService().registerProcessingTimeTimer(fireAt);
    }

    /**
     * Prints the key whose timer fired; no record is emitted.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx       timer context exposing the current key
     * @param out       output collector (unused)
     */
    @Override
    public void onTimer(
            long timestamp,
            KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
            Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        System.out.println("Timer has fired for the key" + ctx.getCurrentKey());
    }
}
-------------------------------------------------
*Main App*
package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import
org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

/**
 * Job entry point: wires {@code BeaconSource} into {@code TimeProcessTrigger},
 * keyed on tuple field 0, with periodic exactly-once checkpointing enabled.
 */
public class App {

    /**
     * Builds and runs the streaming job with parallelism 1.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 30 seconds.
        env.enableCheckpointing(30000);
        env.setParallelism(1);

        // Advanced checkpoint options.
        // Exactly-once mode (this is the default).
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // At least 10 seconds must pass between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // Checkpoints have to complete within one minute, or are discarded.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint to be in progress at the same time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain externalized checkpoints after job cancellation.
        // NOTE(review): no state backend or checkpoint directory is configured in this
        // class; retained checkpoints presumably rely on `state.checkpoints.dir` (or a
        // state backend) being set in the cluster configuration - confirm.
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Allow job recovery to fall back to a checkpoint when there is a more recent savepoint.
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        SingleOutputStreamOperator<Tuple2<Long, String>> amqSource =
                env.addSource(new BeaconSource()).name("AMQSource");

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Key by the first tuple field (the Long id).
        KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = amqSource.keyBy(0);

        SingleOutputStreamOperator<String> processedStream =
                keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
        processedStream.print();

        env.execute();
    }
}
-- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>* *e-mail :puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*