Hi,

The timestamps of the stream records should be increasing (strict
monotonicity is not required; a bit of out-of-orderness is handled by
watermarks).
So the events should also be generated with increasing timestamps. It
looks like your generator produces random dates. I'd also generate data
with millisecond precision, not just days.
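
As a rough sketch of what such a generator could look like (the class
IncreasingDates and its parameters are hypothetical, not part of your code):
each event time advances by a fixed step in milliseconds and is pushed back
by a small random jitter, so events are only slightly out of order.

```java
import java.util.Date;
import java.util.Random;

// Hypothetical helper: produces event dates that advance with millisecond
// precision and are at most maxJitterMillis behind the latest base time.
public class IncreasingDates {
    private final Random random;
    private final long stepMillis;
    private final int maxJitterMillis;
    private long nextBaseMillis;

    public IncreasingDates(long startMillis, long stepMillis, int maxJitterMillis, long seed) {
        this.random = new Random(seed);
        this.stepMillis = stepMillis;
        this.maxJitterMillis = maxJitterMillis;
        this.nextBaseMillis = startMillis;
    }

    // Returns the next event time: the base time minus a jitter in [0, maxJitterMillis].
    public Date next() {
        long jitter = random.nextInt(maxJitterMillis + 1);
        long timestamp = nextBaseMillis - jitter;
        nextBaseMillis += stepMillis; // the base time always moves forward
        return new Date(timestamp);
    }
}
```

The source could then pass next() to the LogLine(Date, String) constructor
instead of a random value. The jitter should stay below the out-of-orderness
bound of the watermark assigner (3500 ms in your code), otherwise records
would arrive behind the watermark and count as late.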

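Also, a timestamp in Flink is the number of milliseconds since
1970-01-01T00:00:00 UTC.
However, your timestamp extractor only returns the seconds component of the
date (i.e., a value from 0 to 59). With timestamps that small, the watermark
(max timestamp minus 3500) never passes the end of the first 2-second window,
so the window never fires. You should use Date.getTime() instead of
Date.getSeconds().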
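
To illustrate, here is a plain-Java sketch of the assigner logic with the
timestamp taken from Date.getTime(). It has no Flink dependency so it can be
run on its own; the class name EpochMillisTimestamps is hypothetical, and in
the job the two methods would live in your
AssignerWithPeriodicWatermarks<LogLine> implementation, with the second one
wrapping its result in a Watermark.

```java
import java.util.Date;

// Hypothetical stand-alone version of the assigner logic (no Flink classes).
public class EpochMillisTimestamps {
    private final long maxOutOfOrderness = 3500; // 3.5 seconds, in milliseconds
    private long currentMaxTimestamp = 0;

    // Returns milliseconds since the epoch, the unit Flink expects for event time.
    public long extractTimestamp(Date eventTime) {
        long timestamp = eventTime.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Watermark value: highest timestamp seen so far minus the out-of-orderness bound.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

With epoch milliseconds, the watermark advances together with the event
times, so a 2-second window can close once events more than 3.5 seconds past
its end have arrived.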

Best, Fabian

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:

> Hi all,
>
> I am building an example with DataStream using Flink that has a fake
> source generator of LogLine(Date d, String line). I want to work with
> Watermarks on it so I created a class that implements
> AssignerWithPeriodicWatermarks. If I don't use the call
> ".timeWindow(Time.seconds(2))" on the data stream I can group by second and
> concatenate the lines. When I use ".timeWindow(Time.seconds(2))" nothing is
> shown on the output. I guess I misunderstood something when I was reading
> about Event Time. Could anyone help me please? My source code is as follows.
>
> Thanks for the ideas. Kind Regards,  Felipe
>
> package flink.example.streaming;
>
> import flink.util.LogLine;
> import flink.util.LogSourceFunction;
> import flink.util.UtilDate;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> import javax.annotation.Nullable;
> import java.util.Date;
>
> public class EventTimeStreamExampleJava {
>     public static void main(String[] args) throws Exception {
>
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<LogLine> dataStream = env
>                 .addSource(new LogSourceFunction())
>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>                 .keyBy(lineLog -> lineLog.getSec())
>                 // .timeWindow(Time.seconds(2))
>                 .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()))
>                 ;
>
>         dataStream.print();
>
>         env.execute("Window LogRead");
>     }
>
>     public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {
>
>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>
>         private long currentMaxTimestamp;
>
>         @Override
>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>             long timestamp = element.getTime().getSeconds();
>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>             return timestamp;
>         }
>
>         @Nullable
>         @Override
>         public Watermark getCurrentWatermark() {
>             // return the watermark as current highest timestamp minus the out-of-orderness bound
>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>         }
>     }
> }
>
> package flink.util;
>
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class LogSourceFunction implements SourceFunction<LogLine> {
>
>     private volatile boolean isRunning = true;
>
>     @Override
>     public void run(SourceContext<LogLine> ctx) throws Exception {
>         while (isRunning) {
>             ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
>         }
>     }
>
>     @Override
>     public void cancel() {
>         isRunning = false;
>     }
> }
>
> package flink.util;
>
> import java.util.Date;
> import java.util.Objects;
>
> public class LogLine {
>
>     private Date time;
>     private int sec;
>     private String line;
>
>     public LogLine() {
>     }
>
>     public LogLine(Date time, String line) {
>         this.sec = time.getSeconds();
>         this.time = time;
>         this.line = line;
>     }
>
>     public LogLine(int sec, String line) {
>         this.sec = sec;
>         this.time = UtilDate.getRandomDate(sec);
>         this.line = line;
>     }
>
>     public int getSec() {
>         return sec;
>     }
>
>     public void setSec(int sec) {
>         this.sec = sec;
>     }
>
>     public Date getTime() {
>         return time;
>     }
>
>     public String getLine() {
>         return line;
>     }
>
>     public void setTime(Date time) {
>         this.time = time;
>     }
>
>     public void setLine(String line) {
>         this.line = line;
>     }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         LogLine logLine = (LogLine) o;
>         return Objects.equals(time, logLine.time) &&
>                 Objects.equals(sec, logLine.sec) &&
>                 Objects.equals(line, logLine.line);
>     }
>
>     @Override
>     public int hashCode() {
>
>         return Objects.hash(time, sec, line);
>     }
>
>     @Override
>     public String toString() {
>         return "LogLine{" +
>                 "time=" + time +
>                 ", sec=" + sec +
>                 ", line='" + line + '\'' +
>                 '}';
>     }
> }
>
>
> --
>
> *---- Felipe Oliveira Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
