lucasbru commented on code in PR #22552:
URL: https://github.com/apache/kafka/pull/22552#discussion_r3413010015
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -643,12 +674,47 @@ public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
.setErrorCode(error.code())
.setErrorMessage(message),
Map.of(),
+ -1,
+ -1,
-1
),
log
));
}
+ /**
+ * See {@link GroupCoordinator#streamsGroupTopologyDescriptionUpdate(AuthorizableRequestContext, StreamsGroupTopologyDescriptionUpdateRequestData)}.
+ *
+ * <p>The push pipeline lives on {@link TopologyDescriptionManager}; the service is
+ * responsible only for short-circuiting on a non-active coordinator and translating
+ * unhandled exceptions into the wire error response.
+ */
+ @Override
+ public CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> streamsGroupTopologyDescriptionUpdate(
Review Comment:
The KIP lists GROUP_AUTHORIZATION_FAILED among this RPC's error codes, but I
don't see a KafkaApis handler for StreamsGroupTopologyDescriptionUpdate in this
PR — only the coordinator-side wiring. Is the request routing + authorization
landing in 3/3? Just want to confirm the RPC is reachable end to end somewhere.
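A minimal sketch of the end-to-end shape this question is about, using hypothetical stand-in types (not the real KafkaApis, Authorizer, or Errors API): the handler checks the group ACL first and only then forwards to the coordinator, so GROUP_AUTHORIZATION_FAILED is returned without touching coordinator state. Which ACL operation the KIP requires is an assumption hidden behind the `isAuthorizedOnGroup` predicate.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical stand-ins; these are NOT Kafka's Errors, KafkaApis, or Authorizer types.
enum ApiError { NONE, GROUP_AUTHORIZATION_FAILED }

record TopologyUpdateRequest(String groupId, String memberId) { }

record TopologyUpdateResponse(ApiError error) { }

final class TopologyUpdateRequestHandler {
    // (principal, groupId) -> whether the principal holds the required ACL on the group.
    private final BiPredicate<String, String> isAuthorizedOnGroup;
    // Stands in for GroupCoordinator#streamsGroupTopologyDescriptionUpdate.
    private final Function<TopologyUpdateRequest, CompletableFuture<TopologyUpdateResponse>> coordinator;

    TopologyUpdateRequestHandler(
        BiPredicate<String, String> isAuthorizedOnGroup,
        Function<TopologyUpdateRequest, CompletableFuture<TopologyUpdateResponse>> coordinator
    ) {
        this.isAuthorizedOnGroup = isAuthorizedOnGroup;
        this.coordinator = coordinator;
    }

    CompletableFuture<TopologyUpdateResponse> handle(String principal, TopologyUpdateRequest request) {
        // Authorize before routing: an unauthorized caller never reaches the coordinator.
        if (!isAuthorizedOnGroup.test(principal, request.groupId())) {
            return CompletableFuture.completedFuture(
                new TopologyUpdateResponse(ApiError.GROUP_AUTHORIZATION_FAILED));
        }
        return coordinator.apply(request);
    }
}
```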
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTopologyDescriptionTest.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
+import org.apache.kafka.server.share.persister.NoOpStatePersister;
+import org.apache.kafka.server.util.timer.MockTimer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
+import static org.apache.kafka.coordinator.group.GroupConfigManagerTest.createConfigManager;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the topology-description plugin paths added to {@link GroupCoordinatorService}:
+ * the new {@code streamsGroupTopologyDescriptionUpdate} RPC, the heartbeat post-processing
+ * that sets {@code TopologyDescriptionRequired}, and the back-off interaction.
+public class GroupCoordinatorServiceTopologyDescriptionTest {
+
+ private static final TopicPartition GROUP_TP = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0);
+
+ @SuppressWarnings("unchecked")
+ private static CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> mockRuntime() {
+ return (CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord>) mock(CoordinatorRuntime.class);
+ }
+
+ private static GroupCoordinatorService buildService(
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+ Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+ boolean startup
+ ) {
+ MockTimer timer = new MockTimer();
+ MockTime time = timer.time();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 600000L, 24),
+ runtime,
+ new GroupCoordinatorMetrics(),
+ createConfigManager(),
+ new NoOpStatePersister(),
+ timer,
+ null,
+ plugin,
+ time
+ );
+ if (startup) {
+ service.startup(() -> 1);
+ }
+ return service;
+ }
+
+ @Test
+ public void testUpdateRejectedWhenCoordinatorNotActive() throws Exception {
+ GroupCoordinatorService service = buildService(mockRuntime(), Optional.empty(), false);
+
+ StreamsGroupTopologyDescriptionUpdateResponseData response = service.streamsGroupTopologyDescriptionUpdate(
+ requestContext(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE),
+ validUpdateRequest()
+ ).get(5, TimeUnit.SECONDS);
+
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), response.errorCode());
+ }
+
+ @Test
+ public void testUpdateReturnsUnsupportedVersionWhenNoPlugin() throws Exception {
+ GroupCoordinatorService service = buildService(mockRuntime(), Optional.empty(), true);
+
+ StreamsGroupTopologyDescriptionUpdateResponseData response = service.streamsGroupTopologyDescriptionUpdate(
+ requestContext(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE),
+ validUpdateRequest()
+ ).get(5, TimeUnit.SECONDS);
+
+ assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.errorCode());
+ assertNotNull(response.errorMessage());
+ }
+
+ @Test
+ public void testUpdateRejectsEmptyMemberId() throws Exception {
+ StreamsGroupTopologyDescriptionPlugin plugin = mock(StreamsGroupTopologyDescriptionPlugin.class);
+ GroupCoordinatorService service = buildService(mockRuntime(), Optional.of(plugin), true);
+
+ StreamsGroupTopologyDescriptionUpdateResponseData response = service.streamsGroupTopologyDescriptionUpdate(
+ requestContext(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE),
+ validUpdateRequest().setMemberId("")
+ ).get(5, TimeUnit.SECONDS);
+
+ assertEquals(Errors.INVALID_REQUEST.code(), response.errorCode());
+ assertEquals("MemberId can't be empty.", response.errorMessage());
+ }
+
+ @Test
+ public void testUpdateRejectsEmptyGroupId() throws Exception {
+ StreamsGroupTopologyDescriptionPlugin plugin = mock(StreamsGroupTopologyDescriptionPlugin.class);
+ GroupCoordinatorService service = buildService(mockRuntime(), Optional.of(plugin), true);
+
+ StreamsGroupTopologyDescriptionUpdateResponseData response = service.streamsGroupTopologyDescriptionUpdate(
+ requestContext(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE),
+ validUpdateRequest().setGroupId("")
+ ).get(5, TimeUnit.SECONDS);
+
+ assertEquals(Errors.INVALID_REQUEST.code(), response.errorCode());
+ }
+
+ @Test
+ public void testUpdateSuccessPersistsStoredEpoch() throws Exception {
Review Comment:
The KIP calls out that the broker may re-issue an identical setTopology when
an earlier call's bookkeeping write failed — which is the reason BackoffAction
defaults to ARM. Could we add a test for the plugin-success-but-write-fails
case (the stored-epoch scheduleWriteOperation returns a failed future)
asserting the back-off ends up armed rather than cleared? None of the update
tests currently assert back-off state (clear on success / arm on transient), so
that invariant isn't covered.
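A minimal, self-contained model of the invariant, assuming the SUCCESS arm of `handleSetTopology` reduces to "stored-epoch write, then flip the holder to CLEAR". `SimpleBackoff` and `SuccessBranch` are hypothetical stand-ins, not the real `StreamsGroupTopologyDescriptionBackoff` API. When the write future fails, `thenApply` is skipped, the holder keeps its ARM default, and `whenComplete` re-arms the back-off.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

enum BackoffAction { ARM, CLEAR }

// Hypothetical, simplified back-off that just remembers the armed epoch per group.
final class SimpleBackoff {
    private final Map<String, Integer> armedEpochs = new ConcurrentHashMap<>();

    void armOrExtend(String groupId, int epoch) { armedEpochs.put(groupId, epoch); }
    void clear(String groupId) { armedEpochs.remove(groupId); }
    Integer armedEpoch(String groupId) { return armedEpochs.get(groupId); }
}

final class SuccessBranch {
    // Models the SUCCESS arm: the plugin call already succeeded, so only the stored-epoch
    // write remains. The disposition starts at ARM and flips to CLEAR only if the write succeeds.
    static CompletableFuture<String> run(
        SimpleBackoff backoff,
        String groupId,
        int pushedEpoch,
        Supplier<CompletableFuture<Void>> storedEpochWrite
    ) {
        AtomicReference<BackoffAction> action = new AtomicReference<>(BackoffAction.ARM);
        return storedEpochWrite.get()
            .thenApply(unused -> {
                action.set(BackoffAction.CLEAR);
                return "ok";
            })
            // Runs on both normal and exceptional completion, like the PR's whenComplete.
            .whenComplete((response, throwable) -> {
                if (action.get() == BackoffAction.CLEAR) {
                    backoff.clear(groupId);
                } else {
                    backoff.armOrExtend(groupId, pushedEpoch);
                }
            });
    }
}
```

Against the real classes, the equivalent test would stub the stored-epoch `scheduleWriteOperation` call with `CompletableFuture.failedFuture(...)` and assert on the state exposed through the visible-for-testing `backoff()` accessor.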
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is rebuilt from
+ * scratch on broker restart, and the persisted {@code StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+ private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+ private final StreamsGroupTopologyDescriptionBackoff backoff;
+ private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
+ private final Function<String, TopicPartition> topicPartitionFor;
+
+ public TopologyDescriptionManager(
+ Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+ Function<String, TopicPartition> topicPartitionFor,
+ Time time
+ ) {
+ this.plugin = plugin;
+ this.runtime = runtime;
+ this.topicPartitionFor = topicPartitionFor;
+ this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+ }
+
+ /**
+ * @return true if a topology description plugin is configured on this broker.
+ */
+ public boolean isPluginConfigured() {
+ return plugin.isPresent();
+ }
+
+ /**
+ * Post-processes a successful streams group heartbeat result by deciding whether the
+ * broker should set {@code TopologyDescriptionRequired=true} on the response, and
+ * arming the per-group back-off when it does.
+ *
+ * <p>The flag is set when the topology description plugin is configured, the group
+ * has resolved to a topology epoch, that epoch is neither stored nor permanently
+ * failed at the plugin, no back-off is in effect for this epoch, and the response
+ * does not carry a {@code STALE_TOPOLOGY} status (the member would just be told to
+ * catch up first). When the response already carries an error code we leave it
+ * alone.
+ */
+ public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+ StreamsGroupHeartbeatResult result,
+ String groupId
+ ) {
+ if (plugin.isEmpty()) {
+ return result;
+ }
+ StreamsGroupHeartbeatResponseData response = result.data();
+ if (response.errorCode() != Errors.NONE.code()) {
+ return result;
+ }
+ int currentEpoch = result.currentTopologyEpoch();
+ if (currentEpoch < 0
+ || result.storedDescriptionTopologyEpoch() == currentEpoch
+ || result.failedDescriptionTopologyEpoch() == currentEpoch
+ || responseHasStaleTopology(response)) {
+ return result;
+ }
+ // Atomic check-and-arm: only set the flag if the back-off window is not already
+ // in effect for this epoch, so two concurrent heartbeats for the same group cannot
+ // both arm the back-off and double the window beyond its intended length.
+ if (backoff.armIfNotActive(groupId, currentEpoch)) {
+ response.setTopologyDescriptionRequired(true);
+ }
+ return result;
+ }
+
+ /**
+ * Reject the request synchronously when no plugin is configured or the request fails
+ * basic structural validation. Returns the response to send back to the client, or
+ * empty when the request is accepted for further processing. The caller is expected
+ * to have already short-circuited on a non-active coordinator.
+ */
+ public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> preCheckTopologyDescriptionUpdate(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ if (plugin.isEmpty()) {
+ return Optional.of(errorResponse(
+ Errors.UNSUPPORTED_VERSION,
+ "The broker has no streams group topology description plugin configured."
+ ));
+ }
+ if (request.memberId() == null || request.memberId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId can't be empty."));
+ }
+ if (request.groupId() == null || request.groupId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId can't be empty."));
+ }
+ if (request.topologyDescription() == null) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "TopologyDescription can't be null."));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Drive the push chain: validate the (group, member), convert the wire payload, call
+ * the plugin, persist the outcome, and centralize back-off state mutations in a
+ * single {@code whenComplete}.
+ *
+ * <p>The chain carries the terminal disposition through an {@link AtomicReference}
+ * holder so {@code whenComplete} can act on it without having to reason about the
+ * response shape. Default is {@link BackoffAction#ARM}; any post-plugin failure
+ * (including a metadata-record write that fails after a successful plugin call)
+ * therefore re-arms the back-off and the next heartbeat re-solicits an idempotent
+ * re-push.
+ */
+ public CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> handleSetTopology(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ final String groupId = request.groupId();
+ final String memberId = request.memberId();
+ final int pushedEpoch = request.topologyEpoch();
+ final TopicPartition tp = topicPartitionFor.apply(groupId);
+ final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+ final AtomicReference<BackoffAction> backoffAction = new AtomicReference<>(BackoffAction.ARM);
+ return runtime.scheduleReadOperation(
+ "streams-group-topology-description-validate",
+ tp,
+ (coordinator, lastCommittedOffset) -> {
+ coordinator.validateStreamsGroupMember(groupId, memberId, lastCommittedOffset);
+ return null;
+ })
+ .thenApply(__ -> StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+ .thenCompose(description -> invokePluginSetTopology(p, groupId, pushedEpoch, description))
+ .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+ case SUCCESS -> runtime.scheduleWriteOperation(
+ "streams-group-set-stored-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return new StreamsGroupTopologyDescriptionUpdateResponseData();
+ });
+ case PERMANENT -> runtime.scheduleWriteOperation(
+ "streams-group-set-failed-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message());
+ });
+ case TRANSIENT -> CompletableFuture.completedFuture(errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message()));
+ })
+ .whenComplete((response, throwable) -> {
+ if (backoffAction.get() == BackoffAction.CLEAR) {
+ backoff.clear(groupId);
+ } else {
+ backoff.armOrExtend(groupId, pushedEpoch);
+ }
+ });
+ }
+
+ // Visible for testing.
+ StreamsGroupTopologyDescriptionBackoff backoff() {
+ return backoff;
+ }
+
+ private static boolean responseHasStaleTopology(StreamsGroupHeartbeatResponseData response) {
+ if (response.status() == null) {
+ return false;
+ }
+ byte staleCode = Status.STALE_TOPOLOGY.code();
+ return response.status().stream().anyMatch(s -> s.statusCode() == staleCode);
+ }
+
+ private static StreamsGroupTopologyDescriptionUpdateResponseData errorResponse(
+ Errors error,
+ String message
+ ) {
+ return new StreamsGroupTopologyDescriptionUpdateResponseData()
+ .setErrorCode(error.code())
+ .setErrorMessage(message);
+ }
+
+ /**
+ * Calls the plugin's {@code setTopology} and folds the result into a {@link PluginOutcome}.
+ * The future never completes exceptionally — the outcome carries the failure category.
+ */
+ private static CompletableFuture<PluginOutcome> invokePluginSetTopology(
+ StreamsGroupTopologyDescriptionPlugin plugin,
+ String groupId,
+ int pushedEpoch,
+ StreamsGroupTopologyDescription description
+ ) {
+ final CompletableFuture<Void> pluginFuture;
+ try {
+ pluginFuture = plugin.setTopology(groupId, pushedEpoch, description);
+ } catch (Throwable t) {
+ // A synchronous throw from the plugin is treated as a permanent failure with a
+ // generic client-visible message.
+ return CompletableFuture.completedFuture(PluginOutcome.permanent(t.getMessage()));
Review Comment:
Treating a synchronous throw as permanent is what the KIP asks for, but it
also says the message should be generic — "a synchronous throw is treated as a
permanent failure with a generic client-visible error message". Here we pass
t.getMessage() straight through, which leaks the plugin's exception text to the
client; probably want a fixed generic string instead. Separately, catch
(Throwable t) also swallows Errors (OOM etc.) and turns them into a
permanent-failure response — catch Exception reads safer.
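A sketch of both suggestions, with hypothetical `TopologyPlugin` / `Outcome` types standing in for `StreamsGroupTopologyDescriptionPlugin` and `PluginOutcome`: the synchronous catch is narrowed to `Exception`, and the client only ever sees a fixed string (its wording here is illustrative). The asynchronous path is deliberately simplified: every async failure maps to TRANSIENT in this sketch, which is an assumption, not what the PR does.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for StreamsGroupTopologyDescriptionPlugin and PluginOutcome.
interface TopologyPlugin {
    CompletableFuture<Void> setTopology(String groupId, int topologyEpoch, String description);
}

enum OutcomeKind { SUCCESS, TRANSIENT, PERMANENT }

record Outcome(OutcomeKind kind, String message) {
    static Outcome success() { return new Outcome(OutcomeKind.SUCCESS, null); }
    static Outcome transientFailure(String message) { return new Outcome(OutcomeKind.TRANSIENT, message); }
    static Outcome permanent(String message) { return new Outcome(OutcomeKind.PERMANENT, message); }
}

final class PluginInvoker {
    // Fixed client-visible text; the plugin's own exception text stays server-side.
    static final String GENERIC_PERMANENT_MESSAGE =
        "The topology description plugin rejected the topology.";

    static CompletableFuture<Outcome> invoke(TopologyPlugin plugin, String groupId, int epoch, String description) {
        final CompletableFuture<Void> pluginFuture;
        try {
            pluginFuture = plugin.setTopology(groupId, epoch, description);
        } catch (Exception e) {
            // Exception, not Throwable: JVM Errors such as OutOfMemoryError propagate to the caller.
            // A real implementation would log e here so operators still see the cause.
            return CompletableFuture.completedFuture(Outcome.permanent(GENERIC_PERMANENT_MESSAGE));
        }
        // Simplified: every asynchronous failure is mapped to TRANSIENT here; the PR's own
        // classification of async failures is not reproduced.
        return pluginFuture.handle((unused, throwable) -> throwable == null
            ? Outcome.success()
            : Outcome.transientFailure("The topology description plugin failed transiently."));
    }
}
```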
##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
public Source {
Objects.requireNonNull(name, "name");
- topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
- successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+ topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+ successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
Review Comment:
I think that as long as we don't modify these sets elsewhere, skipping the explicit
copies here is acceptable (and better for performance as well).
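The trade-off, sketched with two hypothetical records that mirror the old and new constructor styles: `Set.copyOf` takes a snapshot and rejects null elements, while `Collections.unmodifiableSet` wraps the caller's set without copying, so the record stays read-only to its users but still reflects later changes made through the original reference. The view is therefore only safe under the condition stated in the comment, i.e. nobody mutates the input after construction.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

// Hypothetical records contrasting the two constructor styles from the diff.
record CopyingSource(String name, Set<String> topics) {
    CopyingSource {
        Objects.requireNonNull(name, "name");
        // Snapshot: later changes to the caller's set are not seen; null elements are rejected.
        topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
    }
}

record ViewSource(String name, Set<String> topics) {
    ViewSource {
        Objects.requireNonNull(name, "name");
        // Read-only view: no copy, but it reflects any later change to the caller's set.
        topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
    }
}
```

One further difference: the set returned by `Set.copyOf` has an unspecified iteration order, whereas the view keeps the backing set's order. That only matters if something downstream relies on ordering.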