Hi, I have written a program that reads data from Kafka, parses the JSON and performs a reduce operation. The problem is that the program runs perfectly the first time on a given day, but when I kill it and run it again, an empty output file is created. Even after recompiling and running it again, the file is still empty.
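One thing worth ruling out: when a job is started without a savepoint, FlinkKafkaConsumer08 by default resumes from the offsets its consumer group (`group.id`) last committed to ZooKeeper, and it only falls back to `auto.offset.reset` when no committed offset exists. A second run with the same group therefore sees only messages that arrived after the first run, which could leave the reduce with nothing to emit. The sketch below assumes the Kafka settings come from `params.getProperties`, as in the job code; `ReplayConfig`, `replayFromEarliest` and `runSuffix` are hypothetical names, not part of Flink or Kafka.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

object ReplayConfig {
  // Hypothetical helper: copies the job's Kafka properties and switches to a
  // consumer group with no committed offsets, so the Kafka 0.8 consumer falls
  // back to auto.offset.reset and starts reading from the earliest offset.
  def replayFromEarliest(base: Properties, runSuffix: String): Properties = {
    val props = new Properties()
    for (key <- base.stringPropertyNames().asScala)
      props.setProperty(key, base.getProperty(key))

    val group = Option(base.getProperty("group.id")).getOrElse("flink-job")
    props.setProperty("group.id", s"$group-$runSuffix")

    // The Kafka 0.8 consumer uses "smallest"/"largest" here;
    // newer Kafka clients use "earliest"/"latest" instead.
    props.setProperty("auto.offset.reset", "smallest")
    props
  }
}
```

The result would be passed as the third argument of `new FlinkKafkaConsumer08(...)` in place of `params.getProperties`. On Flink 1.3 or later, calling `kafkaConsumer.setStartFromEarliest()` has the same effect without changing the group. In both cases the start-position setting is ignored when the job is restored from a checkpoint or savepoint, because the offsets stored in Flink state take precedence.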
val kafkaConsumer = new FlinkKafkaConsumer08(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))

val mts = messageStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
  var ts = Long.MinValue

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    val timestamp = json_decode(element).toLong
    ts = Math.max(timestamp, previousElementTimestamp)
    timestamp
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(ts)
  }
})

val output = mts
  .keyBy(t => json_decode(t))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .allowedLateness(Time.seconds(5))
  .reduce((v1, v2) => v1 + "----" + v2)

output.writeAsText(path).setParallelism(1)

I am using the FileSystem state backend (FsStateBackend). I suspect the problem is related to how memory or state is cleaned up between runs, but I don't understand what's happening. Any help?

Rahul Raj
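Another point worth checking is when the session windows fire. A session window holding elements from `first` to `last` with a 60 s gap spans `[first, last + 60 s)`, and the event-time trigger fires only once the watermark reaches `last + 60 s - 1 ms`. With a watermark derived purely from incoming data, the most recent session of a run fires only after a later element arrives; if the job is killed first, that session is never written. Separately, Kafka 0.8 messages carry no timestamps, so `previousElementTimestamp` is `Long.MinValue` there, and `Math.max(timestamp, previousElementTimestamp)` makes `ts` the last element's timestamp rather than the running maximum. The plain-Scala model below (no Flink dependency, so it can be run on its own) shows the usual max-tracking pattern and the firing condition; `WatermarkModel`, `MaxTimestampTracker` and `sessionFires` are hypothetical names for illustration only.

```scala
object WatermarkModel {
  // Hypothetical model of a periodic watermark assigner that tracks the
  // largest timestamp seen so far, minus a fixed out-of-orderness bound.
  final class MaxTimestampTracker(maxOutOfOrdernessMs: Long) {
    // Start low enough that the initial watermark is Long.MinValue.
    private var currentMax: Long = Long.MinValue + maxOutOfOrdernessMs

    // Keep the maximum over all elements, not max(element, previous element).
    def extractTimestamp(ts: Long): Long = {
      currentMax = math.max(currentMax, ts)
      ts
    }

    // Monotonic, because currentMax never decreases.
    def currentWatermark: Long = currentMax - maxOutOfOrdernessMs
  }

  // A session window whose last element is at lastTs ends at lastTs + gap;
  // it fires once the watermark reaches its max timestamp, lastTs + gap - 1.
  def sessionFires(lastTs: Long, gapMs: Long, watermark: Long): Boolean =
    watermark >= lastTs + gapMs - 1
}
```

In the actual job, Flink's `BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5))` implements the same max-tracking logic; only `extractTimestamp(element: String): Long` needs to be overridden.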