Thanks a lot, Fabian,

That clarified how to proceed. I am now using the keyBy, timeWindow, and apply
operators in EventTimeStreamExampleJava
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>.
LogSourceFunction
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>
now generates dates in order, with a bit of out-of-orderness, and I am only
using the Date of the LogLine
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
object as my key.
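Fabian's reply below asks for timestamps that increase, with only a bounded amount of out-of-orderness. The plain-Java sketch here (no Flink dependency) shows one way a generator can have that property. It is hypothetical and is not the LogSourceFunction from the linked repository; the class name, the 1 to 100 ms step, the 3-second maximum delay, and the seed are arbitrary assumptions.

```java
import java.util.Random;

/**
 * Hypothetical generator of event timestamps (epoch milliseconds) that mostly
 * increase but can lag behind the newest timestamp by at most maxDelayMs.
 * Illustration only; NOT the LogSourceFunction from the linked repository.
 */
public class OutOfOrderTimestamps {
    private final Random random;
    private final int maxDelayMs;
    private long clock; // strictly increasing "true" event time in ms

    OutOfOrderTimestamps(long startMs, int maxDelayMs, long seed) {
        this.clock = startMs;
        this.maxDelayMs = maxDelayMs;
        this.random = new Random(seed);
    }

    /** Advances the clock by 1..100 ms and returns it minus a random delay in [0, maxDelayMs]. */
    long next() {
        clock += 1 + random.nextInt(100);
        return clock - random.nextInt(maxDelayMs + 1);
    }

    public static void main(String[] args) {
        // Arbitrary start time in epoch milliseconds, 3-second maximum delay, fixed seed.
        OutOfOrderTimestamps gen = new OutOfOrderTimestamps(1_071_516_660_000L, 3_000, 42L);
        long max = Long.MIN_VALUE;
        for (int i = 0; i < 10; i++) {
            long ts = gen.next();
            max = Math.max(max, ts);
            // How far this timestamp lags behind the newest one seen so far.
            System.out.println(ts + " (lag " + (max - ts) + " ms)");
        }
    }
}
```

Every emitted timestamp is at least the current clock minus maxDelayMs, and no earlier timestamp exceeds the current clock, so the lag never exceeds maxDelayMs. A bounded-out-of-orderness watermark whose bound is larger than that delay (such as the 3.5 seconds used in the original assigner) should therefore not treat these lines as late.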

If I understand watermarks correctly, my program should combine all the lines
that fall inside the same window when I set ".timeWindow(Time.seconds(5),
Time.seconds(1))" and use ".apply(new LogLineCounterFunction())". But this
is still not happening, I think because I did not choose a good key
(".keyBy(lineLog -> lineLog.getTime())"), and the key in my
LogLineCounterFunction class is still the Date:

public static class LogLineCounterFunction implements WindowFunction<
        LogLine, // input
        Tuple3<LogLine, Long, Integer>, // output
        Date, // key
        TimeWindow> { // window type
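For reference, with ".timeWindow(Time.seconds(5), Time.seconds(1))" each element is assigned to size / slide = 5 overlapping windows, and an event-time window [start, end) is evaluated once the watermark reaches end - 1. The plain-Java sketch below (no Flink dependency; the class and method names are hypothetical) mirrors, as far as I understand it, the assignment rule of Flink's sliding event-time windows with zero offset, and lists the windows a given timestamp falls into.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch of sliding event-time window assignment (zero offset assumed).
 * Each window is the half-open interval [start, start + size).
 */
public class SlidingWindowAssignment {

    /** Returns the start timestamps of every window that contains ts, latest first. */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window containing ts: ts rounded down to a multiple of slide.
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5_000;  // Time.seconds(5)
        long slide = 1_000; // Time.seconds(1)
        long ts = 1_071_516_668_534L; // hypothetical event timestamp in epoch ms
        for (long start : windowStarts(ts, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}
```

Because every line lands in five windows, the same line can contribute to up to five window results.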

What should I use as a key in my case?

My output combines only the lines with the same key (Date). I want to
combine all the lines whose dates fall in the same window
(".timeWindow(Time.seconds(5), Time.seconds(1))"):

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
| 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534},1071516670000,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
| 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15
16:31:04.184},1071516670000,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884},1071516670000,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
16:31:03.784},1071516670000,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
| 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15
16:31:06.334},1071516670000,4)
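A keyed window function such as LogLineCounterFunction is invoked once per key and window, so with the Date as the key each window yields one record per distinct date, which appears consistent with the output above. The plain-Java sketch below (no Flink dependency; class and method names are hypothetical) groups the contents of a single window by two different keys. In Flink terms, the usual options, as far as I know, are to key every line by the same constant value, or to use a non-keyed window such as "timeWindowAll(Time.seconds(5), Time.seconds(1))" with an AllWindowFunction; note that non-keyed windows are evaluated with a parallelism of 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Plain-Java simulation: the lines that landed in ONE window, grouped by a key
 * function. A keyed window function would be called once per resulting group.
 */
public class KeyChoiceDemo {

    static <K> Map<K, List<Long>> groupByKey(List<Long> windowContents, Function<Long, K> keyOf) {
        Map<K, List<Long>> groups = new LinkedHashMap<>(); // keeps first-seen key order
        for (Long ts : windowContents) {
            groups.computeIfAbsent(keyOf.apply(ts), k -> new ArrayList<>()).add(ts);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps (ms) of lines that fall into the same window.
        List<Long> window = Arrays.asList(1_000L, 1_000L, 2_500L, 4_200L, 4_200L, 4_200L);

        // Keyed by the timestamp itself (like keying by Date): one group per distinct date.
        System.out.println(groupByKey(window, ts -> ts));
        // Keyed by a constant: every line in the window ends up in a single group.
        System.out.println(groupByKey(window, ts -> "all"));
    }
}
```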




On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> The timestamps of the stream records should be increasing (strict
> monotonicity is not required; a bit of out-of-orderness can be handled
> thanks to watermarks).
> So, the events should also be generated with increasing timestamps. It
> looks like your generator generates random dates. I'd also generate data
> with millisecond precision, not just days.
>
> Also, a timestamp in Flink is the number of milliseconds since
> 1970-01-01 00:00:00 UTC (the Unix epoch).
> However, your timestamp extractor only returns the number of seconds since
> the start of the current minute (i.e., a value from 0 to 59). You should use
> Date.getTime() instead of Date.getSeconds().
>
> Best, Fabian
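To make Fabian's point concrete: an event-time window fires only when the watermark passes its end, and the original assigner computes the watermark as the highest timestamp seen so far minus 3.5 seconds. The plain-Java sketch below (no Flink dependency; the class name and event times are hypothetical) repeats that arithmetic. With timestamps taken from Date.getSeconds(), the watermark never rises above zero, which would explain why the ".timeWindow(Time.seconds(2))" version printed nothing for an unbounded source.

```java
import java.util.Date;

/**
 * Plain-Java sketch of the bounded-out-of-orderness watermark arithmetic from the
 * original question's assigner, comparing Date.getSeconds() with Date.getTime().
 */
public class WatermarkArithmetic {
    static final long MAX_OUT_OF_ORDERNESS = 3_500; // 3.5 seconds, as in the original assigner

    /** Watermark = highest timestamp seen so far minus the out-of-orderness bound. */
    static long watermark(long[] timestamps) {
        long max = 0; // like currentMaxTimestamp, which starts at 0
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - MAX_OUT_OF_ORDERNESS;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    @SuppressWarnings("deprecation") // Date.getSeconds() is deprecated
    public static void main(String[] args) {
        // Hypothetical event times, about two seconds apart.
        Date[] events = {
            new Date(1_071_516_664_184L), new Date(1_071_516_666_334L), new Date(1_071_516_668_534L)
        };
        long[] fromSeconds = new long[events.length];
        long[] fromMillis = new long[events.length];
        for (int i = 0; i < events.length; i++) {
            fromSeconds[i] = events[i].getSeconds(); // seconds within the minute, NOT epoch time
            fromMillis[i] = events[i].getTime();     // milliseconds since the epoch
        }

        // getSeconds(): timestamps are tiny, so the watermark stays negative and
        // even the first 2-second window [0, 2000) never fires.
        long wmSeconds = watermark(fromSeconds);
        System.out.println("getSeconds(): watermark=" + wmSeconds
                + ", window [0, 2000) fires: " + fires(2_000, wmSeconds));

        // getTime(): the watermark follows event time, so earlier windows fire.
        long wmMillis = watermark(fromMillis);
        long windowEnd = 1_071_516_664_000L; // hypothetical 2-second window ending here
        System.out.println("getTime():    watermark=" + wmMillis
                + ", window ending at " + windowEnd + " fires: " + fires(windowEnd, wmMillis));
    }
}
```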
>
> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:
>
>> Hi all,
>>
>> I am building an example with DataStream using Flink that has a fake
>> source generator of LogLine(Date d, String line). I want to work with
>> watermarks on it, so I created a class that implements
>> AssignerWithPeriodicWatermarks.
>> If I don't use the operator ".timeWindow(Time.seconds(2))" on the data
>> stream, I can group by second and concatenate the lines. When I use
>> ".timeWindow(Time.seconds(2))", nothing is shown in the output. I guess I
>> misunderstood something when I was reading about event time. Could anyone
>> please help me? My source code is as follows.
>>
>> Thanks for the ideas. Kind regards, Felipe
>>
>> package flink.example.streaming;
>>
>> import flink.util.LogLine;
>> import flink.util.LogSourceFunction;
>> import flink.util.UtilDate;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> import javax.annotation.Nullable;
>> import java.util.Date;
>>
>> public class EventTimeStreamExampleJava {
>>     public static void main(String[] args) throws Exception {
>>
>>         final StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         DataStream<LogLine> dataStream = env
>>                 .addSource(new LogSourceFunction())
>>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>>                 .keyBy(lineLog -> lineLog.getSec())
>>                 // .timeWindow(Time.seconds(2))
>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>                         log1.getLine() + " | " + log2.getLine()));
>>
>>         dataStream.print();
>>
>>         env.execute("Window LogRead");
>>     }
>>
>>     public static class BoundedOutOfOrdernessGenerator
>>             implements AssignerWithPeriodicWatermarks<LogLine> {
>>
>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>
>>         private long currentMaxTimestamp;
>>
>>         @Override
>>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>>             long timestamp = element.getTime().getSeconds();
>>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>             return timestamp;
>>         }
>>
>>         @Nullable
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             // return the watermark as current highest timestamp minus
>>             // the out-of-orderness bound
>>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>         }
>>     }
>> }
>>
>> package flink.util;
>>
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>
>>     private volatile boolean isRunning = true;
>>
>>     @Override
>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>         while (isRunning) {
>>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>>                     UtilDate.getRandomString()));
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>> }
>>
>> package flink.util;
>>
>> import java.util.Date;
>> import java.util.Objects;
>>
>> public class LogLine {
>>
>>     private Date time;
>>     private int sec;
>>     private String line;
>>
>>     public LogLine() {
>>     }
>>
>>     public LogLine(Date time, String line) {
>>         this.sec = time.getSeconds();
>>         this.time = time;
>>         this.line = line;
>>     }
>>
>>     public LogLine(int sec, String line) {
>>         this.sec = sec;
>>         this.time = UtilDate.getRandomDate(sec);
>>         this.line = line;
>>     }
>>
>>     public int getSec() {
>>         return sec;
>>     }
>>
>>     public void setSec(int sec) {
>>         this.sec = sec;
>>     }
>>
>>     public Date getTime() {
>>         return time;
>>     }
>>
>>     public String getLine() {
>>         return line;
>>     }
>>
>>     public void setTime(Date time) {
>>         this.time = time;
>>     }
>>
>>     public void setLine(String line) {
>>         this.line = line;
>>     }
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) return true;
>>         if (o == null || getClass() != o.getClass()) return false;
>>         LogLine logLine = (LogLine) o;
>>         return Objects.equals(time, logLine.time) &&
>>                 Objects.equals(sec, logLine.sec) &&
>>                 Objects.equals(line, logLine.line);
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>
>>         return Objects.hash(time, sec, line);
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "LogLine{" +
>>                 "time=" + time +
>>                 ", sec=" + sec +
>>                 ", line='" + line +
>>                 '}';
>>     }
>> }
>>
>>
>> --
>>
>> *---- Felipe Oliveira Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com*
>>
>
>

