This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 498f3f010fb MINOR: Support to update Async Consumer group member label 
(KIP-714) (#14946)
498f3f010fb is described below

commit 498f3f010fb03466f21ebc61232002042167700a
Author: Apoorv Mittal <amit...@confluent.io>
AuthorDate: Fri Dec 8 00:23:05 2023 +0530

    MINOR: Support to update Async Consumer group member label (KIP-714) 
(#14946)
    
    Part of KIP-714.
    
    As KIP-848 introduced a new Kafka Consumer, KIP-714 required changes to 
capture group_member_id.
    This change also includes a fix to update labels, as earlier calling 
updateLabels multiple times for same key
    would have added new resource attribute every time. Now it updates the map 
and then construct the labels,
    which updates the existing labels with new value.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Philip Nee 
<p...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  6 ++--
 .../consumer/internals/MembershipManagerImpl.java  | 23 ++++++++++++++-
 .../consumer/internals/RequestManagers.java        |  7 +++--
 .../internals/ClientTelemetryProvider.java         | 10 +++++--
 .../consumer/internals/ConsumerTestBuilder.java    |  3 +-
 .../internals/MembershipManagerImplTest.java       |  8 ++---
 .../internals/ClientTelemetryReporterTest.java     | 34 ++++++++++++++++++++++
 7 files changed, 78 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 0e93b5f6e21..db483d45077 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -314,7 +314,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     groupRebalanceConfig,
                     apiVersions,
                     fetchMetricsManager,
-                    networkClientDelegateSupplier);
+                    networkClientDelegateSupplier,
+                    clientTelemetryReporter);
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
                     metadata,
                     applicationEventQueue,
@@ -464,7 +465,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             groupRebalanceConfig,
             apiVersions,
             fetchMetricsManager,
-            networkClientDelegateSupplier
+            networkClientDelegateSupplier,
+            clientTelemetryReporter
         );
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier 
= ApplicationEventProcessor.supplier(
                 logContext,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index c6393bbeefc..4633cb24bf4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -237,13 +239,21 @@ public class MembershipManagerImpl implements 
MembershipManager, ClusterResource
      */
     private boolean isRegisteredForMetadataUpdates;
 
+    /**
+     * Optional client telemetry reporter which sends client telemetry data to 
the broker. This
+     * will be empty if the client telemetry feature is not enabled. This is 
provided to update
+     * the group member id label when the member joins the group.
+     */
+    private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
+
     public MembershipManagerImpl(String groupId,
                                  Optional<String> groupInstanceId,
                                  Optional<String> serverAssignor,
                                  SubscriptionState subscriptions,
                                  CommitRequestManager commitRequestManager,
                                  ConsumerMetadata metadata,
-                                 LogContext logContext) {
+                                 LogContext logContext,
+                                 Optional<ClientTelemetryReporter> 
clientTelemetryReporter) {
         this.groupId = groupId;
         this.state = MemberState.UNSUBSCRIBED;
         this.serverAssignor = serverAssignor;
@@ -256,6 +266,7 @@ public class MembershipManagerImpl implements 
MembershipManager, ClusterResource
         this.assignmentReadyToReconcile = new 
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
         this.currentAssignment = new HashSet<>();
         this.log = logContext.logger(MembershipManagerImpl.class);
+        this.clientTelemetryReporter = clientTelemetryReporter;
     }
 
     /**
@@ -317,6 +328,16 @@ public class MembershipManagerImpl implements 
MembershipManager, ClusterResource
             );
             throw new IllegalArgumentException(errorMessage);
         }
+
+        // Update the group member id label in the client telemetry reporter 
if the member id has
+        // changed. Initially the member id is empty, and it is updated when 
the member joins the
+        // group. This is done here to avoid updating the label on every 
heartbeat response. Also
+        // check if the member id is null, as the schema defines it as 
nullable.
+        if (response.memberId() != null && 
!response.memberId().equals(memberId)) {
+            clientTelemetryReporter.ifPresent(reporter -> 
reporter.updateMetricsLabels(
+                
Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, 
response.memberId())));
+        }
+
         this.memberId = response.memberId();
         this.memberEpoch = response.memberEpoch();
         ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index c17e2714729..3dce3270873 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
@@ -119,7 +120,8 @@ public class RequestManagers implements Closeable {
                                                      final 
GroupRebalanceConfig groupRebalanceConfig,
                                                      final ApiVersions 
apiVersions,
                                                      final FetchMetricsManager 
fetchMetricsManager,
-                                                     final 
Supplier<NetworkClientDelegate> networkClientDelegateSupplier) {
+                                                     final 
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
+                                                     final 
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
         return new CachedSupplier<RequestManagers>() {
             @Override
             protected RequestManagers create() {
@@ -179,7 +181,8 @@ public class RequestManagers implements Closeable {
                             subscriptions,
                             commit,
                             metadata,
-                            logContext);
+                            logContext,
+                            clientTelemetryReporter);
                     heartbeatRequestManager = new HeartbeatRequestManager(
                             logContext,
                             time,
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java
index d5eb3fb0c07..2e7e195be31 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.MetricsContext;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class ClientTelemetryProvider implements Configurable {
 
@@ -116,9 +117,12 @@ public class ClientTelemetryProvider implements 
Configurable {
      */
     synchronized void updateLabels(Map<String, String> labels) {
         final Resource.Builder resourceBuilder = resource.toBuilder();
-        labels.forEach((key, value) -> {
-            addAttribute(resourceBuilder, key, value);
-        });
+        Map<String, String> finalLabels = 
resource.getAttributesList().stream().collect(Collectors.toMap(
+            KeyValue::getKey, kv -> kv.getValue().getStringValue()));
+        finalLabels.putAll(labels);
+
+        resourceBuilder.clearAttributes();
+        finalLabels.forEach((key, value) -> addAttribute(resourceBuilder, key, 
value));
         resource = resourceBuilder.build();
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 97d53dead85..28a68e9f751 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -202,7 +202,8 @@ public class ConsumerTestBuilder implements Closeable {
                         subscriptions,
                         commit,
                         metadata,
-                        logContext
+                        logContext,
+                        Optional.empty()
                 )
             );
             HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new 
HeartbeatRequestManager.HeartbeatState(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 81265a529a5..15f15057aae 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -86,7 +86,7 @@ public class MembershipManagerImplTest {
     private MembershipManagerImpl createMembershipManagerJoiningGroup() {
         MembershipManagerImpl manager = spy(new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), Optional.empty(), 
subscriptionState, commitRequestManager,
-                metadata, testBuilder.logContext));
+                metadata, testBuilder.logContext, Optional.empty()));
         manager.transitionToJoining();
         return manager;
     }
@@ -95,7 +95,7 @@ public class MembershipManagerImplTest {
                                                                       String 
serverAssignor) {
         MembershipManagerImpl manager = new MembershipManagerImpl(
                 GROUP_ID, Optional.ofNullable(groupInstanceId), 
Optional.ofNullable(serverAssignor),
-                subscriptionState, commitRequestManager, metadata, 
testBuilder.logContext);
+                subscriptionState, commitRequestManager, metadata, 
testBuilder.logContext, Optional.empty());
         manager.transitionToJoining();
         return manager;
     }
@@ -120,7 +120,7 @@ public class MembershipManagerImplTest {
         // First join should register to get metadata updates
         MembershipManagerImpl manager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), Optional.empty(), 
subscriptionState, commitRequestManager,
-                metadata, testBuilder.logContext);
+                metadata, testBuilder.logContext, Optional.empty());
         manager.transitionToJoining();
         verify(metadata).addClusterUpdateListener(manager);
         clearInvocations(metadata);
@@ -198,7 +198,7 @@ public class MembershipManagerImplTest {
     public void testTransitionToFailedWhenTryingToJoin() {
         MembershipManagerImpl membershipManager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), Optional.empty(), 
subscriptionState, commitRequestManager, metadata,
-                testBuilder.logContext);
+                testBuilder.logContext, Optional.empty());
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
         membershipManager.transitionToJoining();
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index 64d8a3af33f..2eed8d51195 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -169,6 +169,40 @@ public class ClientTelemetryReporterTest {
             .telemetrySender()).state());
     }
 
+    @Test
+    public void testUpdateMetricsLabels() {
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+        
assertTrue(clientTelemetryReporter.telemetryProvider().resource().getAttributesList().isEmpty());
+
+        
clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key1", 
"value1"));
+        assertEquals(1, 
clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
+        assertEquals("key1", 
clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0).getKey());
+        assertEquals("value1", 
clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0).getValue().getStringValue());
+
+        
clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", 
"value2"));
+        assertEquals(2, 
clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
+        
clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute
 -> {
+            if (attribute.getKey().equals("key1")) {
+                assertEquals("value1", attribute.getValue().getStringValue());
+            } else {
+                assertEquals("key2", attribute.getKey());
+                assertEquals("value2", attribute.getValue().getStringValue());
+            }
+        });
+
+        
clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", 
"valueUpdated"));
+        assertEquals(2, 
clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
+        
clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute
 -> {
+            if (attribute.getKey().equals("key1")) {
+                assertEquals("value1", attribute.getValue().getStringValue());
+            } else {
+                assertEquals("key2", attribute.getKey());
+                assertEquals("valueUpdated", 
attribute.getValue().getStringValue());
+            }
+        });
+    }
+
     @Test
     public void testTelemetrySenderTimeToNextUpdate() {
         ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();

Reply via email to