Thanks a lot, Fabian, that clarified how I should proceed. I am now using the keyBy, timeWindow, and apply operators in EventTimeStreamExampleJava <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>. LogSourceFunction <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java> now generates dates in order, with a bit of out-of-orderness, and I am only using the Date of the LogLine <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java> object as my key.
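For reference, the kind of input Fabian describes can be sketched in plain Java without Flink. The sketch produces timestamps in epoch milliseconds that mostly increase but carry a bounded amount of out-of-orderness, and it checks that a periodic watermark of "highest timestamp minus bound" never overtakes an event. The class name `OutOfOrderTimestamps`, the 500 ms step, and the seed are hypothetical illustration choices, not the code in LogSourceFunction. Only the 3,500 ms bound comes from the BoundedOutOfOrdernessGenerator quoted below.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

// Hypothetical sketch, not the LogSourceFunction from the repository:
// event times in epoch milliseconds that mostly increase, with bounded out-of-orderness.
class OutOfOrderTimestamps {

    // Event i nominally happens at startMs + i * stepMs and is then shifted back
    // by a random delay in [0, maxDelayMs], so it may arrive after newer events.
    static List<Long> generate(long startMs, int count, long stepMs, long maxDelayMs, long seed) {
        Random random = new Random(seed);
        List<Long> timestamps = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            long delay = Math.floorMod(random.nextLong(), maxDelayMs + 1);
            timestamps.add(startMs + i * stepMs - delay);
        }
        return timestamps;
    }

    // What a timestamp extractor should return: milliseconds since the epoch,
    // i.e. Date.getTime(), not Date.getSeconds().
    static long toEventTime(Date date) {
        return date.getTime();
    }

    // Periodic watermark in the style of BoundedOutOfOrdernessGenerator.
    static long watermark(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // Largest gap between the highest timestamp seen so far and a later, older event.
    static long maxLateness(List<Long> timestamps) {
        long currentMax = Long.MIN_VALUE;
        long worst = 0;
        for (long ts : timestamps) {
            if (currentMax != Long.MIN_VALUE) {
                worst = Math.max(worst, currentMax - ts);
            }
            currentMax = Math.max(currentMax, ts);
        }
        return worst;
    }

    public static void main(String[] args) {
        long start = toEventTime(new Date());
        List<Long> ts = generate(start, 1_000, 500, 3_500, 42L);
        // The lateness stays within the 3.5 s bound, so max - 3500 never passes an event.
        System.out.println("max lateness (ms): " + maxLateness(ts));
    }
}
```

The design choice is to delay events relative to a steadily advancing clock rather than draw random dates: that keeps the stream roughly increasing, which is what makes a fixed out-of-orderness bound meaningful.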
If I understood watermarks correctly, my program should combine all the lines that fall into the same window when I set ".timeWindow(Time.seconds(5), Time.seconds(1))" and use ".apply(new LogLineCounterFunction())". However, this is still not happening, because I did not choose a good key (".keyBy(lineLog -> lineLog.getTime())"), and the key type of my LogLineCounterFunction class is still Date:

    public static class LogLineCounterFunction implements WindowFunction<
            LogLine,                        // input
            Tuple3<LogLine, Long, Integer>, // output
            Date,                           // key
            TimeWindow> {                   // window type

What should I use as a key in my case? My output only combines the lines with the same key (Date), but I want to combine all the dates that fall into the same window of ".timeWindow(Time.seconds(5), Time.seconds(1))":

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534},1071516670000,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184},1071516670000,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884},1071516670000,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15 16:31:03.784},1071516670000,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334},1071516670000,4)

On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> The timestamps of the stream records
> should be increasing (strict monotonicity is not required; a bit of
> out-of-orderness can be handled thanks to watermarks).
> So, the events should also be generated with increasing timestamps. It
> looks like your generator generates random dates. I'd also generate data
> with millisecond precision, not just days.
>
> Also, a timestamp in Flink is the number of milliseconds since
> 1970-01-01 00:00:00 UTC.
> However, your timestamp extractor only returns the number of seconds since
> the last full minute (i.e., from 0 to 60). You should use Date.getTime()
> instead of Date.getSeconds().
>
> Best, Fabian
>
> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:
>
>> Hi all,
>>
>> I am building an example with DataStream using Flink that has a fake
>> source generator of LogLine(Date d, String line). I want to work with
>> watermarks on it, so I created a class that implements
>> AssignerWithPeriodicWatermarks.
>> If I don't use the ".timeWindow(Time.seconds(2))" operator on the data
>> stream, I can group by second and concatenate the lines. When I use
>> ".timeWindow(Time.seconds(2))", nothing is shown in the output. I guess I
>> misunderstood something when I was reading about event time. Could anyone
>> help me, please? My source code is as follows.
>>
>> Thanks for the ideas.
>> Kind Regards, Felipe
>>
>> package flink.example.streaming;
>>
>> import flink.util.LogLine;
>> import flink.util.LogSourceFunction;
>> import flink.util.UtilDate;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> import javax.annotation.Nullable;
>> import java.util.Date;
>>
>> public class EventTimeStreamExampleJava {
>>     public static void main(String[] args) throws Exception {
>>
>>         final StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         DataStream<LogLine> dataStream = env
>>                 .addSource(new LogSourceFunction())
>>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>>                 .keyBy(lineLog -> lineLog.getSec())
>>                 // .timeWindow(Time.seconds(2))
>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>                         log1.getLine() + " | " + log2.getLine()));
>>
>>         dataStream.print();
>>
>>         env.execute("Window LogRead");
>>     }
>>
>>     public static class BoundedOutOfOrdernessGenerator implements
>>             AssignerWithPeriodicWatermarks<LogLine> {
>>
>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>
>>         private long currentMaxTimestamp;
>>
>>         @Override
>>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>>             long timestamp = element.getTime().getSeconds();
>>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>             return timestamp;
>>         }
>>
>>         @Nullable
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             // return the watermark as current highest timestamp minus
>>             // the out-of-orderness bound
>>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>         }
>>     }
>> }
>>
>> package flink.util;
>>
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>
>>     private volatile boolean isRunning = true;
>>
>>     @Override
>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>         while (isRunning) {
>>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>>                     UtilDate.getRandomString()));
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>> }
>>
>> package flink.util;
>>
>> import java.util.Date;
>> import java.util.Objects;
>>
>> public class LogLine {
>>
>>     private Date time;
>>     private int sec;
>>     private String line;
>>
>>     public LogLine() {
>>     }
>>
>>     public LogLine(Date time, String line) {
>>         this.sec = time.getSeconds();
>>         this.time = time;
>>         this.line = line;
>>     }
>>
>>     public LogLine(int sec, String line) {
>>         this.sec = sec;
>>         this.time = UtilDate.getRandomDate(sec);
>>         this.line = line;
>>     }
>>
>>     public int getSec() {
>>         return sec;
>>     }
>>
>>     public void setSec(int sec) {
>>         this.sec = sec;
>>     }
>>
>>     public Date getTime() {
>>         return time;
>>     }
>>
>>     public String getLine() {
>>         return line;
>>     }
>>
>>     public void setTime(Date time) {
>>         this.time = time;
>>     }
>>
>>     public void setLine(String line) {
>>         this.line = line;
>>     }
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) return true;
>>         if (o == null || getClass() != o.getClass()) return false;
>>         LogLine logLine = (LogLine) o;
>>         return Objects.equals(time, logLine.time) &&
>>                 Objects.equals(sec, logLine.sec) &&
>>                 Objects.equals(line, logLine.line);
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>
>>         return Objects.hash(time, sec, line);
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "LogLine{" +
>>                 "time=" + time +
>>                 ", sec=" + sec +
>>                 ", line='" + line +
>>                 '}';
>>     }
>> }
>>
>>
>> --
>>
>> ---- Felipe Oliveira Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
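On the key question at the top of this thread: a keyed window groups elements by (key, window), so with `.keyBy(lineLog -> lineLog.getTime())` every distinct timestamp becomes its own group inside each window, which is the splitting visible in the printed output (same window end 1071516670000, separate counts per timestamp). The watermark does not define the groups; it only decides when a window is complete and emitted. The plain-Java sketch below (no Flink dependency; `SlidingWindowSketch` and its methods are hypothetical names) assigns timestamps to sliding windows of size 5 s and slide 1 s using the same loop shape as Flink's `SlidingEventTimeWindows` with a zero offset, and compares counting per window against counting per (timestamp, window). In Flink itself, combining everything in a window regardless of timestamp would typically be done with a non-keyed window such as `.timeWindowAll(Time.seconds(5), Time.seconds(1))` (which runs with parallelism 1), or by keying on a constant or a coarser attribute; treat this as a suggestion to check against the Flink version in use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch (no Flink dependency) of sliding event-time windows,
// assuming offset 0 and non-negative timestamps in epoch milliseconds.
class SlidingWindowSketch {

    // Start times of all windows [start, start + size) that contain `timestamp`.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Non-keyed grouping: one count per window start, whatever the timestamp.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long size, long slide) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            for (long start : windowStarts(ts, size, slide)) {
                counts.merge(start, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Keyed by the exact timestamp: one count per (timestamp, window), encoded as "ts@start".
    static Map<String, Integer> countPerTimestampAndWindow(List<Long> timestamps, long size, long slide) {
        Map<String, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            for (long start : windowStarts(ts, size, slide)) {
                counts.merge(ts + "@" + start, 1, Integer::sum);
            }
        }
        return counts;
    }

    // The watermark only decides completeness: [start, start + size) is emitted
    // once the watermark reaches its last timestamp, start + size - 1.
    static boolean isComplete(long start, long size, long watermark) {
        return watermark >= start + size - 1;
    }

    public static void main(String[] args) {
        List<Long> ts = Arrays.asList(7_000L, 7_000L, 8_500L);
        System.out.println(windowStarts(7_000L, 5_000L, 1_000L));
        System.out.println(countPerWindow(ts, 5_000L, 1_000L));
        System.out.println(countPerTimestampAndWindow(ts, 5_000L, 1_000L));
    }
}
```

With elements at 7.0 s, 7.0 s and 8.5 s, the window starting at 4 s holds all three elements when the timestamp is ignored, but is split into two groups (2 and 1) when the exact timestamp is the key.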