Hi

I am stuck with a simple program regarding checkpointing. The Flink version I
am using is 1.10.0.

*Here I have created DummySource for testing*

*DummySource (class BeaconSource)*
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{

/**
*
*/
private static final long serialVersionUID = 1L;
private Boolean isRunning=true;


public BeaconSource() {
super();
// TODO Auto-generated constructor stub
}



public void cancel() {
// TODO Auto-generated method stub

this.isRunning=false;

}

public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
// TODO Auto-generated method stub
while(isRunning) {
Thread.sleep(30000L);
arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
}
}

}


---------------------------------------------------------------------------------------
*KeyedProcessFunction (registers the timer and updates the status to true
so that the timer is registered only once per key)*


package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import scala.collection.mutable.LinkedHashMap;



import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class TimeProcessTrigger extends
KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{

/**
*
*/
private static final long serialVersionUID = 1L;
/**
*
*/

private transient ValueState<Boolean> contacthistory;
private static final  Long  ONE_MINUTE=60000L;










@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
Tuple2<Long, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
}






@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);


ValueStateDescriptor<Boolean> descriptor = new
ValueStateDescriptor<Boolean>(
"contact-history", // the state name
Boolean.class); // type information

this.contacthistory=getRuntimeContext().getState(descriptor);
}






@Override
public void processElement(Tuple2<Long, String> input,
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
Collector<String> collect)
throws Exception {
// TODO Auto-generated method stub


System.out.println(this.contacthistory.value());
Boolean value = this.contacthistory.value();
if(value==null) {
Long currentTime = ctx.timerService().currentProcessingTime();
Long regTimer=currentTime+ONE_MINUTE;
System.out.println("Updating the flag and registering the timer
@:"+regTimer);
this.contacthistory.update(true);
ctx.timerService().registerProcessingTimeTimer(regTimer);

}else {
System.out.println("Timer has already register for this key");
}
}

}


-------------------------------------------------
*Main App*

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

public class App
{
public static void main( String[] args ) throws Exception
{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
env.setParallelism(1);
// // advanced options:
// // set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// // make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
// // checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// // allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// // enable externalized checkpoints which are retained after job
cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// // allow job recovery fallback to checkpoint when there is a more recent
savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
env.addSource(new BeaconSource())
.name("AMQSource");
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(1);
KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = AMQSource.keyBy(0);
SingleOutputStreamOperator<String> processedStream =
keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
processedStream.print();
env.execute();
}
}
-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
<puneet.ki...@customercentria.com>*

*e-mail :puneet.ki...@customercentria.com
<puneet.ki...@customercentria.com>*

Reply via email to