Hi,
I have created a timestamp and watermark assigner, shown below.
I want to use monitoring.getEventTimestamp() with event-time processing
and collect aggregated stats over time windows of 5 secs, 5 mins,
etc. Is this the right way to create the assigner with a bound?
I want to collect the stats for each eventTimestamp + window interval.
My understanding: the generated watermark, which is eventTimestamp + bound,
will let each window (eventTimestamp + 5s, etc.) collect all the events
whose timestamps arrive within that watermark. Or does the watermark
have to be based on the window interval, i.e. extractedTimestamp +
windowInterval + bound?


> public class MonitoringTSWAssigner implements
>         AssignerWithPunctuatedWatermarks<Monitoring> {
>     private long bound = 5 * (long) 1000;
>
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         return monitoring.getEventTimestamp();
>     }
>
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>             long extractedTimestamp) {
>         return new Watermark(extractedTimestamp + bound); // <==== should it be - bound ?
>     }
> }
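
To make the "+ bound" versus "- bound" question concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes the usual event-time semantics as I understand them: a watermark of t claims that no more events with timestamp <= t are expected, and a window [start, end) may fire once the watermark reaches end - 1. The class and method names (WatermarkSketch, watermarkFor, windowCanFire) are hypothetical and exist only for illustration.

```java
// Illustrative only: plain Java arithmetic, not Flink API. All names are hypothetical.
final class WatermarkSketch {

    // Bounded out-of-orderness: trail the highest timestamp seen so far by the bound,
    // leaving room for events that arrive up to boundMillis late.
    static long watermarkFor(long maxSeenTimestamp, long boundMillis) {
        return maxSeenTimestamp - boundMillis;
    }

    // Assumption: a window [start, end) can fire once the watermark reaches end - 1.
    static boolean windowCanFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long bound = 5_000L;
        long windowEnd = 10_000L; // the window covering [5000, 10000)
        long eventTs = 9_000L;

        // timestamp + bound: the watermark jumps ahead of the data, so the window
        // could fire while events for it may still be in flight.
        System.out.println(windowCanFire(windowEnd, eventTs + bound));

        // timestamp - bound: the watermark trails the data, so the window stays open.
        System.out.println(windowCanFire(windowEnd, watermarkFor(eventTs, bound)));

        // Only once an event with timestamp 15000 is seen does the window close.
        System.out.println(windowCanFire(windowEnd, watermarkFor(15_000L, bound)));
    }
}
```

Under these assumptions the window interval does not enter the watermark calculation at all; only the tolerated lateness does.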


Used here:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>         env.addSource(....);
> DataStream<Monitoring> kinesisStream =
>         monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>         kinesisStream.keyBy("deployment", .....);
> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
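
For reference, a small sketch of how I would expect a 5-second tumbling window to bucket timestamps, assuming windows are aligned to the epoch with zero offset rather than starting at each event's own timestamp. This is plain Java, not Flink code, and the names TumblingWindowSketch, windowStart and windowEnd are hypothetical.

```java
// Illustrative only: epoch-aligned tumbling window arithmetic, no Flink dependency.
final class TumblingWindowSketch {

    // Start of the window containing the timestamp (assumes zero offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of the window containing the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 5_000L; // corresponds to Time.seconds(5)
        for (long ts : new long[] {12_345L, 14_999L, 15_000L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

If that assumption holds, events at 12345 and 14999 share the window [10000, 15000), while 15000 starts the next one, so the windows are not "eventTimestamp + 5s" per event.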


TIA,
