Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks. The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner, so it only generates watermarks if a watermark interval is configured. Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a final watermark of Long.MAX_VALUE when they are canceled).
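To make the mechanism concrete, here is a minimal plain-Java sketch of the behaviour described above. It is a simplified model, not Flink's implementation: the class and method names are hypothetical, a "tick" after every N elements stands in for the wall-clock watermark interval, and a single window fires once the watermark reaches its end.

```java
final class PeriodicWatermarkSketch {

    /** Hypothetical stand-in for a periodic bounded-out-of-orderness assigner. */
    static final class BoundedAssigner {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampSeen;

        BoundedAssigner(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start low, but high enough that the subtraction below cannot underflow.
            this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
        }

        /** Called for every element: only remembers the largest timestamp so far. */
        void onElement(long timestampMs) {
            maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        }

        /** Called only on a periodic tick: watermark = max timestamp minus the bound. */
        long currentWatermark() {
            return maxTimestampSeen - maxOutOfOrdernessMs;
        }
    }

    /**
     * Reports when a window ending at windowEndMs would fire.
     * tickEvery = 0 models "no watermark interval configured" (no ticks at all).
     */
    static String whenWindowFires(long[] timestamps, long boundMs, long windowEndMs, int tickEvery) {
        BoundedAssigner assigner = new BoundedAssigner(boundMs);
        for (int i = 0; i < timestamps.length; i++) {
            assigner.onElement(timestamps[i]);
            boolean tick = tickEvery > 0 && (i + 1) % tickEvery == 0;
            if (tick && assigner.currentWatermark() >= windowEndMs) {
                return "after element " + (i + 1);
            }
        }
        // No watermark reached the window end while running: only the final
        // Long.MAX_VALUE watermark emitted at shutdown flushes the window.
        return "at shutdown";
    }

    public static void main(String[] args) {
        long[] timestamps = {1000, 2000, 3000, 4000, 16000, 21000};
        // With periodic ticks, the window [0, 5000) fires once 16000 - 10000 >= 5000.
        System.out.println(whenWindowFires(timestamps, 10000L, 5000L, 1)); // after element 5
        // Without ticks, nothing fires until the program is stopped.
        System.out.println(whenWindowFires(timestamps, 10000L, 5000L, 0)); // at shutdown
    }
}
```

Note that even with ticks enabled, the window only fires once an element arrives whose timestamp is at least the out-of-orderness bound past the window end.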
Long story short: you need to configure the watermark interval:

env.getConfig().setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov <plamen.pas...@next-stream.com>:

> Hi,
>
> I'm trying to run the following streaming program in my local Flink 1.3.2
> environment. The program compiles and runs without any errors, but the
> print() call doesn't display anything. Once I stop the program, I receive
> all the aggregated data. Any ideas how to make it output regularly, or
> whenever new data arrives or old data is updated?
>
> package flink;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.Slide;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> import java.sql.Timestamp;
>
> public class StreamingJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         StreamTableEnvironment tEnv =
>             StreamTableEnvironment.getTableEnvironment(env);
>
>         SingleOutputStreamOperator<WC> input = env
>             .socketTextStream("localhost", 9000, "\n")
>             .map(new MapFunction<String, WC>() {
>                 @Override
>                 public WC map(String value) throws Exception {
>                     String[] row = value.split(",");
>                     Timestamp timestamp = Timestamp.valueOf(row[2]);
>                     return new WC(row[0], Long.valueOf(row[1]), timestamp);
>                 }
>             })
>             .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
>                     @Override
>                     public long extractTimestamp(WC element) {
>                         return element.dt.getTime();
>                     }
>                 });
>
>         tEnv.registerDataStream("WordCount", input,
>             "word, frequency, dt.rowtime");
>
>         Table table = tEnv.scan("WordCount")
>             .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
>             .groupBy("w, word")
>             .select("word, frequency.sum as frequency, w.start as dt");
>         DataStream<Tuple2<Boolean, WC>> result =
>             tEnv.toRetractStream(table, WC.class);
>         result.print();
>
>         env.execute();
>     }
>
>     public static class WC {
>         public String word;
>         public long frequency;
>         public Timestamp dt;
>
>         public WC() {
>         }
>
>         public WC(String word, long frequency, Timestamp dt) {
>             this.word = word;
>             this.frequency = frequency;
>             this.dt = dt;
>         }
>
>         @Override
>         public String toString() {
>             return "WC " + word + " " + frequency + " " + dt.getTime();
>         }
>     }
> }
>
> Sample input:
>
> hello,1,2017-12-14 13:10:01
> ciao,1,2017-12-14 13:10:02
> hello,1,2017-12-14 13:10:03
> hello,1,2017-12-14 13:10:04
>
> Thanks