Hi Rohan, I am not sure if I fully understand your problem. For example, if you receive an event with a start time of 4:50 and an end time of 5:30, do you want the "usage" from 4:50 - 5:00 to be included in the 4:00 - 5:00 window? What if the event had an end time of 5:31? Do you then want to ignore the event for the 4:00 - 5:00 window?
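In case it helps while you think about that: below is a rough, untested sketch of the change I suggested in my earlier mail (quoted below), i.e. deriving the watermark from the event timestamp via BoundedOutOfOrdernessTimestampExtractor instead of System.currentTimeMillis. The Tuple2[String, Report] element type and the getEndTime accessor are taken from your snippet; the 30-minute bound is only an example, pick whatever delay fits your data.

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Sketch only: the watermark trails the largest end time seen so far by 30 minutes,
// so event time advances with your data instead of the wall clock.
class ReportTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Tuple2[String, Report]](Time.minutes(30)) {

  // Same timestamp you already extract; the base class takes care of emitting watermarks.
  override def extractTimestamp(e: Tuple2[String, Report]): Long = e.f1.getEndTime
}

You would plug this in exactly where you call assignTimestampsAndWatermarks today.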
Best,
Gary

On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:

> Hi Gary,
>
> This is perfect. I am able to get the window working on the message timestamp
> rather than the clock window, and also stream the data that are late.
>
> I also have one edge case.
>
> For example, I get my last report at 4:57 and I never get a 5:00+ hour report
> *ever*. I would like to wait for some time. My reporting interval size is
> 30 min. If I don't see any record in the next 30 min, then I would like to
> construct 4-5 by closing the window and dispatching the report. The intention
> is that I don't want to lose the last hour of data since the stream ends in
> the middle of the hour.
>
> Rohan
>
> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao <g...@data-artisans.com> wrote:
>
>> Hi Rohan,
>>
>> Your ReportTimestampExtractor assigns timestamps to the stream records
>> correctly but uses the wall clock to emit Watermarks (System.currentTimeMillis).
>> In Flink, Watermarks are the mechanism to advance the event time. Hence, you
>> should emit Watermarks according to the time that you extract from your events.
>>
>> You can take a look at the already existing timestamp extractors / watermark
>> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see how it
>> can be done.
>>
>> Best,
>> Gary
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>
>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have the following requirement:
>>>
>>> 1. I have an Avro JSON message containing {eventid, usage, starttime, endtime}.
>>> 2. I am reading this from a Kafka source.
>>> 3. If there is an overlapping hour in a record, split the record by rounding
>>>    off to hourly boundaries.
>>> 4. My objective is to a) read the message, b) aggregate the usage within the hour.
>>> 5. Send the aggregated data to another Kafka topic.
>>>
>>> I don't want to aggregate based on the clock window. If I see the next hour
>>> in endtime, then I would like to close the window and send the aggregated
>>> usage down to the Kafka sink topic.
>>>
>>> E.g.:
>>> input data
>>> 4.55 - 5.00
>>> 5.00 - 5.25
>>> 5.25 - 5.55
>>> 5.55 - 6.25
>>>
>>> after split
>>> 4.55 - 5.00 - expect record to be going out with this
>>> 5.00 - 5.25
>>> 5.25 - 5.55
>>> 5.55 - 6.00 - expect record to be going out with this
>>> 6.00 - 6.25
>>>
>>> 1. I have set the event time:
>>>    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>
>>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
>>>      .flatMap(new SplitFlatMap()) // checks for an overlapping hour; if found, creates split records with hourly boundaries
>>>      .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>>      .keyBy(0)
>>>      .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
>>>      .reduce(new Counter()) // aggregates the usage collected within the window
>>> 3. Here is the implementation of the timestamp extractor:
>>>
>>> class ReportTimestampExtractor extends
>>>     AssignerWithPeriodicWatermarks[Tuple2[String, EMMReport]] with Serializable {
>>>
>>>   override def extractTimestamp(e: Tuple2[String, Report],
>>>                                 prevElementTimestamp: Long) = {
>>>     e.f1.getEndTime
>>>   }
>>>
>>>   override def getCurrentWatermark(): Watermark = {
>>>     new Watermark(System.currentTimeMillis - 36000) // respect delay for 1 hour
>>>   }
>>> }
>>>
>>> I see the aggregation is generated only on the clock window rather than when
>>> the window sees the next hour in the record.
>>>
>>> Is there anything I am missing? By definition, if I set event time it should
>>> respect the message time rather than the clock window.
>>>
>>> --
>>> Thanks
>>> Rohan
>>
>
> --
> Thanks
> Rohan