Re: Checkpointing is not performing well

2019-09-07 Thread Rohan Thimmappa
Ravi, have you looked at the I/O operations per second (IOPS) rate of the disk? You can 
monitor the IOPS and tune it according to your workload. This helped us in our 
project when we hit a wall after tuning pretty much all the other parameters.

Rohan



From: Ravi Bhushan Ratnakar 
Sent: Saturday, September 7, 2019 5:38 PM
To: Rafi Aroch
Cc: user
Subject: Re: Checkpointing is not performing well

Hi Rafi,

Thank you for your quick response.

I have tested with the RocksDB state backend. RocksDB required significantly more 
task managers to perform compared to the filesystem state backend. The problem here 
is that the checkpoint process is not fast enough to complete.

Our requirement is to checkpoint as quickly as possible, ideally every 5 seconds, to 
flush the output to the output sink. As the incoming data rate is high, checkpointing 
is not able to complete quickly. If I increase the checkpoint interval, the state size 
grows much faster and hence checkpointing takes much longer to complete. I also tried 
AT_LEAST_ONCE mode, but it does not improve much. Adding more task managers to 
increase parallelism also does not improve the checkpointing performance.

Is it possible to achieve a checkpoint interval as short as 5 seconds with such a high 
input volume?
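
For reference, a minimal sketch (not taken from the job) of the checkpoint settings being asked about, i.e. a 5-second interval in AT_LEAST_ONCE mode:

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpoint every 5 seconds in at-least-once mode, as described above
env.enableCheckpointing(5 * 1000, CheckpointingMode.AT_LEAST_ONCE)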

Regards,
Ravi

On Sat 7 Sep, 2019, 22:25 Rafi Aroch <rafi.ar...@gmail.com> wrote:
Hi Ravi,

Consider moving to the RocksDB state backend, where you can enable incremental 
checkpointing. This will make your checkpoint size stay pretty much constant 
even as your state becomes larger.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend
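
A minimal sketch of what that looks like, assuming the RocksDB state backend dependency is on the classpath; the checkpoint URI is a placeholder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the second constructor argument enables incremental checkpoints, so each
// checkpoint uploads only the RocksDB delta instead of the full state
env.setStateBackend(new RocksDBStateBackend("s3://<bucket>/checkpoints", true))
env.enableCheckpointing(60 * 1000)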


Thanks,
Rafi

On Sat, Sep 7, 2019, 17:47 Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:
Hi All,

I am writing a streaming application using Flink 1.9. The application consumes data 
from a Kinesis stream whose records are Avro payloads. It uses a KeyedProcessFunction 
to execute business logic per correlation id, with event-time characteristics and the 
configuration below (a rough sketch of this setup in code follows) --
StateBackend - filesystem with S3 storage
registerEventTimeTimer duration for each key - currentWatermark + 15 seconds
checkpoint interval - 1 min
minPauseBetweenCheckpoints - 1 min
checkpoint timeout - 10 mins

incoming data rate from Kinesis - ~10 to 21 GB/min

Number of task managers - 200 (r4.2xlarge -> 8 vCPU, 61 GB)
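
For clarity, a minimal sketch of that setup in code; the S3 path is a placeholder and the values simply mirror the numbers above:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new FsStateBackend("s3://<bucket>/checkpoints")) // filesystem state backend on S3 (placeholder path)
env.enableCheckpointing(60 * 1000)                                   // checkpoint interval: 1 min
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60 * 1000)     // min pause between checkpoints: 1 min
env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)         // checkpoint timeout: 10 mins
// inside the KeyedProcessFunction, a timer is registered per key roughly as:
//   ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 15 * 1000)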

The first 2-4 checkpoints complete within 1 min, when the state size is usually around 
50 GB. As the state size grows beyond 50 GB, checkpointing starts taking more than 
1 min, increases up to 10 mins, and then the checkpoint fails. The moment a checkpoint 
takes more than 1 min to complete, the application starts processing slowly and 
lagging in its output.

Any suggestions to fine-tune checkpoint performance would be highly appreciated.

Regards,
Ravi


Re: Class loader premature closure - NoClassDefFoundError: org/apache/kafka/clients/NetworkClient

2019-05-07 Thread Rohan Thimmappa
It is a blocker for exactly-once support from the Flink Kafka producer.

This issue was reported and closed, but it is still reproducible:
https://issues.apache.org/jira/browse/FLINK-10455

On Mon, May 6, 2019 at 10:20 AM Slotterback, Chris <
chris_slotterb...@comcast.com> wrote:

> Hey Flink users,
>
>
>
> Currently using Flink 1.7.2 with a job using FlinkKafkaProducer with its
> write semantic set to Semantic.EXACTLY_ONCE. When there is a job failure
> and restart (in our case from checkpoint timeout), it begins a failure loop
> that requires a cancellation and resubmission to fix. The expected and
> desired outcome should be a recovery from failure and the job restarts
> successfully. Some digging revealed an issue where the class loader closes
> before the connection to kafka is fully terminated resulting in a
> NoClassDefFoundError. A description of what is happening has already been
> described here:
> https://heap.io/blog/engineering/missing-scala-class-noclassdeffounderror,
> though we are experiencing this with kafka, not Redis:
>
>
>
> 2019-05-03 15:14:18,780 ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-producer-network-thread | producer-80':
>
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>   at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
>   at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>   at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> Interestingly, this only happens when we extend the FlinkKafkaProducer for
> the purposes of setting the write semantic to EXACTLY_ONCE. When running
> with the default FlinkKafkaProducer (using Semantic.AT_LEAST_ONCE), the
> class loader has no issues disconnecting the kafka client on job failure,
> and the job recovers just fine. We are not doing anything particularly
> strange in our extended producer as far as I can tell:
>
>
>
> public class CustomFlinkKafkaProducer extends FlinkKafkaProducer {
>
>   public CustomFlinkKafkaProducer(Properties properties, String topicId,
>       AvroKeyedSerializer serializationSchema) {
>     super(
>         topicId,
>         serializationSchema,
>         properties,
>         Optional.of(new FlinkFixedPartitioner<>()),
>         Semantic.EXACTLY_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>   }
>
>   public static Properties getPropertiesFromBrokerList(String brokerList) {
>     […]
>   }
> }
>
>
>
>
>


-- 
Thanks
Rohan


Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

2019-02-18 Thread Rohan Thimmappa
Hi Tzu-Li,

Any update on this? This is consistently reproducible.

Same jar - separate source topic to separate destination topic.

This is sort of a blocker for our Flink upgrade. I tried with 1.7.2 but no luck.

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception:
Failed to send data to Kafka: Producer attempted an operation with an
old epoch. Either there is a newer producer with the same
transactionalId, or the producer's transaction has been expired by the
broker.
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
  at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
  at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
  at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:98)
  at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:33)
  at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
  at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)





Rohan

On Wed, Feb 13, 2019 at 12:33 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I think this is unexpected. The generated transactional ids should not be
> clashing.
> Looking at the FlinkKafkaProducer code, it seems like the generation is
> only a function of the subtask id of the FlinkKafkaProducer, which could be
> the same across 2 different Kafka sources.
>
> I'm not completely certain about this. Piotr (in CC) might have more
> insights for this.
>
> Cheers,
> Gordon
>
> On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <
> chris_slotterb...@comcast.com> wrote:
>
>> Hey all,
>>
>>
>>
>> I am running into an issue where if I run 2 flink jobs (same jar,
>> different configuration), that produce to different kafka topics on the
>> same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE
>> semantics, both jobs go into a checkpoint exception loop every 15 seconds
>> or so:
>>
>>
>>
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>> Producer attempted an operation with an old epoch. Either there is a newer
>> producer with the same transactionalId, or the producer's transaction has
>> been expired by the broker.
>>
>>
>>
>> As soon as one of the jobs is cancelled, things go back to normal for the
>> other job. I tried manually setting the TRANSACTIONAL_ID_CONFIG config in
>> the producer to be unique for each of the jobs. My producer transaction
>> timeout is set to 5 minutes, and flink checkpointing is set to 1 minute. Is
>> there some way to prevent these jobs from tripping over each other in
>> execution while retaining exactly once processing?
>>
>
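
For concreteness, a rough sketch of the per-job producer properties described in the quoted message (broker address and id values are placeholders). As Gordon notes above, the 1.7 connector derives its transactional ids from the subtask, so a user-supplied transactional.id alone may not avoid the clash:

import java.util.Properties

import org.apache.kafka.clients.producer.ProducerConfig

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "broker-1:9092")                     // placeholder broker list
producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "job-a-producer") // unique per job, as attempted above
producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (5 * 60 * 1000).toString) // 5 minutes, matching the setup above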

-

Cassandra counter datatype support through POJO

2018-03-16 Thread Rohan Thimmappa
Hi All,

I have a table containing usage, which is of the counter data type. Every time I get a
usage value for an id, I would like to use the counter data type to increment it.

https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html


Does the Cassandra sink support the POJO approach for this, or do I have to use the
query (CQL) approach? I am looking for the POJO approach.

create table mytable (
  id text,
  usage counter,
  ...
);



https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html
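
A rough sketch of the query (CQL) route with the connector linked above, since whether the POJO/annotation route can express a counter increment is exactly the open question here. The keyspace, host and the usageStream of (usage delta, id) tuples are placeholders:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// placeholder stream of (usage delta, id) pairs
val usageStream: DataStream[(Long, String)] = ???

CassandraSink.addSink(usageStream)
  .setQuery("UPDATE mykeyspace.mytable SET usage = usage + ? WHERE id = ?;")
  .setHost("127.0.0.1")
  .build()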


-- 
Thanks
Rohan


Re: Aggregation using event timestamp than clock window

2018-01-28 Thread Rohan Thimmappa
Hi Gary,


Thanks. I do have some of the events coming in after the pauses, and I am able to see
the watermark being advanced and the events being triggered.


Rohan

On Mon, Jan 15, 2018 at 5:40 AM, Gary Yao  wrote:

> Hi Rohan,
>
> In your example, are you saying that after 5:40 you will not receive any
> events
> at all which could advance the watermark?
>
> I am asking because if you are receiving events for other keys/ids from
> your
> KafkaSource after 5:40, the watermark will still be advanced and fire the
> tumbling window.
>
> Best,
> Gary
>
> On Mon, Jan 15, 2018 at 9:03 AM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> No. My question is slightly different.
>>
>> say i get report from 5.10-5.40. the device went offline and never comes
>> back. i will not get any report after 5.40. So 5-6 window never gets closed
>> as we will not get any report after 5.40. in this case 5.00-5.40 data is
>> still in flink memory that will never get sent as we are closing the window
>> by seeing the next hour window. ie any report carrying 6.00 end date in it.
>>
>>
>> so what i would like to do is. Wait for say 1 or 2 hours if i don't get
>> message for the given id then i would like to close the window and send
>> this to destination system(in my case kafka topic.)
>>
>>
>>
>>
>> Rohan
>>
>> On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao  wrote:
>>
>>> Hi Rohan,
>>>
>>> I am not sure if I fully understand your problem. For example, if you
>>> receive an
>>> event with a start time of 4:50 and an end time of 5:30, do you want the
>>> "usage"
>>> from 4:50 - 5:00 to be included in the 4:00 - 5:00 window? What if the
>>> event had
>>> an end time of 5:31? Do you then want to ignore the event for the 4:00 -
>>> 5:00
>>> window?
>>>
>>> Best,
>>>
>>> Gary
>>>
>>> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <
>>> rohan.thimma...@gmail.com> wrote:
>>>
>>>> Hi Gary,
>>>>
>>>> This is perfect. I am able to get the window working on message
>>>> timestamp then clock window also stream the data that are late.
>>>>
>>>> I also having one edge case.
>>>>
>>>>
>>>> for eg i get my last report at 4.57 and i never get 5.00+ hour report
>>>> *ever*. i would like to wait for sometime. My reporting interval size
>>>> is 30 min. if in next 30 min if i don't see any record then i would like to
>>>> construct 4-5 by closing the window and dispatch the report. Intention is i
>>>> don't want to loss the last hour of the data since the stream end in
>>>> between the hour.
>>>>
>>>> Rohan
>>>>
>>>> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao 
>>>> wrote:
>>>>
>>>>> Hi Rohan,
>>>>>
>>>>> Your ReportTimestampExtractor assigns timestamps to the stream records
>>>>> correctly
>>>>> but uses the wall clock to emit Watermarks (System.currentTimeMillis).
>>>>> In Flink
>>>>> Watermarks are the mechanism to advance the event time. Hence, you
>>>>> should emit
>>>>> Watermarks according to the time that you extract from your events.
>>>>>
>>>>> You can take a look at the already existing timestamp extractors /
>>>>> watermark
>>>>> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see
>>>>> how it can
>>>>> be done.
>>>>>
>>>>> Best,
>>>>> Gary
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>>>> dev/event_timestamp_extractors.html
>>>>>
>>>>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
>>>>> rohan.thimma...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>>
>>>>>> I have following requirement
>>>>>>
>>>>>> 1. i have avro json message containing {eventid, usage, starttime,
>>>>>> endtime}
>>>>>> 2. i am reading this from kafka source
>>>>>>
>>>>>> 3. if there is overlapping hour in a record split the record by
>>>>>> rounding off to hourly bounderies
>>>>>> 4.My objective is a) read the messa

Re: Aggregation using event timestamp than clock window

2018-01-15 Thread Rohan Thimmappa
No. My question is slightly different.

Say I get reports from 5:10-5:40, and then the device goes offline and never comes
back, so I will not get any report after 5:40. The 5-6 window then never gets closed,
because we only close a window when we see a report from the next hour window, i.e.
any report carrying a 6:00 end date. In that case the 5:00-5:40 data stays in Flink
memory and will never be sent.

So what I would like to do is wait for, say, 1 or 2 hours, and if I don't get a
message for the given id, close the window and send the result to the destination
system (in my case a Kafka topic).




Rohan

On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao  wrote:

> Hi Rohan,
>
> I am not sure if I fully understand your problem. For example, if you
> receive an
> event with a start time of 4:50 and an end time of 5:30, do you want the
> "usage"
> from 4:50 - 5:00 to be included in the 4:00 - 5:00 window? What if the
> event had
> an end time of 5:31? Do you then want to ignore the event for the 4:00 -
> 5:00
> window?
>
> Best,
>
> Gary
>
> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> Hi Gary,
>>
>> This is perfect. I am able to get the window working on message timestamp
>> then clock window also stream the data that are late.
>>
>> I also having one edge case.
>>
>>
>> for eg i get my last report at 4.57 and i never get 5.00+ hour report
>> *ever*. i would like to wait for sometime. My reporting interval size
>> is 30 min. if in next 30 min if i don't see any record then i would like to
>> construct 4-5 by closing the window and dispatch the report. Intention is i
>> don't want to loss the last hour of the data since the stream end in
>> between the hour.
>>
>> Rohan
>>
>> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao 
>> wrote:
>>
>>> Hi Rohan,
>>>
>>> Your ReportTimestampExtractor assigns timestamps to the stream records
>>> correctly
>>> but uses the wall clock to emit Watermarks (System.currentTimeMillis).
>>> In Flink
>>> Watermarks are the mechanism to advance the event time. Hence, you
>>> should emit
>>> Watermarks according to the time that you extract from your events.
>>>
>>> You can take a look at the already existing timestamp extractors /
>>> watermark
>>> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see
>>> how it can
>>> be done.
>>>
>>> Best,
>>> Gary
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>> dev/event_timestamp_extractors.html
>>>
>>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
>>> rohan.thimma...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>>
>>>> I have following requirement
>>>>
>>>> 1. i have avro json message containing {eventid, usage, starttime,
>>>> endtime}
>>>> 2. i am reading this from kafka source
>>>>
>>>> 3. if there is overlapping hour in a record split the record by
>>>> rounding off to hourly bounderies
>>>> 4.My objective is a) read the message b) aggregate the usage between
>>>> the hour
>>>> 5. send the aggregated data to another kafka topic.
>>>>
>>>> i don't want aggregate based on clock window. if i see next hour in
>>>> endtime then i would like to close the window and aggregated usage to be
>>>> send down to kafka sink topic.
>>>>
>>>>
>>>> eg:
>>>> input data
>>>> 4.55 - 5.00
>>>> 5.00 -5.25
>>>> 5.25- 5.55.
>>>> 5.55-625
>>>>
>>>> after split
>>>> 4.55- 5.00 - expect record to be going out with this
>>>> 5.00 -5.25
>>>> 5.25- 5.55.
>>>> 5.55-6.00 - expect record to be going out with this
>>>> 5.00-625
>>>>
>>>>
>>>>
>>>>
>>>> 1. i have set the eventime : 
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>
>>>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] 
>>>> = stream
>>>>   .flatMap(new SplitFlatMap()  // checks if the overlapping hour if yes 
>>>> then create split recordr with hourly boundarry
>>>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>>>   .keyBy(0)
>>>>

Re: Aggregation using event timestamp than clock window

2018-01-12 Thread Rohan Thimmappa
Hi Gary,

This is perfect. I am able to get the window working on the message timestamp rather
than the clock window, and also to stream the data that arrive late.

I also have one edge case.


For example, I get my last report at 4:57 and I never get a 5:00+ hour report *ever*.
I would like to wait for some time. My reporting interval size is 30 min; if I don't
see any record in the next 30 min, then I would like to construct the 4-5 window by
closing it and dispatching the report. The intention is that I don't want to lose the
last hour of data just because the stream ended in the middle of the hour.

Rohan

On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao  wrote:

> Hi Rohan,
>
> Your ReportTimestampExtractor assigns timestamps to the stream records
> correctly
> but uses the wall clock to emit Watermarks (System.currentTimeMillis). In
> Flink
> Watermarks are the mechanism to advance the event time. Hence, you should
> emit
> Watermarks according to the time that you extract from your events.
>
> You can take a look at the already existing timestamp extractors /
> watermark
> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see how
> it can
> be done.
>
> Best,
> Gary
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_
> timestamp_extractors.html
>
> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> Hi All,
>>
>>
>> I have following requirement
>>
>> 1. i have avro json message containing {eventid, usage, starttime,
>> endtime}
>> 2. i am reading this from kafka source
>>
>> 3. if there is overlapping hour in a record split the record by rounding
>> off to hourly bounderies
>> 4.My objective is a) read the message b) aggregate the usage between the
>> hour
>> 5. send the aggregated data to another kafka topic.
>>
>> i don't want aggregate based on clock window. if i see next hour in
>> endtime then i would like to close the window and aggregated usage to be
>> send down to kafka sink topic.
>>
>>
>> eg:
>> input data
>> 4.55 - 5.00
>> 5.00 -5.25
>> 5.25- 5.55.
>> 5.55-625
>>
>> after split
>> 4.55- 5.00 - expect record to be going out with this
>> 5.00 -5.25
>> 5.25- 5.55.
>> 5.55-6.00 - expect record to be going out with this
>> 5.00-625
>>
>>
>>
>>
>> 1. i have set the eventime : 
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = 
>> stream
>>   .flatMap(new SplitFlatMap()  // checks if the overlapping hour if yes then 
>> create split recordr with hourly boundarry
>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>   .keyBy(0)
>>   
>> .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
>>
>>   .reduce(new Counter()) //aggrigates the usage collected within window
>>
>> 3. here is the implementation for timestampeextractor
>>
>> class ReportTimestampExtractor extends 
>> AssignerWithPeriodicWatermarks[Tuple2[String, EMMReport]] with Serializable {
>>   override def extractTimestamp(e: Tuple2[String, Report], 
>> prevElementTimestamp: Long) = {
>> e.f1.getEndTime
>>   }
>>
>>   override def getCurrentWatermark(): Watermark = {
>> new Watermark(System.currentTimeMillis- 36000) //respect delay for 1 hour
>>   }
>> }
>>
>>
>> I see the aggregation is generated only the clock window rather than when 
>> the window sees next hour in the record.
>>
>>
>>
>> Is there anything i am missing. by definition eventtime if i set it should 
>> respect message time rather than clock window
>>
>>
>>
>>
>> --
>> Thanks
>> Rohan
>>
>
>


-- 
Thanks
Rohan


Aggregation using event timestamp than clock window

2018-01-11 Thread Rohan Thimmappa
Hi All,


I have the following requirement:

1. I have an Avro/JSON message containing {eventid, usage, starttime, endtime}.
2. I am reading this from a Kafka source.
3. If there is an overlapping hour in a record, split the record by rounding off to
hourly boundaries.
4. My objective is to a) read the message and b) aggregate the usage within the hour.
5. Send the aggregated data to another Kafka topic.

I don't want to aggregate based on the clock window. If I see the next hour in the
endtime, I would like to close the window and send the aggregated usage down to the
Kafka sink topic.


eg:
input data
4.55-5.00
5.00-5.25
5.25-5.55
5.55-6.25

after split
4.55-5.00 - expect a record to go out with this
5.00-5.25
5.25-5.55
5.55-6.00 - expect a record to go out with this
6.00-6.25




1. I have set the event time:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
  .flatMap(new SplitFlatMap()) // if a record overlaps an hour boundary, creates split records with hourly boundaries
  .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
  .reduce(new Counter()) // aggregates the usage collected within the window

3. Here is the implementation of the timestamp extractor:

class ReportTimestampExtractor extends AssignerWithPeriodicWatermarks[Tuple2[String, EMMReport]] with Serializable {
  override def extractTimestamp(e: Tuple2[String, Report], prevElementTimestamp: Long) = {
    e.f1.getEndTime
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis - 36000) // respect delay for 1 hour
  }
}


I see the aggregation is generated only on the clock window rather than when the
window sees the next hour in the record.



Is there anything I am missing? By definition, if I set event time it should
respect the message time rather than the clock window.
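
As the replies archived above point out, the watermark should be derived from the event time rather than System.currentTimeMillis. A minimal sketch of that change, assuming the same Tuple2[String, Report] element type used above; the class name and the one-hour out-of-orderness bound are illustrative only:

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Watermarks now advance with the observed event end times instead of the wall clock.
class EventTimeReportExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Tuple2[String, Report]](Time.hours(1)) {
  override def extractTimestamp(e: Tuple2[String, Report]): Long = e.f1.getEndTime
}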




-- 
Thanks
Rohan