This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 498f3f010fb MINOR: Support to update Async Consumer group member label (KIP-714) (#14946) 498f3f010fb is described below commit 498f3f010fb03466f21ebc61232002042167700a Author: Apoorv Mittal <amit...@confluent.io> AuthorDate: Fri Dec 8 00:23:05 2023 +0530 MINOR: Support to update Async Consumer group member label (KIP-714) (#14946) Part of KIP-714. As KIP-848 introduced a new Kafka Consumer, KIP-714 required changes to capture group_member_id. This change also includes a fix to update labels, as earlier calling updateLabels multiple times for same key would have added new resource attribute every time. Now it updates the map and then construct the labels, which updates the existing labels with new value. Reviewers: Andrew Schofield <aschofi...@confluent.io>, Philip Nee <p...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../consumer/internals/AsyncKafkaConsumer.java | 6 ++-- .../consumer/internals/MembershipManagerImpl.java | 23 ++++++++++++++- .../consumer/internals/RequestManagers.java | 7 +++-- .../internals/ClientTelemetryProvider.java | 10 +++++-- .../consumer/internals/ConsumerTestBuilder.java | 3 +- .../internals/MembershipManagerImplTest.java | 8 ++--- .../internals/ClientTelemetryReporterTest.java | 34 ++++++++++++++++++++++ 7 files changed, 78 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 0e93b5f6e21..db483d45077 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -314,7 +314,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { groupRebalanceConfig, apiVersions, fetchMetricsManager, - networkClientDelegateSupplier); + networkClientDelegateSupplier, + clientTelemetryReporter); final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, applicationEventQueue, @@ -464,7 +465,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { groupRebalanceConfig, apiVersions, fetchMetricsManager, - networkClientDelegateSupplier + networkClientDelegateSupplier, + clientTelemetryReporter ); Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index c6393bbeefc..4633cb24bf4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -30,6 +30,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -237,13 +239,21 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource */ private boolean isRegisteredForMetadataUpdates; + /** + * Optional client telemetry reporter which sends client telemetry data to the broker. This + * will be empty if the client telemetry feature is not enabled. This is provided to update + * the group member id label when the member joins the group. + */ + private final Optional<ClientTelemetryReporter> clientTelemetryReporter; + public MembershipManagerImpl(String groupId, Optional<String> groupInstanceId, Optional<String> serverAssignor, SubscriptionState subscriptions, CommitRequestManager commitRequestManager, ConsumerMetadata metadata, - LogContext logContext) { + LogContext logContext, + Optional<ClientTelemetryReporter> clientTelemetryReporter) { this.groupId = groupId; this.state = MemberState.UNSUBSCRIBED; this.serverAssignor = serverAssignor; @@ -256,6 +266,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource this.assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); this.currentAssignment = new HashSet<>(); this.log = logContext.logger(MembershipManagerImpl.class); + this.clientTelemetryReporter = clientTelemetryReporter; } /** @@ -317,6 +328,16 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource ); throw new IllegalArgumentException(errorMessage); } + + // Update the group member id label in the client telemetry reporter if the member id has + // changed. Initially the member id is empty, and it is updated when the member joins the + // group. This is done here to avoid updating the label on every heartbeat response. Also + // check if the member id is null, as the schema defines it as nullable. + if (response.memberId() != null && !response.memberId().equals(memberId)) { + clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels( + Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, response.memberId()))); + } + this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index c17e2714729..3dce3270873 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.common.internals.IdempotentCloser; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -119,7 +120,8 @@ public class RequestManagers implements Closeable { final GroupRebalanceConfig groupRebalanceConfig, final ApiVersions apiVersions, final FetchMetricsManager fetchMetricsManager, - final Supplier<NetworkClientDelegate> networkClientDelegateSupplier) { + final Supplier<NetworkClientDelegate> networkClientDelegateSupplier, + final Optional<ClientTelemetryReporter> clientTelemetryReporter) { return new CachedSupplier<RequestManagers>() { @Override protected RequestManagers create() { @@ -179,7 +181,8 @@ public class RequestManagers implements Closeable { subscriptions, commit, metadata, - logContext); + logContext, + clientTelemetryReporter); heartbeatRequestManager = new HeartbeatRequestManager( logContext, time, diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java index d5eb3fb0c07..2e7e195be31 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.MetricsContext; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class ClientTelemetryProvider implements Configurable { @@ -116,9 +117,12 @@ public class ClientTelemetryProvider implements Configurable { */ synchronized void updateLabels(Map<String, String> labels) { final Resource.Builder resourceBuilder = resource.toBuilder(); - labels.forEach((key, value) -> { - addAttribute(resourceBuilder, key, value); - }); + Map<String, String> finalLabels = resource.getAttributesList().stream().collect(Collectors.toMap( + KeyValue::getKey, kv -> kv.getValue().getStringValue())); + finalLabels.putAll(labels); + + resourceBuilder.clearAttributes(); + finalLabels.forEach((key, value) -> addAttribute(resourceBuilder, key, value)); resource = resourceBuilder.build(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 97d53dead85..28a68e9f751 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -202,7 +202,8 @@ public class ConsumerTestBuilder implements Closeable { subscriptions, commit, metadata, - logContext + logContext, + Optional.empty() ) ); HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 81265a529a5..15f15057aae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -86,7 +86,7 @@ public class MembershipManagerImplTest { private MembershipManagerImpl createMembershipManagerJoiningGroup() { MembershipManagerImpl manager = spy(new MembershipManagerImpl( GROUP_ID, Optional.empty(), Optional.empty(), subscriptionState, commitRequestManager, - metadata, testBuilder.logContext)); + metadata, testBuilder.logContext, Optional.empty())); manager.transitionToJoining(); return manager; } @@ -95,7 +95,7 @@ public class MembershipManagerImplTest { String serverAssignor) { MembershipManagerImpl manager = new MembershipManagerImpl( GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(serverAssignor), - subscriptionState, commitRequestManager, metadata, testBuilder.logContext); + subscriptionState, commitRequestManager, metadata, testBuilder.logContext, Optional.empty()); manager.transitionToJoining(); return manager; } @@ -120,7 +120,7 @@ public class MembershipManagerImplTest { // First join should register to get metadata updates MembershipManagerImpl manager = new MembershipManagerImpl( GROUP_ID, Optional.empty(), Optional.empty(), subscriptionState, commitRequestManager, - metadata, testBuilder.logContext); + metadata, testBuilder.logContext, Optional.empty()); manager.transitionToJoining(); verify(metadata).addClusterUpdateListener(manager); clearInvocations(metadata); @@ -198,7 +198,7 @@ public class MembershipManagerImplTest { public void testTransitionToFailedWhenTryingToJoin() { MembershipManagerImpl membershipManager = new MembershipManagerImpl( GROUP_ID, Optional.empty(), Optional.empty(), subscriptionState, commitRequestManager, metadata, - testBuilder.logContext); + testBuilder.logContext, Optional.empty()); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); membershipManager.transitionToJoining(); diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java index 64d8a3af33f..2eed8d51195 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java @@ -169,6 +169,40 @@ public class ClientTelemetryReporterTest { .telemetrySender()).state()); } + @Test + public void testUpdateMetricsLabels() { + clientTelemetryReporter.configure(configs); + clientTelemetryReporter.contextChange(metricsContext); + assertTrue(clientTelemetryReporter.telemetryProvider().resource().getAttributesList().isEmpty()); + + clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key1", "value1")); + assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size()); + assertEquals("key1", clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0).getKey()); + assertEquals("value1", clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0).getValue().getStringValue()); + + clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "value2")); + assertEquals(2, clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size()); + clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute -> { + if (attribute.getKey().equals("key1")) { + assertEquals("value1", attribute.getValue().getStringValue()); + } else { + assertEquals("key2", attribute.getKey()); + assertEquals("value2", attribute.getValue().getStringValue()); + } + }); + + clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "valueUpdated")); + assertEquals(2, clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size()); + clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute -> { + if (attribute.getKey().equals("key1")) { + assertEquals("value1", attribute.getValue().getStringValue()); + } else { + assertEquals("key2", attribute.getKey()); + assertEquals("valueUpdated", attribute.getValue().getStringValue()); + } + }); + } + @Test public void testTelemetrySenderTimeToNextUpdate() { ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();