Hi, I'm using *Flink* *1.2.0* to read from *Kafka*-0.8.1.1_2.10.
I have written a *Flink* streaming job that creates event-time based windows and then computes some stats. However, the window function is never called. I used the debug watermark code and noticed that no watermark is generated. If I read from a file instead, only one watermark is generated.

Here is my code (reading from *Kafka*):

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);

        // Read from Kafka (this works, as the following print statement works)
        DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
        // jsonEventStream.print();

        jsonEventStream
            .flatMap(new strToTupleFlatMapFunImpl())
            .assignTimestampsAndWatermarks(getRawJsonTimestampsAndWatermarksAssigner())
            .flatMap(new jsonToTupleListFlatMapFunImpl())
            .keyBy(0, 1, 2)
            .timeWindow(Time.seconds(60))
            .allowedLateness(Time.seconds(10))
            .reduce(new ReduceFunImpl(), new WindowFunImpl()) // reduce fun gets called but not window fun
            .addSink(new InfluxDBSink(INFLUXDB_DB));

        env.execute();
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>
            getRawJsonTimestampsAndWatermarksAssigner() {
        return new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(
                Time.seconds(WINDOW_LATENESS)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> tuple) {
                return tuple.f1;
            }
        };
    }

    public static StreamExecutionEnvironment createExecutionEnvironment(String[] args)
            throws IOException {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        //env.getConfig().setAutoWatermarkInterval(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        return env;
    }

Any help will be appreciated.

Thank you,
Sam