Hello, 

I’ve been struggling for a while now with Kafka windows. I want to calculate an 
average heart rate over a 5-minute window. I have three apps:

1) A heart beat generator. It starts at the top of the minute (second 0) and 
sends between 60 and 70 beats using a producer. The event time is set in the 
call to the producer. This code looks like:

        this.producer.send(
            new ProducerRecord<>(topic, null, eventTime, userId, value));

The eventTime is based on the current system time and won’t span the entire 
minute, because I send all messages at once and then wait a minute before 
sending the next set of heart beats. Each message gets a new timestamp, but 
since all messages for the minute are sent very quickly, I don’t think the 
eventTime span is very wide.


2) The second application is the heart beat processor. It also starts at the 
top of the minute, and it seems to be doing what I want. There is a window of 
60 seconds and it counts up the beats. The stream’s peek output looks like this 
(the key is a user ID, the value is a JSON document):

19/01/29 10:05:03 INFO HBProcessor: 
-----------------------------------------------------------------
19/01/29 10:05:03 INFO HBProcessor: Heart Rate -> K : [1], V : 
[{"userId":"1","beatCount":71,"addTime":1548770700939}]
19/01/29 10:05:03 INFO HBProcessor: 
-----------------------------------------------------------------
19/01/29 10:06:03 INFO HBProcessor: 
-----------------------------------------------------------------
19/01/29 10:06:03 INFO HBProcessor: Heart Rate -> K : [1], V : 
[{"userId":"1","beatCount":63,"addTime":1548770760991}]
19/01/29 10:06:03 INFO HBProcessor: 
-----------------------------------------------------------------
19/01/29 10:07:03 INFO HBProcessor: 
-----------------------------------------------------------------
19/01/29 10:07:03 INFO HBProcessor: Heart Rate -> K : [1], V : 
[{"userId":"1","beatCount":80,"addTime":1548770821066}]
19/01/29 10:07:03 INFO HBProcessor: 
-----------------------------------------------------------------
19/01/29 10:09:03 INFO HBProcessor: 
-----------------------------------------------------------------
19/01/29 10:09:03 INFO HBProcessor: Heart Rate -> K : [1], V : 
[{"userId":"1","beatCount":62,"addTime":1548770940046}]
19/01/29 10:09:03 INFO HBProcessor: 
-----------------------------------------------------------------

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG is currently 1 ms. I’ve tried other 
values like 10000. 



3) The third application tries to compute an average over the output stream of 
2), based on a window of 5 minutes. It also starts at the top of the minute. 
Either I don’t understand the streaming API or there’s something wrong. It’s 
probably that I don’t understand the API ;-)

The window for this is 300 seconds. It’s a tumbling window. What I want is an 
average that is always calculated from the same number of elements, so that the 
average doesn’t hop around as much. However, sometimes the average is 
calculated on just one value. To simplify, I’ve taken the average calculation 
out and just show the values that would be used in it; this is “heartRates” in 
the JSON documents below. The number of elements that would be used in the 
calculation varies from one tumbling window to the next.
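
For reference, the average I took out boils down to a running sum and count 
per window. The sketch below is plain Java with no Kafka dependency; the class 
name RunningAverage and its methods are made up for illustration and are not 
part of the Kafka Streams API. It only shows the arithmetic that an aggregator 
would carry, using beat counts from the log further down.

```java
// Hypothetical helper: accumulates beat counts and reports their mean.
final class RunningAverage {
    private long sum;    // total of all beat counts seen so far
    private long count;  // number of beat counts seen so far

    // Add one per-minute beat count to the accumulator.
    RunningAverage add(int beatCount) {
        sum += beatCount;
        count++;
        return this;
    }

    long count() {
        return count;
    }

    // Mean of the values added so far; 0.0 when nothing has been added.
    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        RunningAverage avg = new RunningAverage();
        for (int beats : new int[] {63, 70, 66, 76, 67}) {
            avg.add(beats);
        }
        System.out.println(avg.count() + " values, average " + avg.average());
    }
}
```

Because the accumulator is updated one value at a time, the average it reports 
depends on how many values have arrived when it is read.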


The first window would use 4 values, the second would use 5 and the third 
would use 1.


19/01/29 10:13:32 INFO HeartRateProcessor: 
-----------------------------------------------------------------
19/01/29 10:13:32 INFO HeartRateProcessor: AVG Heart Rate -> K : [1], V : 
[{"userId":"1","count":4,"heartRates":[{"userId":"1","beatCount":63,"addTime":1548771000092},{"userId":"1","beatCount":70,"addTime":1548771060143},{"userId":"1","beatCount":66,"addTime":1548771120195},{"userId":"1","beatCount":76,"addTime":1548771180249}]}]
19/01/29 10:13:32 INFO HeartRateProcessor: 
-----------------------------------------------------------------
19/01/29 10:14:32 INFO HeartRateProcessor: 
-----------------------------------------------------------------
19/01/29 10:14:32 INFO HeartRateProcessor: AVG Heart Rate -> K : [1], V : 
[{"userId":"1","count":5,"heartRates":[{"userId":"1","beatCount":63,"addTime":1548771000092},{"userId":"1","beatCount":70,"addTime":1548771060143},{"userId":"1","beatCount":66,"addTime":1548771120195},{"userId":"1","beatCount":76,"addTime":1548771180249},{"userId":"1","beatCount":67,"addTime":1548771240297}]}]
19/01/29 10:14:32 INFO HeartRateProcessor: 
-----------------------------------------------------------------
19/01/29 10:15:33 INFO HeartRateProcessor: 
-----------------------------------------------------------------
19/01/29 10:15:33 INFO HeartRateProcessor: AVG Heart Rate -> K : [1], V : 
[{"userId":"1","count":1,"heartRates":[{"userId":"1","beatCount":73,"addTime":1548771300352}]}]
19/01/29 10:15:33 INFO HeartRateProcessor: 
-----------------------------------------------------------------
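
As a sanity check on the three outputs above, here is a minimal sketch that 
assigns each addTime to a 300-second tumbling window. It assumes two things 
that I have not verified in my setup: that the record timestamp used for 
windowing equals addTime, and that windows are aligned to the epoch (which is 
how Kafka Streams' TimeWindows are documented to behave). The class and method 
names are made up for illustration; nothing here calls Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: maps timestamps to epoch-aligned tumbling windows.
final class WindowAlignment {

    // Start of the epoch-aligned window of the given size containing timestampMs.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Number of timestamps per window, keyed by window start, in arrival order.
    static Map<Long, Integer> countPerWindow(long[] timestampsMs, long windowSizeMs) {
        Map<Long, Integer> counts = new LinkedHashMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, windowSizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // addTime values copied from the HeartRateProcessor log above.
        long[] addTimes = {
            1548771000092L, 1548771060143L, 1548771120195L,
            1548771180249L, 1548771240297L, 1548771300352L
        };
        // Prints:
        // 1548771000000 -> 5
        // 1548771300000 -> 1
        countPerWindow(addTimes, 300_000L)
            .forEach((start, n) -> System.out.println(start + " -> " + n));
    }
}
```

Under those assumptions, the outputs with count 4 and count 5 would be the same 
window seen before and after its fifth value arrived, and the output with 
count 1 would be the first value of the next window. If only one result per 
window is wanted, Kafka Streams 2.1 and later provide KTable#suppress with 
Suppressed.untilWindowCloses, which may be worth a look.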

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG is currently 1 ms. I’ve tried other 
values like 10000. 
I know 1 ms is incorrect but I’m trying to troubleshoot. Maybe there is an 
issue with commits to the log? Is there a tweak I need to do for that?


Thank you in advance for steering me in the right direction.


Rene
