markusthoemmes closed pull request #3423: Measure kafka queue in consumer with better exactitude
URL: https://github.com/apache/incubator-openwhisk/pull/3423
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 9470bd3c6e..402ae5d6cc 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -73,6 +73,7 @@ whisk { // A low value improves latency performance but it is important to not set it too low // as that will cause excessive busy-waiting. fetch-max-wait-ms = 20 + metric-flush-interval-s = 60 } topics { diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index 26a0f8635a..fc0954e316 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -21,6 +21,7 @@ import java.util.Properties import akka.actor.ActorSystem import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{RetriableException, WakeupException} import org.apache.kafka.common.serialization.ByteArrayDeserializer import pureconfig.loadConfigOrThrow @@ -33,7 +34,7 @@ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.collection.JavaConverters._ -case class KafkaConsumerConfig(sessionTimeoutMs: Long) +case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int) class KafkaConsumerConnector( kafkahost: String, @@ -49,6 +50,10 @@ class KafkaConsumerConnector( // logic, like the wakeup timer. 
private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer) + // Currently consumed offset, is used to calculate the topic lag. + // It is updated from one thread in "peek", no concurrent data structure is necessary + private var offset: Long = 0 + /** * Long poll for messages. Method returns once message are available but no later than given * duration. @@ -62,7 +67,11 @@ class KafkaConsumerConnector( val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup()) try { - consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value)) + val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value)) + response.lastOption.foreach { + case (_, _, newOffset, _) => offset = newOffset + 1 + } + response } catch { // Happens if the peek hangs. case _: WakeupException if retry > 0 => @@ -138,14 +147,13 @@ class KafkaConsumerConnector( @volatile private var consumer = getConsumer(getProps, Some(List(topic))) -// Read current lag of the consumed topic, e.g. invoker queue and -// emit kamon histogram metric every 5 seconds -// Since we use only one partition in kafka, it is defined 0 in the metric name - actorSystem.scheduler.schedule(10.second, 5.second) { - val queueSize = consumer.metrics.asScala - .find(_._1.name() == s"$topic-0.records-lag") - .map(_._2.value().toInt) - .getOrElse(0) - MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize) + // Read current lag of the consumed topic, e.g. 
invoker queue + // Since we use only one partition in kafka, it is defined 0 + actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) { + val topicAndPartition = Set(new TopicPartition(topic, 0)) + consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset => + val queueSize = endOffset - offset + MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize) + } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services