Re: How to correct use timeWindow() with DataStream?

2018-03-19 Thread Fabian Hueske
If you don't want to partition by key, i.e., you want a single result for each
time window, you should not use keyBy; use an all-window (e.g., timeWindowAll)
instead. However, this will only be executed with a parallelism of 1.
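
To make the difference concrete, here is a minimal plain-Java sketch (no Flink dependency) that counts events per tumbling window, once without a key and once keyed by the event timestamp itself, as in the question. The class and method names are hypothetical and the code only imitates the grouping; it is not Flink's implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class AllWindowSketch {

    /** Start (in ms) of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** No key: every event in a window contributes to one shared count. */
    static Map<Long, Integer> countPerWindow(List<Long> timestampsMs, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    /** Keyed by the timestamp itself: one count per (key, window) pair. */
    static Map<List<Long>, Integer> countPerKeyAndWindow(List<Long> timestampsMs, long sizeMs) {
        Map<List<Long>, Integer> counts = new LinkedHashMap<>();
        for (long ts : timestampsMs) {
            counts.merge(Arrays.asList(ts, windowStart(ts, sizeMs)), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(1000L, 1000L, 2500L, 4000L, 6000L);
        System.out.println(countPerWindow(events, 5000L));       // one entry per window
        System.out.println(countPerKeyAndWindow(events, 5000L)); // one entry per distinct timestamp
    }
}
```

Without a key, all events of a window must be seen by the same operator instance, which is why such a window cannot be spread over several parallel tasks.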


Re: How to correct use timeWindow() with DataStream?

2018-03-19 Thread Felipe Gutierrez
Thanks a lot, Fabian,

That clarified how I should proceed. I am now using the keyBy, timeWindow, and
apply operators in EventTimeStreamExampleJava. I now generate dates in order,
with a bit of out-of-orderness, in LogSourceFunction, and I use only the Date
as my key in the LogLine object.

If I understood watermarks correctly, my program should combine all the lines
that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
Time.seconds(1))" and use ".apply(new LogLineCounterFunction())". But that
is still not happening, because I did not choose a good key: I use
".keyBy(lineLog -> lineLog.getTime())", and the key type in the
LogLineCounterFunction class is still Date.

public static class LogLineCounterFunction implements WindowFunction<
LogLine, // input
Tuple3, // output
Date, // key
TimeWindow> { // window type

What should I use as a key in my case?

My output is combining only the lines with the same key (Date). I want to
combine the dates between the watermarks ".timeWindow(Time.seconds(5),
Time.seconds(1))"...

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
| 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534},107151667,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
| 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15
16:31:04.184},107151667,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884},107151667,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
16:31:03.784},107151667,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
| 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15
16:31:06.334},107151667,4)
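
For reference, the boundaries of a sliding window are determined by its size and slide alone; watermarks only decide when a window's result is emitted. The following plain-Java sketch lists the windows that a single timestamp falls into for a size of 5 seconds and a slide of 1 second. The class and method names are hypothetical, and the arithmetic is an illustration of the idea rather than a copy of Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {

    /** Start times (in ms) of every sliding window that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at 8534 ms with size 5000 ms and slide 1000 ms belongs to five windows.
        System.out.println(windowStarts(8534L, 5000L, 1000L));
    }
}
```

With a keyed stream, each of these windows is evaluated separately per key, so a key that is unique per timestamp only ever groups records with an identical timestamp.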





Re: How to correct use timeWindow() with DataStream?

2018-03-19 Thread Fabian Hueske
Hi,

The timestamps of the stream records should be increasing (strict
monotonicity is not required; a bit of out-of-orderness can be handled by
watermarks).
So, the events should also be generated with increasing timestamps. It
looks like your generator generates random dates. I'd also generate data
with millisecond precision, not just days.
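
As an illustration of how a bounded out-of-orderness watermark tolerates slightly late records, here is a minimal plain-Java sketch. The class and method names are hypothetical; it assumes the watermark is the highest timestamp seen so far minus a fixed bound, as in the BoundedOutOfOrdernessGenerator from the question, and that a window may be evaluated once the watermark reaches the window's last millisecond.

```java
final class BoundedWatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestampMs = Long.MIN_VALUE; // no event seen yet

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp; out-of-order timestamps never lower the maximum. */
    void observe(long timestampMs) {
        currentMaxTimestampMs = Math.max(currentMaxTimestampMs, timestampMs);
    }

    /** Highest timestamp seen so far minus the out-of-orderness bound. */
    long currentWatermark() {
        if (currentMaxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // avoid underflow before the first event
        }
        return currentMaxTimestampMs - maxOutOfOrdernessMs;
    }

    /** True once the watermark has passed the last millisecond of [start, windowEndMs). */
    boolean windowComplete(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch tracker = new BoundedWatermarkSketch(3500L);
        for (long ts : new long[] {1000L, 5000L, 4000L}) { // 4000 arrives out of order
            tracker.observe(ts);
        }
        System.out.println(tracker.currentWatermark());
        System.out.println(tracker.windowComplete(2000L));
    }
}
```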

Also, a timestamp in Flink is the number of milliseconds since
1970-01-01 00:00:00 UTC.
However, your timestamp extractor only returns the seconds within the
current minute (i.e., from 0 to 59). You should use Date.getTime() instead of
Date.getSeconds().
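
The difference between the two calls can be checked with the following small plain-Java sketch. The class name, method names, and the example instant are arbitrary choices for illustration. Note that with values of at most 59, a watermark computed as the maximum timestamp minus 3500 stays negative, which would likely explain why no 2-second window ever produced output.

```java
import java.util.Date;

final class TimestampSketch {

    /** What the extractor in the question returns: the seconds within the minute. */
    @SuppressWarnings("deprecation")
    static long secondsOfMinute(Date date) {
        return date.getSeconds();
    }

    /** What an event-time timestamp should be: milliseconds since the epoch. */
    static long epochMillis(Date date) {
        return date.getTime();
    }

    public static void main(String[] args) {
        Date date = new Date(1071505868534L); // arbitrary example instant
        System.out.println(secondsOfMinute(date)); // a value between 0 and 59
        System.out.println(epochMillis(date));     // the full millisecond timestamp
    }
}
```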

Best, Fabian

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez :

> Hi all,
>
> I am building an example with DataStream using Flink that has a fake
> source generator of LogLine(Date d, String line). I want to work with
> watermarks on it, so I created a class that implements
> AssignerWithPeriodicWatermarks. If I don't use
> ".timeWindow(Time.seconds(2))" on the data stream, I can group by second and
> concatenate the lines. When I use ".timeWindow(Time.seconds(2))", nothing is
> shown on the output. I guess I misunderstood something when I was reading
> about event time. Could anyone help me, please? My source code is as follows.
>
> Thanks for the ideas. Kind Regards,  Felipe
>
> package flink.example.streaming;
>
> import flink.util.LogLine;
> import flink.util.LogSourceFunction;
> import flink.util.UtilDate;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> import javax.annotation.Nullable;
> import java.util.Date;
>
> public class EventTimeStreamExampleJava {
>     public static void main(String[] args) throws Exception {
>
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<LogLine> dataStream = env
>                 .addSource(new LogSourceFunction())
>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>                 .keyBy(lineLog -> lineLog.getSec())
>                 // .timeWindow(Time.seconds(2))
>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>                         log1.getLine() + " | " + log2.getLine()));
>
>         dataStream.print();
>
>         env.execute("Window LogRead");
>     }
>
>     public static class BoundedOutOfOrdernessGenerator implements
>             AssignerWithPeriodicWatermarks<LogLine> {
>
>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>
>         private long currentMaxTimestamp;
>
>         @Override
>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>             long timestamp = element.getTime().getSeconds();
>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>             return timestamp;
>         }
>
>         @Nullable
>         @Override
>         public Watermark getCurrentWatermark() {
>             // return the watermark as current highest timestamp minus the
>             // out-of-orderness bound
>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>         }
>     }
> }
>
> package flink.util;
>
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class LogSourceFunction implements SourceFunction<LogLine> {
>
>     private volatile boolean isRunning = true;
>
>     @Override
>     public void run(SourceContext<LogLine> ctx) throws Exception {
>         while (isRunning) {
>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>                     UtilDate.getRandomString()));
>         }
>     }
>
>     @Override
>     public void cancel() {
>         isRunning = false;
>     }
> }
>
> package flink.util;
>
> import java.util.Date;
> import java.util.Objects;
>
> public class LogLine {
>
>     private Date time;
>     private int sec;
>     private String line;
>
>     public LogLine() {
>     }
>
>     public LogLine(Date time, String line) {
>         this.sec = time.getSeconds();
>         this.time = time;
>         this.line = line;
>     }
>
>     public LogLine(int sec, String line) {
>         this.sec = sec;
>         this.time = UtilDate.getRandomDate(sec);
>         this.line = line;
>     }
>
>     public int getSec() {
>         return sec;
>     }
>
>     public void setSec(int sec) {
>         this.sec = sec;
>     }
>
>     public Date getTime() {
>         return time;
>     }
>
>     public String getLine() {
>         return line;
>     }
>
>     public void setTime(Date time) {
>         this.time = time;
>     }