Hi,

I'm using *Flink 1.2.0* to read from *Kafka 0.8.1.1_2.10*.

I have written a *Flink* streaming job that creates event-time based
windows and then computes some stats. However, the window function is never
called. I added some watermark-debugging code (see the logging-assigner
sketch after the code below) and noticed that no watermark is generated. If
I read from a file instead, only a single watermark is generated. Here is my
code (reading from *Kafka*)-

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);

    // Read from kafka (it works as the following print statement works)
    DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
    // jsonEventStream.print();

    jsonEventStream
        .flatMap(new strToTupleFlatMapFunImpl())
        .assignTimestampsAndWatermarks(getRawJsonTimestampsAndWatermarksAssigner())
        .flatMap(new jsonToTupleListFlatMapFunImpl())
        .keyBy(0, 1, 2)
        .timeWindow(Time.seconds(60))
        .allowedLateness(Time.seconds(10))
        .reduce(new ReduceFunImpl(), new WindowFunImpl())  // the reduce fun gets called, but the window fun never does
        .addSink(new InfluxDBSink(INFLUXDB_DB));

    env.execute();
}

private static BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> getRawJsonTimestampsAndWatermarksAssigner() {
    return new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(WINDOW_LATENESS)) {
        @Override
        public long extractTimestamp(Tuple2<String, Long> tuple) {
            return tuple.f1;
        }
    };
}


public static StreamExecutionEnvironment createExecutionEnvironment(String[] args) throws IOException {
    ParameterTool params = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(params);
    //env.getConfig().setAutoWatermarkInterval(1000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    return env;
}
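

For reference, the watermark-debugging code I mentioned is roughly the
following (a minimal sketch only: it mirrors what
BoundedOutOfOrdernessTimestampExtractor does internally but prints every
watermark it emits; the method name here is made up, and WINDOW_LATENESS is
the same constant used above)-

// Debugging only: an AssignerWithPeriodicWatermarks that behaves like
// BoundedOutOfOrdernessTimestampExtractor but logs each watermark it emits.
// Uses org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
// and org.apache.flink.streaming.api.watermark.Watermark.
private static AssignerWithPeriodicWatermarks<Tuple2<String, Long>> getLoggingTimestampsAndWatermarksAssigner() {
    return new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
        private final long maxOutOfOrderness = Time.seconds(WINDOW_LATENESS).toMilliseconds();
        private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

        @Override
        public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
            // track the highest timestamp seen so far
            currentMaxTimestamp = Math.max(currentMaxTimestamp, tuple.f1);
            return tuple.f1;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // called periodically (auto watermark interval); print so we can see watermark progress
            Watermark wm = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            System.out.println("current watermark: " + wm);
            return wm;
        }
    };
}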


Any help will be appreciated.

Thank you,

Sam
