Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate
watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark
assigner and only generates watermarks if a watermark interval is
configured.
Without watermarks, the query cannot "make progress" and only computes its
result when the program is closed (sources emit a MAX_LONG watermark when
being canceled).

Long story short: you need to configure the watermark interval:
env.getConfig.setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov <plamen.pas...@next-stream.com>:

> Hi,
>
> I'm trying to run the following streaming program in my local flink 1.3.2
> environment. The program compile and run without any errors but the print()
> call doesn't display anything. Once i stop the program i receive all
> aggregated data. Any ideas how to make it output regularly or when new data
> come/old data updated?
>
> package flink;
> import org.apache.flink.api.common.functions.MapFunction;import 
> org.apache.flink.api.java.tuple.Tuple2;import 
> org.apache.flink.streaming.api.TimeCharacteristic;import 
> org.apache.flink.streaming.api.datastream.DataStream;import 
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import 
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;import
>  org.apache.flink.streaming.api.windowing.time.Time;import 
> org.apache.flink.table.api.Table;import 
> org.apache.flink.table.api.java.Slide;import 
> org.apache.flink.table.api.java.StreamTableEnvironment;
> import java.sql.Timestamp;
>
> public class StreamingJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         StreamTableEnvironment tEnv = 
> StreamTableEnvironment.getTableEnvironment(env);
>
>
>         SingleOutputStreamOperator<WC> input = env
>                 .socketTextStream("localhost", 9000, "\n")
>                 .map(new MapFunction<String, WC>() {
>                     @Override                    public WC map(String value) 
> throws Exception {
>                         String[] row = value.split(",");
>                         Timestamp timestamp = Timestamp.valueOf(row[2]);
>                         return new WC(row[0], Long.valueOf(row[1]), 
> timestamp);
>                     }
>                 })
>                 .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
>                     @Override                    public long 
> extractTimestamp(WC element) {
>                         return element.dt.getTime();
>                     }
>                 });
>
>
>         tEnv.registerDataStream("WordCount", input, "word, frequency, 
> dt.rowtime");
>
>         Table table = tEnv.scan("WordCount")
>                 
> .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
>                 .groupBy("w, word")
>                 .select("word, frequency.sum as frequency, w.start as dt");   
>      DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, 
> WC.class);
>         result.print();
>
>         env.execute();
>     }
>
>     public static class WC {
>         public String word;
>         public long frequency;
>         public Timestamp dt;
>
>         public WC() {
>         }
>
>         public WC(String word, long frequency, Timestamp dt) {
>             this.word = word;
>             this.frequency = frequency;
>             this.dt = dt;
>         }
>
>         @Override        public String toString() {
>             return "WC " + word + " " + frequency + " " + dt.getTime();
>         }
>     }
> }
>
>
> Sample input:
>
> hello,1,2017-12-14 13:10:01
> ciao,1,2017-12-14 13:10:02
> hello,1,2017-12-14 13:10:03
> hello,1,2017-12-14 13:10:04
>
>
> Thanks
>

Reply via email to