Hi Rohan,

I am not sure if I fully understand your problem. For example, if you receive
an event with a start time of 4:50 and an end time of 5:30, do you want the
"usage" from 4:50 - 5:00 to be included in the 4:00 - 5:00 window? What if the
event had an end time of 5:31? Do you then want to ignore the event for the
4:00 - 5:00 window?

Best,

Gary

On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <rohan.thimma...@gmail.com>
wrote:

> Hi Gary,
>
> This is perfect. I am now able to get the window working on the message
> timestamp rather than the clock window, and also stream the data that arrive
> late.
>
> I also have one edge case.
>
> For example, I get my last report at 4:57 and I *never* get a 5:00+ report.
> I would like to wait for some time: my reporting interval size is 30 minutes,
> and if I don't see any record in the next 30 minutes, I would like to
> construct the 4:00 - 5:00 report by closing the window and dispatching it.
> The intention is that I don't want to lose the last hour of data just because
> the stream ended in the middle of the hour.
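>
> One way I was thinking this could be done (an untested sketch; the trigger
> name and the way the inactivity timeout is tracked are my own guesses, not
> something from the docs) is a custom Trigger that fires normally on event
> time but fires-and-purges after a processing-time timeout with no new records:
>
> import org.apache.flink.api.common.state.ValueStateDescriptor
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>
> class IdleTimeoutTrigger[T](timeoutMs: Long) extends Trigger[T, TimeWindow] {
>
>   // remembers the currently armed inactivity timer so it can be moved forward
>   private val timerDesc =
>     new ValueStateDescriptor[java.lang.Long]("idle-timer", TypeInformation.of(classOf[java.lang.Long]))
>
>   override def onElement(element: T, timestamp: Long, window: TimeWindow,
>                          ctx: Trigger.TriggerContext): TriggerResult = {
>     // normal event-time behaviour: fire when the watermark passes the window end
>     ctx.registerEventTimeTimer(window.maxTimestamp())
>
>     // move the inactivity timeout forward to "now + timeoutMs"
>     val timerState = ctx.getPartitionedState(timerDesc)
>     val previous = timerState.value()
>     if (previous != null) ctx.deleteProcessingTimeTimer(previous)
>     val next = ctx.getCurrentProcessingTime + timeoutMs
>     ctx.registerProcessingTimeTimer(next)
>     timerState.update(next)
>
>     TriggerResult.CONTINUE
>   }
>
>   override def onEventTime(time: Long, window: TimeWindow,
>                            ctx: Trigger.TriggerContext): TriggerResult =
>     if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE
>
>   override def onProcessingTime(time: Long, window: TimeWindow,
>                                 ctx: Trigger.TriggerContext): TriggerResult =
>     TriggerResult.FIRE_AND_PURGE // nothing arrived within the timeout: emit what we have
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
>     ctx.deleteEventTimeTimer(window.maxTimestamp())
>     val timerState = ctx.getPartitionedState(timerDesc)
>     if (timerState.value() != null) ctx.deleteProcessingTimeTimer(timerState.value())
>     timerState.clear()
>   }
> }
>
> which I imagine attaching with something like
> .window(TumblingEventTimeWindows.of(...)).trigger(new IdleTimeoutTrigger(30 * 60 * 1000L)).
> Does that sound like a reasonable direction?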
>
> Rohan
>
> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao <g...@data-artisans.com> wrote:
>
>> Hi Rohan,
>>
>> Your ReportTimestampExtractor assigns timestamps to the stream records
>> correctly, but uses the wall clock to emit Watermarks
>> (System.currentTimeMillis). In Flink, Watermarks are the mechanism to
>> advance the event time. Hence, you should emit Watermarks according to the
>> time that you extract from your events.
>>
>> You can take a look at the already existing timestamp extractors / watermark
>> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see how it
>> can be done.
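>>
>> For example, a minimal sketch reusing your Report type (assuming getEndTime
>> returns epoch milliseconds, and picking one hour of allowed out-of-orderness
>> just as an example):
>>
>> import org.apache.flink.api.java.tuple.Tuple2
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.windowing.time.Time
>>
>> // Watermarks are derived from the extracted event time, not from the wall clock.
>> class ReportTimestampExtractor
>>   extends BoundedOutOfOrdernessTimestampExtractor[Tuple2[String, Report]](Time.hours(1)) {
>>
>>   override def extractTimestamp(e: Tuple2[String, Report]): Long = e.f1.getEndTime
>> }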
>>
>> Best,
>> Gary
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>
>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
>> rohan.thimma...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>>
>>> I have the following requirement:
>>>
>>> 1. I have an Avro JSON message containing {eventid, usage, starttime,
>>> endtime}.
>>> 2. I am reading this from a Kafka source.
>>> 3. If a record overlaps an hour boundary, split the record by rounding it
>>> off to hourly boundaries (see the sketch after the example below).
>>> 4. My objective is to a) read the message and b) aggregate the usage within
>>> the hour.
>>> 5. Send the aggregated data to another Kafka topic.
>>>
>>> I don't want to aggregate based on the clock window. If I see the next hour
>>> in the end time, I would like to close the window and send the aggregated
>>> usage down to the Kafka sink topic.
>>>
>>>
>>> e.g.:
>>> input data
>>> 4.55 - 5.00
>>> 5.00 - 5.25
>>> 5.25 - 5.55
>>> 5.55 - 6.25
>>>
>>> after split
>>> 4.55 - 5.00 - expect a record to go out with this
>>> 5.00 - 5.25
>>> 5.25 - 5.55
>>> 5.55 - 6.00 - expect a record to go out with this
>>> 6.00 - 6.25
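>>>
>>> As a rough sketch, the split I have in mind looks something like this
>>> (plain (start, end, usage) tuples instead of my actual Report class, and
>>> prorating the usage linearly across the boundary is an assumption):
>>>
>>> def splitOnHourBoundaries(start: Long, end: Long, usage: Double): Seq[(Long, Long, Double)] = {
>>>   val hourMs = 60 * 60 * 1000L
>>>   if (end <= start) return Seq((start, end, usage))
>>>   val slices = scala.collection.mutable.ArrayBuffer.empty[(Long, Long, Double)]
>>>   var cur = start
>>>   while (cur < end) {
>>>     val hourEnd = ((cur / hourMs) + 1) * hourMs   // next full-hour boundary
>>>     val sliceEnd = math.min(hourEnd, end)
>>>     val fraction = (sliceEnd - cur).toDouble / (end - start)
>>>     slices += ((cur, sliceEnd, usage * fraction)) // prorated usage for this slice
>>>     cur = sliceEnd
>>>   }
>>>   slices.toSeq
>>> }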
>>>
>>>
>>>
>>>
>>> 1. I have set the event time characteristic:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>
>>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
>>>   .flatMap(new SplitFlatMap()) // if a record overlaps an hour, split it at the hourly boundary
>>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>>   .keyBy(0)
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
>>>   .reduce(new Counter()) // aggregates the usage collected within the window
>>>
>>> 3. Here is the implementation of the timestamp extractor:
>>>
>>> class ReportTimestampExtractor extends AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with Serializable {
>>>   override def extractTimestamp(e: Tuple2[String, Report], prevElementTimestamp: Long): Long = {
>>>     e.f1.getEndTime
>>>   }
>>>
>>>   override def getCurrentWatermark(): Watermark = {
>>>     new Watermark(System.currentTimeMillis - 36000) // respect delay for 1 hour (note: 36000 ms is only 36 seconds)
>>>   }
>>> }
>>>
>>>
>>> I see the aggregation is generated only on the clock window rather than when
>>> the window sees the next hour in the record.
>>>
>>> Is there anything I am missing? By definition, if I set event time it should
>>> respect the message time rather than the clock window.
>>>
>>>
>>>
>>>
>>> --
>>> Thanks
>>> Rohan
>>>
>>
>>
>
>
> --
> Thanks
> Rohan
>
