This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 01ab888  KAFKA-13229: add total blocked time metric to streams (KIP-761) (#11149)
01ab888 is described below

commit 01ab888dbd08ccd4b0de9333d21581ce24fe2c3b
Author: Rohan <desai.p.ro...@gmail.com>
AuthorDate: Mon Aug 30 15:39:25 2021 -0700

    KAFKA-13229: add total blocked time metric to streams (KIP-761) (#11149)
    
    * Add the following producer metrics:
    flush-time-ns-total: cumulative sum of time elapsed in flush.
    txn-init-time-ns-total: cumulative sum of time elapsed in initTransactions.
    txn-begin-time-ns-total: cumulative sum of time elapsed in beginTransaction.
    txn-send-offsets-time-ns-total: cumulative sum of time elapsed in sendOffsetsToTransaction.
    txn-commit-time-ns-total: cumulative sum of time elapsed in commitTransaction.
    txn-abort-time-ns-total: cumulative sum of time elapsed in abortTransaction.
    
    * Add the following consumer metrics:
    committed-time-ns-total: cumulative sum of time elapsed in committed.
    commit-sync-time-ns-total: cumulative sum of time elapsed in commitSync.
    
    * Add a total-blocked-time metric to streams that is the sum of:
    consumer's io-waittime-total
    consumer's iotime-total
    consumer's committed-time-ns-total
    consumer's commit-sync-time-ns-total
    restore consumer's io-waittime-total
    restore consumer's iotime-total
    admin client's io-waittime-total
    admin client's iotime-total
    producer's bufferpool-wait-time-total
    producer's flush-time-ns-total
    producer's txn-init-time-ns-total
    producer's txn-begin-time-ns-total
    producer's txn-send-offsets-time-ns-total
    producer's txn-commit-time-ns-total
    producer's txn-abort-time-ns-total
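
    The snippet below is an illustrative sketch, not part of this patch: it
    reads the new thread-level blocked time off a running application through
    the public metrics() API. The thread-level metric name
    "blocked-time-ns-total" and the "thread-id" tag are assumptions based on
    KIP-761; everything else uses standard client APIs.

        import java.util.Map;
        import org.apache.kafka.common.Metric;
        import org.apache.kafka.common.MetricName;
        import org.apache.kafka.streams.KafkaStreams;

        public final class BlockedTimeReader {
            // Prints the cumulative blocked time, per stream thread, in nanoseconds.
            public static void print(final KafkaStreams streams) {
                final Map<MetricName, ? extends Metric> metrics = streams.metrics();
                for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                    if (entry.getKey().name().equals("blocked-time-ns-total")) {
                        System.out.println(entry.getKey().tags().get("thread-id")
                            + ": " + entry.getValue().metricValue() + " ns blocked");
                    }
                }
            }
        }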
    
    Reviewers: Bruno Cadonna <cado...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   7 +-
 .../consumer/internals/KafkaConsumerMetrics.java   |  33 ++++
 .../kafka/clients/producer/KafkaProducer.java      |  18 +++
 .../producer/internals/KafkaProducerMetrics.java   | 123 ++++++++++++++
 .../producer/internals/SenderMetricsRegistry.java  |   3 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 102 +++++++++++-
 .../internals/KafkaConsumerMetricsTest.java        |  76 +++++++++
 .../kafka/clients/producer/KafkaProducerTest.java  | 123 ++++++++++++++
 .../internals/KafkaProducerMetricsTest.java        | 117 ++++++++++++++
 .../processor/internals/ActiveTaskCreator.java     |  14 +-
 .../processor/internals/GlobalStreamThread.java    |   2 +
 .../streams/processor/internals/StreamThread.java  |  16 ++
 .../internals/StreamThreadTotalBlockedTime.java    |  59 +++++++
 .../processor/internals/StreamsProducer.java       |  46 +++++-
 .../streams/processor/internals/TaskManager.java   |   4 +
 .../kafka/streams/processor/internals/Tasks.java   |   4 +
 .../internals/metrics/StreamsMetricsImpl.java      |  40 +++++
 .../processor/internals/metrics/ThreadMetrics.java |  29 ++++
 .../integration/MetricsIntegrationTest.java        |   4 +
 .../processor/internals/ActiveTaskCreatorTest.java |  52 +++++-
 .../processor/internals/RecordCollectorTest.java   |  19 ++-
 .../StreamThreadTotalBlockedTimeTest.java          | 113 +++++++++++++
 .../processor/internals/StreamsProducerTest.java   | 177 ++++++++++++++++++---
 .../internals/metrics/StreamsMetricsImplTest.java  |  90 +++++++++++
 .../internals/metrics/ThreadMetricsTest.java       |  57 +++++++
 .../streams/state/KeyValueStoreTestDriver.java     |   4 +-
 .../StreamThreadStateStoreProviderTest.java        |   4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   8 +-
 29 files changed, 1302 insertions(+), 44 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5dd9187..bd53af1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -14,7 +14,7 @@
     <suppress checks="NPathComplexity"
               files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
     <suppress checks="JavaNCSS"
-              files="(ApiMessageType|FieldSpec|MessageDataGenerator).java"/>
+              files="(ApiMessageType|FieldSpec|MessageDataGenerator|KafkaConsumerTest).java"/>
     <suppress checks="MethodLength"
               files="(FieldSpec|MessageDataGenerator).java"/>
     <suppress id="dontUseSystemExit"
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 29a9e37..286f84b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1485,6 +1485,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         acquireAndEnsureOpen();
+        long commitStart = time.nanoseconds();
         try {
             maybeThrowInvalidGroupIdException();
             offsets.forEach(this::updateLastSeenEpochIfNewer);
@@ -1493,6 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         "committing offsets " + offsets);
             }
         } finally {
+            kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
             release();
         }
     }
@@ -1871,9 +1873,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
         acquireAndEnsureOpen();
+        long start = time.nanoseconds();
         try {
             maybeThrowInvalidGroupIdException();
-            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+            final Map<TopicPartition, OffsetAndMetadata> offsets;
+            offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
             if (offsets == null) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
                     "committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
@@ -1883,6 +1887,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 return offsets;
             }
         } finally {
+            kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
             release();
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
index 71332b8..0dc8a33 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,8 @@ public class KafkaConsumerMetrics implements AutoCloseable {
     private final MetricName lastPollMetricName;
     private final Sensor timeBetweenPollSensor;
     private final Sensor pollIdleSensor;
+    private final Sensor committedSensor;
+    private final Sensor commitSyncSensor;
     private final Metrics metrics;
     private long lastPollMs;
     private long pollStartMs;
@@ -63,6 +66,26 @@ public class KafkaConsumerMetrics implements AutoCloseable {
                 metricGroupName,
                 "The average fraction of time the consumer's poll() is idle as 
opposed to waiting for the user code to process records."),
                 new Avg());
+
+        this.commitSyncSensor = metrics.sensor("commit-sync-time-ns-total");
+        this.commitSyncSensor.add(
+            metrics.metricName(
+                "commit-sync-time-ns-total",
+                metricGroupName,
+                "The total time the consumer has spent in commitSync in 
nanoseconds"
+            ),
+            new CumulativeSum()
+        );
+
+        this.committedSensor = metrics.sensor("committed-time-ns-total");
+        this.committedSensor.add(
+            metrics.metricName(
+                "committed-time-ns-total",
+                metricGroupName,
+                "The total time the consumer has spent in committed in 
nanoseconds"
+            ),
+            new CumulativeSum()
+        );
     }
 
     public void recordPollStart(long pollStartMs) {
@@ -78,10 +101,20 @@ public class KafkaConsumerMetrics implements AutoCloseable {
         this.pollIdleSensor.record(pollIdleRatio);
     }
 
+    public void recordCommitSync(long duration) {
+        this.commitSyncSensor.record(duration);
+    }
+
+    public void recordCommitted(long duration) {
+        this.committedSensor.record(duration);
+    }
+
     @Override
     public void close() {
         metrics.removeMetric(lastPollMetricName);
         metrics.removeSensor(timeBetweenPollSensor.name());
         metrics.removeSensor(pollIdleSensor.name());
+        metrics.removeSensor(commitSyncSensor.name());
+        metrics.removeSensor(committedSensor.name());
     }
 }
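
For reference, the sensor pattern used above reduces to the following
self-contained sketch (same metric and group names as in this file;
CumulativeSum simply adds up every recorded value):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.CumulativeSum;

    public class CumulativeSumDemo {
        public static void main(String[] args) {
            try (Metrics metrics = new Metrics()) {
                Sensor sensor = metrics.sensor("commit-sync-time-ns-total");
                sensor.add(
                    metrics.metricName("commit-sync-time-ns-total", "consumer-metrics",
                        "The total time the consumer has spent in commitSync in nanoseconds"),
                    new CumulativeSum());
                sensor.record(1_000_000); // record 1 ms, in nanoseconds
                sensor.record(2_000_000); // cumulative total is now 3 ms
                // Prints 3000000.0
                System.out.println(metrics.metric(
                    metrics.metricName("commit-sync-time-ns-total", "consumer-metrics")).metricValue());
            }
        }
    }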
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ef8a9cc..756e300 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetadata;
 import org.apache.kafka.clients.producer.internals.ProducerMetrics;
@@ -241,6 +242,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final String clientId;
     // Visible for testing
     final Metrics metrics;
+    private final KafkaProducerMetrics producerMetrics;
     private final Partitioner partitioner;
     private final int maxRequestSize;
     private final long totalMemorySize;
@@ -356,6 +358,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                     config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+            this.producerMetrics = new KafkaProducerMetrics(metrics);
             this.partitioner = config.getConfiguredInstance(
                     ProducerConfig.PARTITIONER_CLASS_CONFIG,
                     Partitioner.class,
@@ -590,9 +593,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.initializeTransactions();
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        producerMetrics.recordInit(time.nanoseconds() - now);
     }
 
     /**
@@ -613,7 +618,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void beginTransaction() throws ProducerFencedException {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();
         transactionManager.beginTransaction();
+        producerMetrics.recordBeginTxn(time.nanoseconds() - now);
     }
 
     /**
@@ -697,9 +704,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         throwIfInvalidGroupMetadata(groupMetadata);
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long start = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        producerMetrics.recordSendOffsets(time.nanoseconds() - start);
     }
 
     /**
@@ -730,9 +739,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void commitTransaction() throws ProducerFencedException {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long commitStart = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.beginCommit();
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
     }
 
     /**
@@ -761,9 +772,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
         log.info("Aborting incomplete transaction");
+        long abortStart = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.beginAbort();
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }
 
     /**
@@ -1124,12 +1137,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @Override
     public void flush() {
         log.trace("Flushing accumulated records in producer.");
+
+        long start = time.nanoseconds();
         this.accumulator.beginFlush();
         this.sender.wakeup();
         try {
             this.accumulator.awaitFlushCompletion();
         } catch (InterruptedException e) {
             throw new InterruptException("Flush interrupted.", e);
+        } finally {
+            producerMetrics.recordFlush(time.nanoseconds() - start);
         }
     }
 
@@ -1245,6 +1262,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
 
         Utils.closeQuietly(interceptors, "producer interceptors", firstException);
+        Utils.closeQuietly(producerMetrics, "producer metrics wrapper", firstException);
         Utils.closeQuietly(metrics, "producer metrics", firstException);
         Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
         Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
new file mode 100644
index 0000000..b8ea762
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
+
+    private final Map<String, String> tags;
+    private final Metrics metrics;
+    private final Sensor initTimeSensor;
+    private final Sensor beginTxnTimeSensor;
+    private final Sensor flushTimeSensor;
+    private final Sensor sendOffsetsSensor;
+    private final Sensor commitTxnSensor;
+    private final Sensor abortTxnSensor;
+
+    public KafkaProducerMetrics(Metrics metrics) {
+        this.metrics = metrics;
+        tags = this.metrics.config().tags();
+        flushTimeSensor = newLatencySensor(
+            FLUSH,
+            "Total time producer has spent in flush in nanoseconds."
+        );
+        initTimeSensor = newLatencySensor(
+            TXN_INIT,
+            "Total time producer has spent in initTransactions in nanoseconds."
+        );
+        beginTxnTimeSensor = newLatencySensor(
+            TXN_BEGIN,
+            "Total time producer has spent in beginTransaction in nanoseconds."
+        );
+        sendOffsetsSensor = newLatencySensor(
+            TXN_SEND_OFFSETS,
+            "Total time producer has spent in sendOffsetsToTransaction."
+        );
+        commitTxnSensor = newLatencySensor(
+            TXN_COMMIT,
+            "Total time producer has spent in commitTransaction."
+        );
+        abortTxnSensor = newLatencySensor(
+            TXN_ABORT,
+            "Total time producer has spent in abortTransaction."
+        );
+    }
+
+    @Override
+    public void close() {
+        removeMetric(FLUSH);
+        removeMetric(TXN_INIT);
+        removeMetric(TXN_BEGIN);
+        removeMetric(TXN_SEND_OFFSETS);
+        removeMetric(TXN_COMMIT);
+        removeMetric(TXN_ABORT);
+    }
+
+    public void recordFlush(long duration) {
+        flushTimeSensor.record(duration);
+    }
+
+    public void recordInit(long duration) {
+        initTimeSensor.record(duration);
+    }
+
+    public void recordBeginTxn(long duration) {
+        beginTxnTimeSensor.record(duration);
+    }
+
+    public void recordSendOffsets(long duration) {
+        sendOffsetsSensor.record(duration);
+    }
+
+    public void recordCommitTxn(long duration) {
+        commitTxnSensor.record(duration);
+    }
+
+    public void recordAbortTxn(long duration) {
+        abortTxnSensor.record(duration);
+    }
+
+    private Sensor newLatencySensor(String name, String description) {
+        Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
+        sensor.add(metricName(name, description), new CumulativeSum());
+        return sensor;
+    }
+
+    private MetricName metricName(final String name, final String description) {
+        return metrics.metricName(name + TOTAL_TIME_SUFFIX, GROUP, description, tags);
+    }
+
+    private void removeMetric(final String name) {
+        metrics.removeSensor(name + TOTAL_TIME_SUFFIX);
+    }
+}
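
All producer call sites above share the same measure-and-record shape; a
minimal sketch of that pattern follows. The timed() helper is hypothetical
and only illustrates the try/finally placement used in flush() (the
transactional call sites record on success only):

    import java.util.function.LongConsumer;
    import org.apache.kafka.common.utils.Time;

    final class TimedCall {
        // Records the elapsed nanoseconds even when op throws.
        static void timed(final Time time, final LongConsumer record, final Runnable op) {
            final long start = time.nanoseconds();
            try {
                op.run();
            } finally {
                record.accept(time.nanoseconds() - start);
            }
        }
        // Usage (hypothetical): timed(time, producerMetrics::recordFlush, producer::flush);
    }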
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
index 6438973..2ad2cba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.Sensor;
 
 public class SenderMetricsRegistry {
 
-    final static String METRIC_GROUP_NAME = "producer-metrics";
     final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics";
 
     private final List<MetricNameTemplate> allTemplates;
@@ -154,7 +153,7 @@ public class SenderMetricsRegistry {
     }
 
     private MetricName createMetricName(String name, String description) {
-        return this.metrics.metricInstance(createTemplate(name, METRIC_GROUP_NAME, description, this.tags));
+        return this.metrics.metricInstance(createTemplate(name, KafkaProducerMetrics.GROUP, description, this.tags));
     }
 
     private MetricNameTemplate createTopicTemplate(String name, String description) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 57cd942..bc7d506 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -1926,6 +1927,96 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testMeasureCommitSyncDurationOnFailure() {
+        final KafkaConsumer<String, String> consumer
+            = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+
+        try {
+            consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
+        } catch (final RuntimeException e) {
+        }
+
+        final Metric metric = consumer.metrics()
+            .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+        assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+    }
+
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+        SubscriptionState subscription = new SubscriptionState(new LogContext(),
+            OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+            assignor, true, groupInstanceId);
+        consumer.assign(singletonList(tp0));
+
+        client.prepareResponseFrom(
+            FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+        client.prepareResponseFrom(
+            offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE)),
+            coordinator
+        );
+
+        consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
+
+        final Metric metric = consumer.metrics()
+            .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+        assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+    }
+
+    @Test
+    public void testMeasureCommittedDurationOnFailure() {
+        final KafkaConsumer<String, String> consumer
+            = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+
+        try {
+            consumer.committed(Collections.singleton(tp0));
+        } catch (final RuntimeException e) {
+        }
+
+        final Metric metric = consumer.metrics()
+            .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+        assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+    }
+
+    @Test
+    public void testMeasureCommittedDuration() {
+        long offset1 = 10000;
+        Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+        SubscriptionState subscription = new SubscriptionState(new LogContext(),
+            OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+            assignor, true, groupInstanceId);
+        consumer.assign(singletonList(tp0));
+
+        // lookup coordinator
+        client.prepareResponseFrom(
+            FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // fetch offset for one topic
+        client.prepareResponseFrom(
+            offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator);
+
+        consumer.committed(Collections.singleton(tp0)).get(tp0).offset();
+
+        final Metric metric = consumer.metrics()
+            .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+        assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+    }
+
+    @Test
     public void testRebalanceException() {
         Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
@@ -2247,8 +2338,7 @@ public class KafkaConsumerTest {
         consumer.close(Duration.ZERO);
     }
 
-    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
-        Time time = new MockTime();
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(final Time time) {
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -2262,6 +2352,14 @@ public class KafkaConsumerTest {
         return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
     }
 
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
+        return consumerWithPendingAuthenticationError(new MockTime());
+    }
+
+    private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
+        return consumerWithPendingAuthenticationError(time);
+    }
+
     private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
         return new ConsumerRebalanceListener() {
             @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
new file mode 100644
index 0000000..087f90b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class KafkaConsumerMetricsTest {
+    private static final long METRIC_VALUE = 123L;
+    private static final String CONSUMER_GROUP_PREFIX = "consumer";
+    private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
+    private static final String COMMIT_SYNC_TIME_TOTAL = "commit-sync-time-ns-total";
+    private static final String COMMITTED_TIME_TOTAL = "committed-time-ns-total";
+
+    private final Metrics metrics = new Metrics();
+    private final KafkaConsumerMetrics consumerMetrics
+        = new KafkaConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX);
+
+    @Test
+    public void shouldRecordCommitSyncTime() {
+        // When:
+        consumerMetrics.recordCommitSync(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(COMMIT_SYNC_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRecordCommittedTime() {
+        // When:
+        consumerMetrics.recordCommitted(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(COMMITTED_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRemoveMetricsOnClose() {
+        // When:
+        consumerMetrics.close();
+
+        // Then:
+        assertMetricRemoved(COMMIT_SYNC_TIME_TOTAL);
+        assertMetricRemoved(COMMITTED_TIME_TOTAL);
+    }
+
+    private void assertMetricRemoved(final String name) {
+        assertNull(metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)));
+    }
+
+    private void assertMetricValue(final String name) {
+        assertEquals(
+            metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)).metricValue(),
+            (double) METRIC_VALUE
+        );
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2784f19..c48c3fe 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.internals.ProducerMetadata;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;
@@ -813,6 +815,41 @@ public class KafkaProducerTest {
         }
     }
 
+    private static Double getMetricValue(final KafkaProducer<?, ?> producer, final String name) {
+        Metrics metrics = producer.metrics;
+        Metric metric = metrics.metric(metrics.metricName(name, "producer-metrics"));
+        return (Double) metric.metricValue();
+    }
+
+    @Test
+    public void testFlushMeasureLatency() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(
+            configs,
+            new StringSerializer(),
+            new StringSerializer(),
+            metadata,
+            client,
+            null,
+            time
+        )) {
+            producer.flush();
+            double first = getMetricValue(producer, "flush-time-ns-total");
+            assertTrue(first > 0);
+            producer.flush();
+            assertTrue(getMetricValue(producer, "flush-time-ns-total") > first);
+        }
+    }
+
     @Test
     public void testMetricConfigRecordingLevel() {
         Properties props = new Properties();
@@ -952,6 +989,36 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void testMeasureAbortTransactionDuration() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            producer.abortTransaction();
+            double first = getMetricValue(producer, "txn-abort-time-ns-total");
+            assertTrue(first > 0);
+
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            producer.abortTransaction();
+            assertTrue(getMetricValue(producer, "txn-abort-time-ns-total") > first);
+        }
+    }
+
+    @Test
     public void testSendTxnOffsetsWithGroupId() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
@@ -988,6 +1055,62 @@ public class KafkaProducerTest {
         }
     }
 
+    private void assertDurationAtLeast(KafkaProducer<?, ?> producer, String name, double floor) {
+        getAndAssertDurationAtLeast(producer, name, floor);
+    }
+
+    private double getAndAssertDurationAtLeast(KafkaProducer<?, ?> producer, String name, double floor) {
+        double value = getMetricValue(producer, name);
+        assertTrue(value > floor);
+        return value;
+    }
+
+    @Test
+    public void testMeasureTransactionDurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Duration tick = Duration.ofSeconds(1);
+        Time time = new MockTime(tick.toMillis());
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            assertDurationAtLeast(producer, "txn-init-time-ns-total", tick.toNanos());
+
+            client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
+            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+            client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(
+                new TopicPartition("topic", 0), Errors.NONE)));
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            double beginFirst = getAndAssertDurationAtLeast(producer, "txn-begin-time-ns-total", tick.toNanos());
+            producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("group"));
+            double sendOffFirst = getAndAssertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", tick.toNanos());
+            producer.commitTransaction();
+            double commitFirst = getAndAssertDurationAtLeast(producer, "txn-commit-time-ns-total", tick.toNanos());
+
+            client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
+            client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(
+                new TopicPartition("topic", 0), Errors.NONE)));
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            assertDurationAtLeast(producer, "txn-begin-time-ns-total", beginFirst + tick.toNanos());
+            producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("group"));
+            assertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", sendOffFirst + tick.toNanos());
+            producer.commitTransaction();
+            assertDurationAtLeast(producer, "txn-commit-time-ns-total", commitFirst + tick.toNanos());
+        }
+    }
+
     @Test
     public void testSendTxnOffsetsWithGroupMetadata() {
         final short maxVersion = (short) 3;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
new file mode 100644
index 0000000..e068861
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class KafkaProducerMetricsTest {
+    private static final long METRIC_VALUE = 123L;
+    private static final String FLUSH_TIME_TOTAL = "flush-time-ns-total";
+    private static final String TXN_INIT_TIME_TOTAL = "txn-init-time-ns-total";
+    private static final String TXN_BEGIN_TIME_TOTAL = "txn-begin-time-ns-total";
+    private static final String TXN_COMMIT_TIME_TOTAL = "txn-commit-time-ns-total";
+    private static final String TXN_ABORT_TIME_TOTAL = "txn-abort-time-ns-total";
+    private static final String TXN_SEND_OFFSETS_TIME_TOTAL = "txn-send-offsets-time-ns-total";
+
+    private final Metrics metrics = new Metrics();
+    private final KafkaProducerMetrics producerMetrics = new KafkaProducerMetrics(metrics);
+
+    @Test
+    public void shouldRecordFlushTime() {
+        // When:
+        producerMetrics.recordFlush(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(FLUSH_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRecordInitTime() {
+        // When:
+        producerMetrics.recordInit(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(TXN_INIT_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRecordTxBeginTime() {
+        // When:
+        producerMetrics.recordBeginTxn(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(TXN_BEGIN_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRecordTxCommitTime() {
+        // When:
+        producerMetrics.recordCommitTxn(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(TXN_COMMIT_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRecordTxAbortTime() {
+        // When:
+        producerMetrics.recordAbortTxn(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(TXN_ABORT_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRecordSendOffsetsTime() {
+        // When:
+        producerMetrics.recordSendOffsets(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue(TXN_SEND_OFFSETS_TIME_TOTAL);
+    }
+
+    @Test
+    public void shouldRemoveMetricsOnClose() {
+        // When:
+        producerMetrics.close();
+
+        // Then:
+        assertMetricRemoved(FLUSH_TIME_TOTAL);
+        assertMetricRemoved(TXN_INIT_TIME_TOTAL);
+        assertMetricRemoved(TXN_BEGIN_TIME_TOTAL);
+        assertMetricRemoved(TXN_COMMIT_TIME_TOTAL);
+        assertMetricRemoved(TXN_ABORT_TIME_TOTAL);
+        assertMetricRemoved(TXN_SEND_OFFSETS_TIME_TOTAL);
+    }
+
+    private void assertMetricRemoved(final String name) {
+        assertNull(metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)));
+    }
+
+    private void assertMetricValue(final String name) {
+        assertEquals(
+            metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue(),
+            (double) METRIC_VALUE
+        );
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 06171d3..2ed3bd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -110,7 +110,8 @@ class ActiveTaskCreator {
                 clientSupplier,
                 null,
                 processId,
-                logContext);
+                logContext,
+                time);
             taskProducers = Collections.emptyMap();
         }
     }
@@ -243,7 +244,8 @@ class ActiveTaskCreator {
                 clientSupplier,
                 taskId,
                 null,
-                logContext);
+                logContext,
+                time);
             taskProducers.put(taskId, streamsProducer);
         } else {
             streamsProducer = threadProducer;
@@ -326,4 +328,12 @@ class ActiveTaskCreator {
         return new LogContext(logPrefix);
     }
 
+    public double totalProducerBlockedTime() {
+        if (threadProducer != null) {
+            return threadProducer.totalBlockedTime();
+        }
+        return taskProducers.values().stream()
+            .mapToDouble(StreamsProducer::totalBlockedTime)
+            .sum();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index e55c5d7..f45350d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -296,6 +296,7 @@ public class GlobalStreamThread extends Thread {
 
             log.warn("Error happened during initialization of the global state 
store; this thread has shutdown");
             streamsMetrics.removeAllThreadLevelSensors(getName());
+            streamsMetrics.removeAllThreadLevelMetrics(getName());
 
             return;
         }
@@ -338,6 +339,7 @@ public class GlobalStreamThread extends Thread {
             }
 
             streamsMetrics.removeAllThreadLevelSensors(getName());
+            streamsMetrics.removeAllThreadLevelMetrics(getName());
 
             setState(DEAD);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eb3fe79..a1da373 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -512,6 +512,21 @@ public class StreamThread extends Thread {
         ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
         ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
 
+        ThreadMetrics.addThreadStartTimeMetric(
+            threadId,
+            streamsMetrics,
+            time.milliseconds()
+        );
+        ThreadMetrics.addThreadBlockedTimeMetric(
+            threadId,
+            new StreamThreadTotalBlockedTime(
+                mainConsumer,
+                restoreConsumer,
+                taskManager::totalProducerBlockedTime
+            ),
+            streamsMetrics
+        );
+
         this.time = time;
         this.topologyMetadata = topologyMetadata;
         this.logPrefix = logContext.logPrefix();
@@ -1127,6 +1142,7 @@ public class StreamThread extends Thread {
             log.error("Failed to close restore consumer due to the following 
error:", e);
         }
         streamsMetrics.removeAllThreadLevelSensors(getName());
+        streamsMetrics.removeAllThreadLevelMetrics(getName());
 
         setState(State.DEAD);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
new file mode 100644
index 0000000..cf37633
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamThreadTotalBlockedTime {
+    private final Consumer<?, ?> consumer;
+    private final Consumer<?, ?> restoreConsumer;
+    private final Supplier<Double> producerTotalBlockedTime;
+
+    StreamThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    private double metricValue(
+        final Map<MetricName, ? extends Metric> metrics,
+        final String name) {
+        return metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .findFirst()
+            .map(n -> (Double) metrics.get(n).metricValue())
+            .orElse(0.0);
+    }
+
+    public double compute() {
+        return metricValue(consumer.metrics(), "io-waittime-total")
+            + metricValue(consumer.metrics(), "iotime-total")
+            + metricValue(consumer.metrics(), "committed-time-ns-total")
+            + metricValue(consumer.metrics(), "commit-sync-time-ns-total")
+            + metricValue(restoreConsumer.metrics(), "io-waittime-total")
+            + metricValue(restoreConsumer.metrics(), "iotime-total")
+            + producerTotalBlockedTime.get();
+    }
+}
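
A tiny usage sketch for the class above (assumes the caller sits in the same
package, since the constructor is package-private, and that MockConsumer
exposes an empty metrics map — missing metrics fall back to 0.0 in
metricValue(...), so compute() returns only the producer-supplied total here):

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class BlockedTimeComputeDemo {
        public static void main(String[] args) {
            StreamThreadTotalBlockedTime blockedTime = new StreamThreadTotalBlockedTime(
                new MockConsumer<>(OffsetResetStrategy.EARLIEST),
                new MockConsumer<>(OffsetResetStrategy.EARLIEST),
                () -> 42.0 // stand-in for taskManager::totalProducerBlockedTime
            );
            System.out.println(blockedTime.compute()); // 42.0
        }
    }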
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 8655f01..23ee0b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -35,6 +36,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -67,22 +69,26 @@ public class StreamsProducer {
     private final Map<String, Object> eosV2ProducerConfigs;
     private final KafkaClientSupplier clientSupplier;
     private final StreamThread.ProcessingMode processingMode;
+    private final Time time;
 
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private double oldProducerTotalBlockedTime = 0;
 
     public StreamsProducer(final StreamsConfig config,
                            final String threadId,
                            final KafkaClientSupplier clientSupplier,
                            final TaskId taskId,
                            final UUID processId,
-                           final LogContext logContext) {
+                           final LogContext logContext,
+                           final Time time) {
         Objects.requireNonNull(config, "config cannot be null");
         Objects.requireNonNull(threadId, "threadId cannot be null");
         this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier cannot be null");
         log = Objects.requireNonNull(logContext, "logContext cannot be null").logger(getClass());
         logPrefix = logContext.logPrefix().trim();
+        this.time = Objects.requireNonNull(time, "time");
 
         processingMode = StreamThread.processingMode(config);
 
@@ -178,12 +184,50 @@ public class StreamsProducer {
             throw new IllegalStateException("Expected eos-v2 to be enabled, 
but the processing mode was " + processingMode);
         }
 
+        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        final long start = time.nanoseconds();
         producer.close();
+        final long closeTime = time.nanoseconds() - start;
+        oldProducerTotalBlockedTime += closeTime;
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                  final String name) {
+        final List<MetricName> found = metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .collect(Collectors.toList());
+        if (found.isEmpty()) {
+            return 0.0;
+        }
+        if (found.size() > 1) {
+            final String err = String.format(
+                "found %d values for metric %s. total blocked time computation 
may be incorrect",
+                found.size(),
+                name
+            );
+            log.error(err);
+            throw new IllegalStateException(err);
+        }
+        return (Double) metrics.get(found.get(0)).metricValue();
+    }
+
+    private double totalBlockedTime(final Producer<?, ?> producer) {
+        return getMetricValue(producer.metrics(), "bufferpool-wait-time-total")
+            + getMetricValue(producer.metrics(), "flush-time-ns-total")
+            + getMetricValue(producer.metrics(), "txn-init-time-ns-total")
+            + getMetricValue(producer.metrics(), "txn-begin-time-ns-total")
+            + getMetricValue(producer.metrics(), "txn-send-offsets-time-ns-total")
+            + getMetricValue(producer.metrics(), "txn-commit-time-ns-total")
+            + getMetricValue(producer.metrics(), "txn-abort-time-ns-total");
+    }
+
+    public double totalBlockedTime() {
+        return oldProducerTotalBlockedTime + totalBlockedTime(producer);
+    }
+
     private void maybeBeginTransaction() {
         if (eosEnabled() && !transactionInFlight) {
             try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 24b90a6..9269c9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -124,6 +124,10 @@ public class TaskManager {
         tasks.setMainConsumer(mainConsumer);
     }
 
+    public double totalProducerBlockedTime() {
+        return tasks.totalProducerBlockedTime();
+    }
+
     public UUID processId() {
         return processId;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 35056ff..96c0ee1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -101,6 +101,10 @@ class Tasks {
         );
     }
 
+    double totalProducerBlockedTime() {
+        return activeTaskCreator.totalProducerBlockedTime();
+    }
+
     void createTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
         for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : activeTasksToCreate.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 9a36898..dea2399 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -91,6 +91,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     private final Version version;
     private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
     private final Deque<String> clientLevelSensors = new LinkedList<>();
+    private final Map<String, Deque<MetricName>> threadLevelMetrics = new HashMap<>();
     private final Map<String, Deque<String>> threadLevelSensors = new HashMap<>();
     private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
     private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
@@ -200,6 +201,36 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
+    public <T> void addThreadLevelImmutableMetric(final String name,
+        final String description,
+        final String threadId,
+        final T value) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {
+            threadLevelMetrics.computeIfAbsent(
+                threadSensorPrefix(threadId),
+                tid -> new LinkedList<>()
+            ).add(metricName);
+            metrics.addMetric(metricName, new ImmutableMetricValue<>(value));
+        }
+    }
+
+    public <T> void addThreadLevelMutableMetric(final String name,
+                                                final String description,
+                                                final String threadId,
+                                                final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {
+            threadLevelMetrics.computeIfAbsent(
+                threadSensorPrefix(threadId),
+                tid -> new LinkedList<>()
+            ).add(metricName);
+            metrics.addMetric(metricName, valueProvider);
+        }
+    }
+
     public final Sensor clientLevelSensor(final String sensorName,
                                           final RecordingLevel recordingLevel,
                                           final Sensor... parents) {
@@ -271,6 +302,15 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
+    public final void removeAllThreadLevelMetrics(final String threadId) {
+        synchronized (threadLevelMetrics) {
+            final Deque<MetricName> names = threadLevelMetrics.remove(threadSensorPrefix(threadId));
+            while (names != null && !names.isEmpty()) {
+                metrics.removeMetric(names.pop());
+            }
+        }
+    }
+
     public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
         final Map<String, String> tagMap = threadLevelTagMap(threadId);
         tagMap.put(TASK_ID_TAG, taskId);
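
The mutable thread-level metric above is backed by a Gauge, which the metrics registry re-evaluates on every read, so blocked time stays current without any sensor recording it. A minimal standalone sketch of the same mechanism against the public Metrics API; the group, tag, and value here are placeholders:

    import java.util.Collections;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Gauge;
    import org.apache.kafka.common.metrics.Metrics;

    public final class GaugeSketch {
        public static void main(final String[] args) {
            try (Metrics metrics = new Metrics()) {
                final MetricName name = metrics.metricName(
                    "blocked-time-ns-total", "stream-thread-metrics",
                    Collections.singletonMap("thread-id", "thread-1"));
                // The Gauge lambda runs on every read of the metric.
                final Gauge<Double> gauge = (config, now) -> 42.0;
                metrics.addMetric(name, gauge);
                System.out.println(metrics.metric(name).metricValue()); // prints 42.0
            }
        }
    }
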
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index 28cb10f..8912f6e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals.metrics;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
 
 import java.util.Map;
 
@@ -45,6 +46,8 @@ public class ThreadMetrics {
     private static final String CREATE_TASK = "task-created";
     private static final String CLOSE_TASK = "task-closed";
     private static final String SKIP_RECORD = "skipped-records";
+    private static final String BLOCKED_TIME = "blocked-time-ns-total";
+    private static final String THREAD_START_TIME = "thread-start-time";
 
     private static final String COMMIT_DESCRIPTION = "calls to commit";
     private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION;
@@ -91,6 +94,10 @@ public class ThreadMetrics {
         "The fraction of time the thread spent on polling records from 
consumer";
     private static final String COMMIT_RATIO_DESCRIPTION =
         "The fraction of time the thread spent on committing all tasks";
+    private static final String BLOCKED_TIME_DESCRIPTION =
+        "The total time the thread spent blocked on kafka";
+    private static final String THREAD_START_TIME_DESCRIPTION =
+        "The time that the thread was started";
 
     public static Sensor createTaskSensor(final String threadId,
                                           final StreamsMetricsImpl streamsMetrics) {
@@ -310,6 +317,28 @@ public class ThreadMetrics {
         return sensor;
     }
 
+    public static void addThreadStartTimeMetric(final String threadId,
+                                                final StreamsMetricsImpl streamsMetrics,
+                                                final long startTime) {
+        streamsMetrics.addThreadLevelImmutableMetric(
+            THREAD_START_TIME,
+            THREAD_START_TIME_DESCRIPTION,
+            threadId,
+            startTime
+        );
+    }
+
+    public static void addThreadBlockedTimeMetric(final String threadId,
+                                                  final StreamThreadTotalBlockedTime blockedTime,
+                                                  final StreamsMetricsImpl streamsMetrics) {
+        streamsMetrics.addThreadLevelMutableMetric(
+            BLOCKED_TIME,
+            BLOCKED_TIME_DESCRIPTION,
+            threadId,
+            (config, now) -> blockedTime.compute()
+        );
+    }
+
     private static Sensor invocationRateAndCountSensor(final String threadId,
                                                        final String metricName,
                                                        final String descriptionOfRate,
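
Once registered, both new metrics are observable through KafkaStreams#metrics() like any other client metric. A usage sketch, assuming `streams` is a started KafkaStreams instance and the thread-level group name "stream-thread-metrics" with a "thread-id" tag, as used by StreamsMetricsImpl above:

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    final class BlockedTimeReporter {
        // Prints the KIP-761 thread-level metrics for every stream thread.
        static void print(final KafkaStreams streams) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                final MetricName name = entry.getKey();
                if ("stream-thread-metrics".equals(name.group())
                        && ("blocked-time-ns-total".equals(name.name())
                            || "thread-start-time".equals(name.name()))) {
                    System.out.println(name.tags().get("thread-id")
                        + " " + name.name() + " = " + entry.getValue().metricValue());
                }
            }
        }
    }
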
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 84f5cfc..9ada60f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -191,6 +191,8 @@ public class MetricsIntegrationTest {
     private static final String TASK_CREATED_TOTAL = "task-created-total";
     private static final String TASK_CLOSED_RATE = "task-closed-rate";
     private static final String TASK_CLOSED_TOTAL = "task-closed-total";
+    private static final String BLOCKED_TIME_TOTAL = "blocked-time-ns-total";
+    private static final String THREAD_START_TIME = "thread-start-time";
     private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
     private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
     private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
@@ -503,6 +505,8 @@ public class MetricsIntegrationTest {
         checkMetricByName(listMetricThread, TASK_CREATED_TOTAL, NUM_THREADS);
         checkMetricByName(listMetricThread, TASK_CLOSED_RATE, NUM_THREADS);
         checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, NUM_THREADS);
+        checkMetricByName(listMetricThread, BLOCKED_TIME_TOTAL, NUM_THREADS);
+        checkMetricByName(listMetricThread, THREAD_START_TIME, NUM_THREADS);
     }
 
     private void checkTaskLevelMetrics() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index b689dc1..74d81bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -55,6 +56,7 @@ import static org.easymock.EasyMock.reset;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertThrows;
 
@@ -117,6 +119,16 @@ public class ActiveTaskCreatorTest {
         assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
     }
 
+    @Test
+    public void shouldReturnBlockedTimeWhenThreadProducer() {
+        final double blockedTime = 123.0;
+        createTasks();
+        final MockProducer<?, ?> producer = mockClientSupplier.producers.get(0);
+        addMetric(producer, "flush-time-ns-total", blockedTime);
+
+        assertThat(activeTaskCreator.totalProducerBlockedTime(), closeTo(blockedTime, 0.01));
+    }
+
     // error handling
 
     @Test
@@ -224,6 +236,23 @@ public class ActiveTaskCreatorTest {
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldReturnBlockedTimeWhenTaskProducers() {
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        mockClientSupplier.setApplicationIdForProducer("appId");
+        createTasks();
+        double total = 0.0;
+        double blocked = 1.0;
+        for (final MockProducer<?, ?> producer : mockClientSupplier.producers) {
+            addMetric(producer, "flush-time-ns-total", blocked);
+            total += blocked;
+            blocked += 1.0;
+        }
+
+        assertThat(activeTaskCreator.totalProducerBlockedTime(), closeTo(total, 0.01));
+    }
+
     // error handling
 
     @SuppressWarnings("deprecation")
@@ -289,7 +318,6 @@ public class ActiveTaskCreatorTest {
     }
 
 
-
     // eos-v2 test
 
     // functional test
@@ -488,4 +516,26 @@ public class ActiveTaskCreatorTest {
             equalTo(mkSet(task00, task01))
         );
     }
+
+    private void addMetric(
+        final MockProducer<?, ?> producer,
+        final String name,
+        final double value) {
+        final MetricName metricName = metricName(name);
+        producer.setMockMetrics(metricName, new Metric() {
+            @Override
+            public MetricName metricName() {
+                return metricName;
+            }
+
+            @Override
+            public Object metricValue() {
+                return value;
+            }
+        });
+    }
+
+    private MetricName metricName(final String name) {
+        return new MetricName(name, "", "", Collections.emptyMap());
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 5a83c68..48364f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -133,7 +134,8 @@ public class RecordCollectorTest {
             clientSupplier,
             null,
             processId,
-            logContext
+            logContext,
+            Time.SYSTEM
         );
         mockProducer = clientSupplier.producers.get(0);
         collector = new RecordCollectorImpl(
@@ -792,7 +794,8 @@ public class RecordCollectorTest {
                 },
                 taskId,
                 processId,
-                logContext
+                logContext,
+                Time.SYSTEM
             ),
             productionExceptionHandler,
             streamsMetrics
@@ -823,7 +826,8 @@ public class RecordCollectorTest {
                 },
                 null,
                 null,
-                logContext
+                logContext,
+                Time.SYSTEM
             ),
             productionExceptionHandler,
             streamsMetrics
@@ -857,7 +861,8 @@ public class RecordCollectorTest {
                 },
                 taskId,
                 processId,
-                logContext
+                logContext,
+                Time.SYSTEM
             ),
             productionExceptionHandler,
             streamsMetrics
@@ -895,7 +900,8 @@ public class RecordCollectorTest {
             },
             null,
             null,
-            logContext
+            logContext,
+            Time.SYSTEM
         );
     }
 
@@ -916,7 +922,8 @@ public class RecordCollectorTest {
             },
             null,
             null,
-            logContext
+            logContext,
+            Time.SYSTEM
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java
new file mode 100644
index 0000000..c2f3f39
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class StreamThreadTotalBlockedTimeTest {
+    private static final int IOTIME_TOTAL = 1;
+    private static final int IO_WAITTIME_TOTAL = 2;
+    private static final int COMMITTED_TIME_TOTAL = 3;
+    private static final int COMMIT_SYNC_TIME_TOTAL = 4;
+    private static final int RESTORE_IOTIME_TOTAL = 5;
+    private static final int RESTORE_IO_WAITTIME_TOTAL = 6;
+    private static final double PRODUCER_BLOCKED_TIME = 7.0;
+
+    @Mock
+    Consumer<?, ?> consumer;
+    @Mock
+    Consumer<?, ?> restoreConsumer;
+    @Mock
+    Supplier<Double> producerBlocked;
+
+    private StreamThreadTotalBlockedTime blockedTime;
+
+    @Rule
+    public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+    @Before
+    public void setup() {
+        blockedTime = new StreamThreadTotalBlockedTime(consumer, restoreConsumer, producerBlocked);
+        when(consumer.metrics()).thenAnswer(a -> new MetricsBuilder()
+            .addMetric("iotime-total", IOTIME_TOTAL)
+            .addMetric("io-waittime-total", IO_WATTIME_TOTAL)
+            .addMetric("committed-time-ns-total", COMMITTED_TIME_TOTAL)
+            .addMetric("commit-sync-time-ns-total", COMMIT_SYNC_TIME_TOTAL)
+            .build()
+        );
+        when(restoreConsumer.metrics()).thenAnswer(a -> new MetricsBuilder()
+            .addMetric("iotime-total", RESTORE_IOTIME_TOTAL)
+            .addMetric("io-waittime-total", RESTORE_IO_WAITTIME_TOTAL)
+            .build()
+        );
+        when(producerBlocked.get()).thenReturn(PRODUCER_BLOCKED_TIME);
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        assertThat(
+            blockedTime.compute(),
+            equalTo(IOTIME_TOTAL + IO_WAITTIME_TOTAL + COMMITTED_TIME_TOTAL
+                + COMMIT_SYNC_TIME_TOTAL + RESTORE_IOTIME_TOTAL + RESTORE_IO_WAITTIME_TOTAL
+                + PRODUCER_BLOCKED_TIME)
+        );
+    }
+
+    private static class MetricsBuilder {
+        private final HashMap<MetricName, Metric> metrics = new HashMap<>();
+
+        private MetricsBuilder addMetric(final String name, final double value) {
+            final MetricName metricName = new MetricName(name, "", "", Collections.emptyMap());
+            metrics.put(
+                metricName,
+                new Metric() {
+                    @Override
+                    public MetricName metricName() {
+                        return metricName;
+                    }
+
+                    @Override
+                    public Object metricValue() {
+                        return value;
+                    }
+                }
+            );
+            return this;
+        }
+
+        public Map<MetricName, ? extends Metric> build() {
+            return Collections.unmodifiableMap(metrics);
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index f4dec89..5e074bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -33,6 +35,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -57,6 +60,8 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertSame;
@@ -64,6 +69,13 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class StreamsProducerTest {
+    private static final double BUFFER_POOL_WAIT_TIME = 1;
+    private static final double FLUSH_TIME = 2;
+    private static final double TXN_INIT_TIME = 3;
+    private static final double TXN_BEGIN_TIME = 4;
+    private static final double TXN_SEND_OFFSETS_TIME = 5;
+    private static final double TXN_COMMIT_TIME = 6;
+    private static final double TXN_ABORT_TIME = 7;
 
     private final LogContext logContext = new LogContext("test ");
     private final String topic = "topic";
@@ -93,6 +105,8 @@ public class StreamsProducerTest {
         mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2))
     );
 
+    private final Time mockTime = mock(Time.class);
+
     final Producer<byte[], byte[]> mockedProducer = mock(Producer.class);
     final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
         @Override
@@ -106,7 +120,8 @@ public class StreamsProducerTest {
         clientSupplier,
         null,
         null,
-        logContext
+        logContext,
+        mockTime
     );
     final StreamsProducer eosAlphaStreamsProducerWithMock = new StreamsProducer(
         eosAlphaConfig,
@@ -114,7 +129,8 @@ public class StreamsProducerTest {
         clientSupplier,
         new TaskId(0, 0),
         null,
-        logContext
+        logContext,
+        mockTime
     );
 
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
@@ -136,8 +152,6 @@ public class StreamsProducerTest {
         mkEntry(new TopicPartition(topic, 0), new OffsetAndMetadata(0L, null))
     );
 
-
-
     @Before
     public void before() {
         mockClientSupplier.setCluster(cluster);
@@ -148,7 +162,8 @@ public class StreamsProducerTest {
                 mockClientSupplier,
                 null,
                 null,
-                logContext
+                logContext,
+                mockTime
             );
         nonEosMockProducer = mockClientSupplier.producers.get(0);
 
@@ -161,7 +176,8 @@ public class StreamsProducerTest {
                 eosAlphaMockClientSupplier,
                 new TaskId(0, 0),
                 null,
-                logContext
+                logContext,
+                mockTime
             );
         eosAlphaStreamsProducer.initTransaction();
         eosAlphaMockProducer = eosAlphaMockClientSupplier.producers.get(0);
@@ -175,10 +191,13 @@ public class StreamsProducerTest {
                 eosBetaMockClientSupplier,
                 null,
                 UUID.randomUUID(),
-                logContext
+                logContext,
+                mockTime
             );
         eosBetaStreamsProducer.initTransaction();
         eosBetaMockProducer = eosBetaMockClientSupplier.producers.get(0);
+        expect(mockTime.nanoseconds()).andAnswer(Time.SYSTEM::nanoseconds).anyTimes();
+        replay(mockTime);
     }
 
 
@@ -251,7 +270,8 @@ public class StreamsProducerTest {
                 mockClientSupplier,
                 new TaskId(0, 0),
                 UUID.randomUUID(),
-                logContext)
+                logContext,
+                mockTime)
         );
 
         assertThat(thrown.getMessage(), is("config cannot be null"));
@@ -267,7 +287,8 @@ public class StreamsProducerTest {
                 mockClientSupplier,
                 new TaskId(0, 0),
                 UUID.randomUUID(),
-                logContext)
+                logContext,
+                mockTime)
         );
 
         assertThat(thrown.getMessage(), is("threadId cannot be null"));
@@ -283,7 +304,8 @@ public class StreamsProducerTest {
                 null,
                 new TaskId(0, 0),
                 UUID.randomUUID(),
-                logContext)
+                logContext,
+                mockTime)
         );
 
         assertThat(thrown.getMessage(), is("clientSupplier cannot be null"));
@@ -299,7 +321,8 @@ public class StreamsProducerTest {
                 mockClientSupplier,
                 new TaskId(0, 0),
                 UUID.randomUUID(),
-                null)
+                null,
+                mockTime)
         );
 
         assertThat(thrown.getMessage(), is("logContext cannot be null"));
@@ -343,7 +366,8 @@ public class StreamsProducerTest {
             mockClientSupplier,
             null,
             null,
-            logContext
+            logContext,
+            mockTime
         );
     }
 
@@ -462,7 +486,8 @@ public class StreamsProducerTest {
             eosAlphaMockClientSupplier,
             new TaskId(0, 0),
             null,
-            logContext
+            logContext,
+            mockTime
         );
 
         verify(mockMap);
@@ -489,7 +514,8 @@ public class StreamsProducerTest {
             eosAlphaMockClientSupplier,
             null,
             processId,
-            logContext
+            logContext,
+            mockTime
         );
 
         verify(mockMap);
@@ -612,7 +638,8 @@ public class StreamsProducerTest {
             clientSupplier,
             null,
             UUID.randomUUID(),
-            logContext
+            logContext,
+            mockTime
         );
         streamsProducer.initTransaction();
         // call `send()` to start a transaction
@@ -665,7 +692,8 @@ public class StreamsProducerTest {
                 mockClientSupplier,
                 null,
                 UUID.randomUUID(),
-                logContext)
+                logContext,
+                mockTime)
         );
 
         assertThat(thrown.getMessage(), is("taskId cannot be null for 
exactly-once alpha"));
@@ -681,7 +709,8 @@ public class StreamsProducerTest {
                 mockClientSupplier,
                 new TaskId(0, 0),
                 null,
-                logContext)
+                logContext,
+                mockTime)
         );
 
         assertThat(thrown.getMessage(), is("processId cannot be null for 
exactly-once v2"));
@@ -704,7 +733,8 @@ public class StreamsProducerTest {
             clientSupplier,
             new TaskId(0, 0),
             null,
-            logContext
+            logContext,
+            mockTime
         );
 
         final TimeoutException thrown = assertThrows(
@@ -724,7 +754,8 @@ public class StreamsProducerTest {
                 eosAlphaMockClientSupplier,
                 new TaskId(0, 0),
                 null,
-                logContext
+                logContext,
+                mockTime
             );
 
         final IllegalStateException thrown = assertThrows(
@@ -744,7 +775,8 @@ public class StreamsProducerTest {
                 eosBetaMockClientSupplier,
                 null,
                 UUID.randomUUID(),
-                logContext
+                logContext,
+                mockTime
             );
 
         final IllegalStateException thrown = assertThrows(
@@ -772,7 +804,8 @@ public class StreamsProducerTest {
             clientSupplier,
             new TaskId(0, 0),
             null,
-            logContext
+            logContext,
+            mockTime
         );
 
         final StreamsException thrown = assertThrows(
@@ -801,7 +834,8 @@ public class StreamsProducerTest {
             clientSupplier,
             new TaskId(0, 0),
             null,
-            logContext
+            logContext,
+            mockTime
         );
 
         final RuntimeException thrown = assertThrows(
@@ -1105,7 +1139,8 @@ public class StreamsProducerTest {
             clientSupplier,
             null,
             UUID.randomUUID(),
-            logContext
+            logContext,
+            mockTime
         );
         streamsProducer.initTransaction();
 
@@ -1113,6 +1148,7 @@ public class StreamsProducerTest {
         mockedProducer.close();
         mockedProducer.initTransactions();
         expectLastCall();
+        expect(mockedProducer.metrics()).andReturn(Collections.emptyMap()).anyTimes();
         replay(mockedProducer);
 
         streamsProducer.resetProducer();
@@ -1121,4 +1157,99 @@ public class StreamsProducerTest {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(
+            nonEosMockProducer,
+            BUFFER_POOL_WAIT_TIME,
+            FLUSH_TIME,
+            TXN_INIT_TIME,
+            TXN_BEGIN_TIME,
+            TXN_SEND_OFFSETS_TIME,
+            TXN_COMMIT_TIME,
+            TXN_ABORT_TIME
+        );
+
+        final double expectedTotalBlocked = BUFFER_POOL_WAIT_TIME + FLUSH_TIME + TXN_INIT_TIME +
+            TXN_BEGIN_TIME + TXN_SEND_OFFSETS_TIME + TXN_COMMIT_TIME + TXN_ABORT_TIME;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), closeTo(expectedTotalBlocked, 0.01));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(
+            eosBetaMockProducer,
+            BUFFER_POOL_WAIT_TIME,
+            FLUSH_TIME,
+            TXN_INIT_TIME,
+            TXN_BEGIN_TIME,
+            TXN_SEND_OFFSETS_TIME,
+            TXN_COMMIT_TIME,
+            TXN_ABORT_TIME
+        );
+        final double expectedTotalBlocked = BUFFER_POOL_WAIT_TIME + FLUSH_TIME + TXN_INIT_TIME +
+            TXN_BEGIN_TIME + TXN_SEND_OFFSETS_TIME + TXN_COMMIT_TIME + TXN_ABORT_TIME;
+        assertThat(eosBetaStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+        reset(mockTime);
+        final long closeStart = 1L;
+        final long closeDelay = 1L;
+        expect(mockTime.nanoseconds()).andReturn(closeStart).andReturn(closeStart + closeDelay);
+        replay(mockTime);
+        eosBetaStreamsProducer.resetProducer();
+        setProducerMetrics(
+            eosBetaMockClientSupplier.producers.get(1),
+            BUFFER_POOL_WAIT_TIME,
+            FLUSH_TIME,
+            TXN_INIT_TIME,
+            TXN_BEGIN_TIME,
+            TXN_SEND_OFFSETS_TIME,
+            TXN_COMMIT_TIME,
+            TXN_ABORT_TIME
+        );
+
+        assertThat(
+            eosBetaStreamsProducer.totalBlockedTime(),
+            closeTo(2 * expectedTotalBlocked + closeDelay, 0.01)
+        );
+    }
+
+    private MetricName metricName(final String name) {
+        return new MetricName(name, "", "", Collections.emptyMap());
+    }
+
+    private void addMetric(
+        final MockProducer<?, ?> producer,
+        final String name,
+        final double value) {
+        final MetricName metricName = metricName(name);
+        producer.setMockMetrics(metricName, new Metric() {
+            @Override
+            public MetricName metricName() {
+                return metricName;
+            }
+
+            @Override
+            public Object metricValue() {
+                return value;
+            }
+        });
+    }
+
+    private void setProducerMetrics(
+        final MockProducer<?, ?> producer,
+        final double bufferPoolWaitTime,
+        final double flushTime,
+        final double txnInitTime,
+        final double txnBeginTime,
+        final double txnSendOffsetsTime,
+        final double txnCommitTime,
+        final double txnAbortTime) {
+        addMetric(producer, "bufferpool-wait-time-total", bufferPoolWaitTime);
+        addMetric(producer, "flush-time-ns-total", flushTime);
+        addMetric(producer, "txn-init-time-ns-total", txnInitTime);
+        addMetric(producer, "txn-begin-time-ns-total", txnBeginTime);
+        addMetric(producer, "txn-send-offsets-time-ns-total", 
txnSendOffsetsTime);
+        addMetric(producer, "txn-commit-time-ns-total", txnCommitTime);
+        addMetric(producer, "txn-abort-time-ns-total", txnAbortTime);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index bfe05a6..24cf8c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -57,6 +57,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_SUFFIX;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
@@ -75,6 +76,7 @@ import static org.hamcrest.CoreMatchers.equalToObject;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -1209,4 +1211,92 @@ public class StreamsMetricsImplTest {
 
         verify(sensor);
     }
+
+    @Test
+    public void shouldAddThreadLevelMutableMetric() {
+        final int measuredValue = 123;
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> measuredValue
+        );
+
+        final MetricName name = metrics.metricName(
+            "foobar",
+            THREAD_LEVEL_GROUP,
+            Collections.singletonMap("thread-id", "t1")
+        );
+        assertThat(metrics.metric(name), notNullValue());
+        assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+    }
+
+    @Test
+    public void shouldCleanupThreadLevelMutableMetric() {
+        final int measuredValue = 123;
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> measuredValue
+        );
+
+        streamsMetrics.removeAllThreadLevelMetrics("t1");
+
+        final MetricName name = metrics.metricName(
+            "foobar",
+            THREAD_LEVEL_GROUP,
+            Collections.singletonMap("thread-id", "t1")
+        );
+        assertThat(metrics.metric(name), nullValue());
+    }
+
+    @Test
+    public void shouldAddThreadLevelImmutableMetric() {
+        final int measuredValue = 123;
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+        streamsMetrics.addThreadLevelImmutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            measuredValue
+        );
+
+        final MetricName name = metrics.metricName(
+            "foobar",
+            THREAD_LEVEL_GROUP,
+            Collections.singletonMap("thread-id", "t1")
+        );
+        assertThat(metrics.metric(name), notNullValue());
+        assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+    }
+
+    @Test
+    public void shouldCleanupThreadLevelImmutableMetric() {
+        final int measuredValue = 123;
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+        streamsMetrics.addThreadLevelImmutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            measuredValue
+        );
+
+        streamsMetrics.removeAllThreadLevelMetrics("t1");
+
+        final MetricName name = metrics.metricName(
+            "foobar",
+            THREAD_LEVEL_GROUP,
+            Collections.singletonMap("thread-id", "t1")
+        );
+        assertThat(metrics.metric(name), nullValue());
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index ae0eae4..0a486db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -16,14 +16,20 @@
  */
 package org.apache.kafka.streams.processor.internals.metrics;
 
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
 import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
 import java.util.Map;
+import org.mockito.ArgumentCaptor;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
@@ -387,4 +393,55 @@ public class ThreadMetricsTest {
 
         assertThat(sensor, is(expectedSensor));
     }
+
+    @Test
+    public void shouldAddThreadStartTimeMetric() {
+        // Given:
+        final long startTime = 123L;
+
+        // When:
+        ThreadMetrics.addThreadStartTimeMetric(
+            "bongo",
+            streamsMetrics,
+            startTime
+        );
+
+        // Then:
+        verify(streamsMetrics).addThreadLevelImmutableMetric(
+            "thread-start-time",
+            "The time that the thread was started",
+            "bongo",
+            startTime
+        );
+    }
+
+    @Test
+    public void shouldAddTotalBlockedTimeMetric() {
+        // Given:
+        final double startTime = 123.45;
+        final StreamThreadTotalBlockedTime blockedTime = mock(StreamThreadTotalBlockedTime.class);
+        when(blockedTime.compute()).thenReturn(startTime);
+
+        // When:
+        ThreadMetrics.addThreadBlockedTimeMetric(
+            "burger",
+            blockedTime,
+            streamsMetrics
+        );
+
+        // Then:
+        final ArgumentCaptor<Gauge<Double>> captor = gaugeCaptor();
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+            eq("blocked-time-ns-total"),
+            eq("The total time the thread spent blocked on kafka"),
+            eq("burger"),
+            captor.capture()
+        );
+        assertThat(captor.getValue().value(null, 678L), is(startTime));
+    }
+
+    @SuppressWarnings("unchecked")
+    private ArgumentCaptor<Gauge<Double>> gaugeCaptor() {
+        return ArgumentCaptor.forClass(Gauge.class);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 3f438f9..6a95ccb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -208,7 +209,8 @@ public class KeyValueStoreTestDriver<K, V> {
                 new MockClientSupplier(),
                 null,
                 null,
-                logContext),
+                logContext,
+                Time.SYSTEM),
             new DefaultProductionExceptionHandler(),
             new MockStreamsMetrics(new Metrics())
         ) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c8a320f..c56f7bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsConfig;
@@ -423,7 +424,8 @@ public class StreamThreadStateStoreProviderTest {
                 clientSupplier,
                 new TaskId(0, 0),
                 UUID.randomUUID(),
-                logContext
+                logContext,
+                Time.SYSTEM
             ),
             streamsConfig.defaultProductionExceptionHandler(),
             new MockStreamsMetrics(metrics));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c73ab3e..05f10e9 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -362,7 +362,8 @@ public class TopologyTestDriver implements Closeable {
                     throw new IllegalStateException();
                 }
             },
-            logContext
+            logContext,
+            mockWallClockTime
         );
 
         setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics, cache);
@@ -1334,8 +1335,9 @@ public class TopologyTestDriver implements Closeable {
 
         public TestDriverProducer(final StreamsConfig config,
                                   final KafkaClientSupplier clientSupplier,
-                                  final LogContext logContext) {
-            super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext);
+                                  final LogContext logContext,
+                                  final Time time) {
+            super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext, time);
         }
 
         @Override
