Hi everyone,

I have implemented a way to measure latency in a DataStream (I hope): I'm
consuming a Kafka topic and I'm union'ing the resulting stream with a
custom source that emits a (machine-local) timestamp every 1000ms (using
currentTimeMillis). On the consuming end I'm distinguishing between the
Kafka events and the timestamps. When I encounter a timestamp, I take
the difference between the processing machine's local time and the
timestamp found in the stream, expecting a positive difference (i.e.
the processing machine's timestamp should be larger than the timestamp
found in the stream). However, the opposite is the case: the difference
comes out negative. Now I am wondering when events are actually
processed.
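
For reference, the marker source boils down to something like this (a
simplified sketch, not the exact code from the repo; I'm assuming
String records with a "marker:" prefix here so they can be told apart
from the Kafka events after the union):

import org.apache.flink.streaming.api.functions.source.SourceFunction

// Emits the machine-local wall-clock time once per second, tagged with
// a prefix so it can be distinguished from the Kafka events later on.
class TimestampSource extends SourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      // timestamp taken on the source machine
      ctx.collect("marker:" + System.currentTimeMillis())
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}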

Unioning the stream from Kafka with my custom source and batching them
into 10s windows (which is what I do), I expect each window to contain
10 timestamps with ascending values, roughly 1000ms apart:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
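
In simplified form (the Kafka consumer setup is omitted, and the window
calls are written from memory, so the exact method names may differ
slightly on 0.10.2), the union and windowing step is something like:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// kafkaStream is the DataStream[String] coming out of the Flink Kafka
// consumer, markerStream is env.addSource(new TimestampSource) from above.
def unionAndWindow(kafkaStream: DataStream[String],
                   markerStream: DataStream[String]): AllWindowedStream[String, TimeWindow] =
  kafkaStream
    .union(markerStream)             // merge Kafka events and marker timestamps
    .timeWindowAll(Time.seconds(10)) // tumbling 10s windows over the whole stream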

On the receiving end I again take the currentTimeMillis in my fold
function, expecting the resulting value to be larger (most of the time)
than the timestamps encountered in the stream:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
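
The latency side of the fold is roughly the following (again a sketch,
continuing from the windowing step above and showing only the latency
part):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// For every marker in a window, record the local wall-clock time at the
// moment the fold sees it, minus the timestamp the marker carries from
// the source machine.
def markerLatencies(windowed: AllWindowedStream[String, TimeWindow]): DataStream[List[Long]] =
  windowed.fold(List.empty[Long]) { (latencies, record) =>
    if (record.startsWith("marker:")) {
      val emitted = record.stripPrefix("marker:").toLong
      (System.currentTimeMillis() - emitted) :: latencies // expected >= 0
    } else {
      latencies // ordinary Kafka event, not relevant for the latency measurement
    }
  }

If the marker timestamp is really created when the source emits it, and
the fold only runs afterwards when the window is evaluated, each of
these differences should come out positive, which is not what I
observe.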

The system clocks are in sync to within 1ms.

Maybe I am not clear about when certain timestamps are created (i.e.
when the UDFs are invoked) or how windows are processed. Any advice is
greatly appreciated, as are suggestions for alternative approaches to
calculating latency.

I'm on Flink 0.10.2 by the way.

Thanks in advance for the help!

Robert

-- 
My GPG Key ID: 336E2680
