Copilot commented on code in PR #22554:
URL: https://github.com/apache/kafka/pull/22554#discussion_r3413157693


##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
     public record Source(String name, Set<String> topics, Set<String> 
successors) implements Node {
         public Source {
             Objects.requireNonNull(name, "name");
-            topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, 
"successors"));
+            topics = 
Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+            successors = 
Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }

Review Comment:
   The node constructors wrap the provided sets with 
`Collections.unmodifiableSet(...)` but do not defensively copy them. If the 
caller keeps a reference to `topics`/`successors` and mutates it later, this 
record’s contents, `equals`/`hashCode` results, and iteration order can change 
after construction, which breaks the immutability expected of a public API 
type. Consider copying into a new `LinkedHashSet` (which preserves the input's 
iteration order) before wrapping it as unmodifiable.
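
   As a minimal sketch of the suggested copy-then-wrap approach: `SourceSketch` 
and `copyPreservingOrder` below are hypothetical stand-ins for illustration, 
not the actual Kafka types. Note that, unlike `Set.copyOf`, a `LinkedHashSet` 
copy accepts null elements, so a separate element check may still be wanted.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for the Source record in the diff above.
record SourceSketch(String name, Set<String> topics, Set<String> successors) {
    SourceSketch {
        Objects.requireNonNull(name, "name");
        topics = copyPreservingOrder(topics, "topics");
        successors = copyPreservingOrder(successors, "successors");
    }

    // Copy first so later mutation of the caller's set is not visible here,
    // then wrap the copy so the accessors cannot be used to mutate it.
    private static Set<String> copyPreservingOrder(Set<String> input, String field) {
        Objects.requireNonNull(input, field);
        return Collections.unmodifiableSet(new LinkedHashSet<>(input));
    }
}

class DefensiveCopyDemo {
    public static void main(String[] args) {
        Set<String> topics = new LinkedHashSet<>(List.of("b", "a"));
        SourceSketch source = new SourceSketch("src", topics, Set.of());
        int hashBefore = source.hashCode();

        // Mutating the caller's set after construction must not affect the record.
        topics.add("c");
        System.out.println(source.topics());
        System.out.println(hashBefore == source.hashCode());
    }
}
```

   With the copy in place, the record keeps the two original topics in their 
original order and its hash code stays stable after the caller mutates its own 
set.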



##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
     public record Source(String name, Set<String> topics, Set<String> 
successors) implements Node {
         public Source {
             Objects.requireNonNull(name, "name");
-            topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, 
"successors"));
+            topics = 
Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+            successors = 
Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }
     }
 
     public record Processor(String name, Set<String> stores, Set<String> 
successors) implements Node {
         public Processor {
             Objects.requireNonNull(name, "name");
-            stores = Set.copyOf(Objects.requireNonNull(stores, "stores"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, 
"successors"));
+            stores = 
Collections.unmodifiableSet(Objects.requireNonNull(stores, "stores"));
+            successors = 
Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }

Review Comment:
   Same immutability issue as `Source`: `stores`/`successors` are wrapped but 
not copied, so external mutation after construction can change this record’s 
state and `hashCode`. Defensively copy each set before wrapping it as 
unmodifiable so that API objects stay truly immutable.



##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
     public record Source(String name, Set<String> topics, Set<String> 
successors) implements Node {
         public Source {
             Objects.requireNonNull(name, "name");
-            topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, 
"successors"));
+            topics = 
Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+            successors = 
Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }
     }
 
     public record Processor(String name, Set<String> stores, Set<String> 
successors) implements Node {
         public Processor {
             Objects.requireNonNull(name, "name");
-            stores = Set.copyOf(Objects.requireNonNull(stores, "stores"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, 
"successors"));
+            stores = 
Collections.unmodifiableSet(Objects.requireNonNull(stores, "stores"));
+            successors = 
Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }
     }
 
     public record Sink(String name, Optional<String> topic, Set<String> 
successors) implements Node {
         public Sink {
             Objects.requireNonNull(name, "name");
             Objects.requireNonNull(topic, "topic");
-            successors = Set.copyOf(Objects.requireNonNull(successors, 
"successors"));
+            successors = 
Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }

Review Comment:
   `Sink.successors` is wrapped but not defensively copied, so a caller that 
retains and mutates the passed-in set can change the record’s state after 
construction. Defensively copy it (e.g., into a `LinkedHashSet`) before 
wrapping it as unmodifiable.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator 
calls
+ * into — heartbeat post-processing, the push RPC, and the {@code 
DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+    private final Logger log;
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public TopologyDescriptionManager(
+        LogContext logContext,
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.log = logContext.logger(TopologyDescriptionManager.class);
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the topology description plugin is configured, 
the group
+     * has resolved to a topology epoch, that epoch is neither stored nor 
permanently
+     * failed at the plugin, no back-off is in effect for this epoch, and the 
response
+     * does not carry a {@code STALE_TOPOLOGY} status (the member would just 
be told to
+     * catch up first). When the response already carries an error code we 
leave it
+     * alone.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId
+    ) {
+        if (plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Reject the request synchronously when no plugin is configured or the 
request fails
+     * basic structural validation. Returns the response to send back to the 
client, or
+     * empty when the request is accepted for further processing. The caller 
is expected
+     * to have already short-circuited on a non-active coordinator.
+     */
+    public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> 
preCheckTopologyDescriptionUpdate(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        if (plugin.isEmpty()) {
+            return Optional.of(errorResponse(
+                Errors.UNSUPPORTED_VERSION,
+                "The broker has no streams group topology description plugin 
configured."
+            ));
+        }
+        if (request.memberId() == null || request.memberId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId 
can't be empty."));
+        }
+        if (request.groupId() == null || request.groupId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId 
can't be empty."));
+        }
+        if (request.topologyDescription() == null) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, 
"TopologyDescription can't be null."));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);
+                }
+            });
+    }
+
+    /**
+     * Call {@code plugin.deleteTopology} for every supplied group id that is 
a streams
+     * group carrying a stored topology description. Returns a per-group map 
of failures
+     * keyed by group id; groups absent from the map either had no plugin 
state or the
+     * plugin call succeeded. Empty (no-op) when no plugin is configured. Used 
by the
+     * service-level {@code DeleteGroups} flow to gate tombstoning on plugin 
success.
+     */
+    public CompletableFuture<Map<String, ApiError>> deleteBeforeGroupDelete(
+        TopicPartition topicPartition,
+        List<String> groupIds
+    ) {
+        if (plugin.isEmpty()) {
+            return CompletableFuture.completedFuture(Map.of());
+        }
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-pre-delete",
+                topicPartition,
+                (coordinator, lastCommittedOffset) ->
+                    
coordinator.streamsGroupsWithStoredTopologyDescription(groupIds, 
lastCommittedOffset))
+            .thenCompose(groupsWithStored -> {
+                if (groupsWithStored.isEmpty()) {
+                    return CompletableFuture.completedFuture(Map.<String, 
ApiError>of());
+                }
+                Map<String, CompletableFuture<Throwable>> calls = new 
HashMap<>(groupsWithStored.size());
+                for (String groupId : groupsWithStored) {
+                    CompletableFuture<Throwable> outcome;
+                    try {
+                        outcome = p.deleteTopology(groupId).handle((unused, 
throwable) -> throwable);
+                    } catch (Throwable t) {
+                        outcome = CompletableFuture.completedFuture(t);
+                    }
+                    calls.put(groupId, outcome);
+                }
+                CompletableFuture<?>[] all = calls.values().toArray(new 
CompletableFuture<?>[0]);
+                return CompletableFuture.allOf(all).thenApply(unused -> {
+                    Map<String, ApiError> failures = new HashMap<>();
+                    calls.forEach((groupId, throwableFuture) -> {
+                        Throwable t = throwableFuture.join();
+                        if (t != null) {
+                            Throwable cause = t instanceof CompletionException 
&& t.getCause() != null
+                                ? t.getCause() : t;
+                            log.warn("Topology description plugin failed to 
delete topology for streams group {}.",
+                                groupId, cause);
+                            backoff.clear(groupId);
+                            failures.put(groupId, new 
ApiError(Errors.GROUP_DELETION_FAILED, cause.getMessage()));
+                        } else {
+                            backoff.clear(groupId);
+                        }
+                    });
+                    return failures;
+                });
+            })
+            .exceptionally(exception -> {
+                // A failure to read the streams group state (NOT_COORDINATOR 
etc.) is treated
+                // the same as a plugin failure: every retained group is held 
back from
+                // tombstoning so the caller can retry. The downstream chain 
prefers the more
+                // specific error from the runtime over GROUP_DELETION_FAILED.
+                Errors error = Errors.forException(exception);
+                Map<String, ApiError> failures = new HashMap<>();
+                for (String groupId : groupIds) {
+                    failures.put(groupId, new ApiError(error, 
exception.getMessage()));
+                }
+                return failures;
+            });
+    }
+
+    // Visible for testing.
+    StreamsGroupTopologyDescriptionBackoff backoff() {
+        return backoff;
+    }
+
+    private static boolean 
responseHasStaleTopology(StreamsGroupHeartbeatResponseData response) {
+        if (response.status() == null) {
+            return false;
+        }
+        byte staleCode = Status.STALE_TOPOLOGY.code();
+        return response.status().stream().anyMatch(s -> s.statusCode() == 
staleCode);
+    }
+
+    private static StreamsGroupTopologyDescriptionUpdateResponseData 
errorResponse(
+        Errors error,
+        String message
+    ) {
+        return new StreamsGroupTopologyDescriptionUpdateResponseData()
+            .setErrorCode(error.code())
+            .setErrorMessage(message);
+    }
+
+    /**
+     * Calls the plugin's {@code setTopology} and folds the result into a 
{@link PluginOutcome}.
+     * The future never completes exceptionally — the outcome carries the 
failure category.
+     */
+    private static CompletableFuture<PluginOutcome> invokePluginSetTopology(
+        StreamsGroupTopologyDescriptionPlugin plugin,
+        String groupId,
+        int pushedEpoch,
+        StreamsGroupTopologyDescription description
+    ) {
+        final CompletableFuture<Void> pluginFuture;
+        try {
+            pluginFuture = plugin.setTopology(groupId, pushedEpoch, 
description);
+        } catch (Throwable t) {
+            // A synchronous throw from the plugin is treated as a permanent 
failure with a
+            // generic client-visible message.
+            return 
CompletableFuture.completedFuture(PluginOutcome.permanent(t.getMessage()));
+        }

Review Comment:
   The comment says a synchronous throw from the plugin yields a "generic 
client-visible message", but the implementation forwards `t.getMessage()` 
directly, which can be null and may leak unintended details. Either adjust the 
comment to match the code, or ensure the message is non-null and intentionally 
chosen.
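
   One way to make the code match the comment, sketched under assumptions: 
`PluginMessageSketch`, `GENERIC_MESSAGE`, and `invoke` are hypothetical names, 
and the message wording is only a placeholder. The sketch covers the 
synchronous-throw path only; asynchronous failures would still need their own 
handling.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class PluginMessageSketch {
    // Hypothetical fixed, non-null message returned to clients.
    static final String GENERIC_MESSAGE =
        "The topology description plugin failed to process the request.";

    // Invokes the plugin call. A synchronous throw is mapped to the fixed
    // message instead of t.getMessage(), which may be null or carry
    // internal details. An empty Optional means the call succeeded.
    static CompletableFuture<Optional<String>> invoke(Supplier<CompletableFuture<Void>> call) {
        final CompletableFuture<Void> pluginFuture;
        try {
            pluginFuture = call.get();
        } catch (Throwable t) {
            // The original throwable would be logged broker-side here.
            return CompletableFuture.completedFuture(Optional.of(GENERIC_MESSAGE));
        }
        return pluginFuture.thenApply(unused -> Optional.<String>empty());
    }
}
```

   The alternative is to keep forwarding `t.getMessage()` with a non-null 
fallback and reword the code comment; which option is preferable depends on 
how much plugin detail is meant to reach clients.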



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator 
calls
+ * into — heartbeat post-processing, the push RPC, and the {@code 
DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+    private final Logger log;
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public TopologyDescriptionManager(
+        LogContext logContext,
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.log = logContext.logger(TopologyDescriptionManager.class);
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the topology description plugin is configured, 
the group
+     * has resolved to a topology epoch, that epoch is neither stored nor 
permanently
+     * failed at the plugin, no back-off is in effect for this epoch, and the 
response
+     * does not carry a {@code STALE_TOPOLOGY} status (the member would just 
be told to
+     * catch up first). When the response already carries an error code we 
leave it
+     * alone.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId
+    ) {
+        if (plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Reject the request synchronously when no plugin is configured or the 
request fails
+     * basic structural validation. Returns the response to send back to the 
client, or
+     * empty when the request is accepted for further processing. The caller 
is expected
+     * to have already short-circuited on a non-active coordinator.
+     */
+    public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> 
preCheckTopologyDescriptionUpdate(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        if (plugin.isEmpty()) {
+            return Optional.of(errorResponse(
+                Errors.UNSUPPORTED_VERSION,
+                "The broker has no streams group topology description plugin 
configured."
+            ));
+        }
+        if (request.memberId() == null || request.memberId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId 
can't be empty."));
+        }
+        if (request.groupId() == null || request.groupId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId 
can't be empty."));
+        }
+        if (request.topologyDescription() == null) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, 
"TopologyDescription can't be null."));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);
+                }
+            });

Review Comment:
   `whenComplete` currently arms or extends the backoff on *any* failure 
(because `backoffAction` defaults to `ARM`), including coordinator-routing 
failures such as `NotCoordinatorException` from `scheduleReadOperation`. Since 
the backoff map is broker-wide and keyed only by `groupId`, a misrouted request 
can leave stale backoff state that later suppresses 
`TopologyDescriptionRequired` if this broker becomes coordinator for the 
group’s partition. Consider skipping backoff mutations for coordinator-routing 
errors (`NOT_COORDINATOR` / `COORDINATOR_LOAD_IN_PROGRESS` / 
`COORDINATOR_NOT_AVAILABLE`).
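
   A rough, self-contained sketch of the suggested guard. The nested exception 
classes are local stand-ins named after the corresponding Kafka exceptions, and 
`isRoutingError` / `backoffMutationsFor` are hypothetical helpers; the counter 
merely stands in for the `backoff.clear` / `backoff.armOrExtend` calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class RoutingAwareBackoffSketch {
    // Local stand-ins for the Kafka exception types of the same names.
    static class NotCoordinatorException extends RuntimeException { }
    static class CoordinatorLoadInProgressException extends RuntimeException { }
    static class CoordinatorNotAvailableException extends RuntimeException { }

    // True when the failure only says "this broker cannot serve the group now".
    static boolean isRoutingError(Throwable t) {
        Throwable cause = (t instanceof CompletionException && t.getCause() != null)
            ? t.getCause() : t;
        return cause instanceof NotCoordinatorException
            || cause instanceof CoordinatorLoadInProgressException
            || cause instanceof CoordinatorNotAvailableException;
    }

    // Returns how many back-off mutations the completion handler performed.
    static int backoffMutationsFor(CompletableFuture<String> chain) {
        AtomicInteger mutations = new AtomicInteger();
        chain.whenComplete((response, throwable) -> {
            if (throwable != null && isRoutingError(throwable)) {
                return; // leave the broker-wide back-off map untouched
            }
            mutations.incrementAndGet(); // stands in for clear / armOrExtend
        });
        return mutations.get();
    }
}
```

   In this sketch a future that failed with a routing error produces no 
back-off mutation, while successes and all other failures still reach the 
existing clear-or-arm logic.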



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
